#!/bin/bash
# Launcher for a Celery worker: this first part prints usage information and
# parses the command line into QUEUES, CONCURRENCY, POOL, LOGLEVEL and ENV_FILE.

# Trace every command to stderr as it is executed.
set -x

#######################################
# Print usage, examples and the known queue names to stdout.
# Globals:   none
# Arguments: none ($0 is used for the program name)
# NOTE(review): the edition-based default queue lists later in this script
# include "priority_dataset", which is not described here - confirm whether
# it should be listed.
#######################################
show_help() {
  echo "Usage: $0 [OPTIONS]"
  echo ""
  echo "Options:"
  echo " -q, --queues QUEUES Comma-separated list of queues to process"
  echo " -c, --concurrency NUM Number of worker processes (default: 1)"
  echo " -P, --pool POOL Pool implementation (default: gevent)"
  echo " --loglevel LEVEL Log level (default: INFO)"
  echo " -e, --env-file FILE Path to an env file to source before starting"
  echo " -h, --help Show this help message"
  echo ""
  echo "Examples:"
  echo " $0 --queues dataset,workflow"
  echo " $0 --queues workflow_professional,workflow_team --concurrency 4"
  echo " $0 --queues dataset --concurrency 2 --pool prefork"
  echo ""
  echo "Available queues:"
  echo " dataset - RAG indexing and document processing"
  echo " workflow - Workflow triggers (community edition)"
  echo " workflow_professional - Professional tier workflows (cloud edition)"
  echo " workflow_team - Team tier workflows (cloud edition)"
  echo " workflow_sandbox - Sandbox tier workflows (cloud edition)"
  echo " schedule_poller - Schedule polling tasks"
  echo " schedule_executor - Schedule execution tasks"
  echo " mail - Email notifications"
  echo " ops_trace - Operations tracing"
  echo " app_deletion - Application cleanup"
  echo " plugin - Plugin operations"
  echo " workflow_storage - Workflow storage tasks"
  echo " conversation - Conversation tasks"
  echo " priority_pipeline - High priority pipeline tasks"
  echo " pipeline - Standard pipeline tasks"
  echo " triggered_workflow_dispatcher - Trigger dispatcher tasks"
  echo " trigger_refresh_executor - Trigger refresh tasks"
}

#######################################
# Abort when an option that takes a value is the last word on the command
# line. Without this check `shift 2` fails, shifts nothing, and the parsing
# loop never terminates.
# Arguments: $1 - option as typed by the user
#            $2 - number of positional parameters still to be parsed
# Outputs:   error message on stderr, usage on stdout
# Returns:   exits the shell with status 1 when the value is missing
#######################################
require_value() {
  if [[ "$2" -lt 2 ]]; then
    echo "Option $1 requires a value" >&2
    show_help
    exit 1
  fi
}

#######################################
# Parse command line options into the global settings.
# Globals:   QUEUES, CONCURRENCY, POOL, LOGLEVEL, ENV_FILE (written)
# Arguments: the script's command line ("$@")
# Returns:   exits 0 after --help; exits 1 on an unknown option or a
#            missing option value
#######################################
parse_args() {
  while [[ $# -gt 0 ]]; do
    case "$1" in
      -q|--queues)
        require_value "$1" "$#"
        QUEUES="$2"
        shift 2
        ;;
      -c|--concurrency)
        require_value "$1" "$#"
        CONCURRENCY="$2"
        shift 2
        ;;
      -P|--pool)
        require_value "$1" "$#"
        POOL="$2"
        shift 2
        ;;
      --loglevel)
        require_value "$1" "$#"
        LOGLEVEL="$2"
        shift 2
        ;;
      -e|--env-file)
        require_value "$1" "$#"
        ENV_FILE="$2"
        shift 2
        ;;
      -h|--help)
        show_help
        exit 0
        ;;
      *)
        echo "Unknown option: $1"
        show_help
        exit 1
        ;;
    esac
  done
}

# Defaults; an empty QUEUES means "choose queues from the edition" later on.
QUEUES=""
CONCURRENCY=1
POOL="gevent"
LOGLEVEL="INFO"
ENV_FILE=""

parse_args "$@"
#######################################
# Print the default comma-separated queue list for an edition.
# Arguments: $1 - edition name; "CLOUD" selects the cloud list, any other
#                 value selects the community (SELF_HOSTED) list
# Outputs:   the queue list on stdout
#######################################
default_queues_for_edition() {
  if [[ "$1" == "CLOUD" ]]; then
    # Cloud edition: per-tier workflow queues (professional/team/sandbox).
    printf '%s\n' "dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow_professional,workflow_team,workflow_sandbox,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor"
  else
    # Community edition (SELF_HOSTED): a single "workflow" queue.
    printf '%s\n' "dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor"
  fi
}

# Work from the parent of the directory that holds this script; the worker is
# started with `uv --directory api`, which is relative to that location.
SCRIPT_DIR="$(dirname "$(realpath "$0")")"
if ! cd "$SCRIPT_DIR/.."; then
  # Continuing from the wrong directory would start the worker against
  # whatever "api" directory happens to be nearby, or none at all.
  echo "Cannot change directory to $SCRIPT_DIR/.." >&2
  exit 1
fi

# Optionally load an env file. A relative ENV_FILE is resolved after the cd
# above, i.e. against the script's parent directory.
if [[ -n "${ENV_FILE}" ]]; then
  if [[ ! -f "${ENV_FILE}" ]]; then
    echo "Env file ${ENV_FILE} not found"
    exit 1
  fi

  echo "Loading environment variables from ${ENV_FILE}"

  # When xtrace is active every assignment in the env file would be echoed
  # to stderr, values included. Pause tracing while the file is sourced and
  # restore it afterwards.
  xtrace_was_on=0
  if [[ $- == *x* ]]; then
    xtrace_was_on=1
    set +x
  fi

  # Export everything sourced from the env file
  set -a
  source "${ENV_FILE}"
  set +a

  if [[ "${xtrace_was_on}" -eq 1 ]]; then
    set -x
  fi
fi

# If no queues were specified, fall back to the edition-based defaults.
if [[ -z "${QUEUES}" ]]; then
  # EDITION comes from the environment (possibly via the env file) and
  # defaults to SELF_HOSTED (community edition).
  EDITION=${EDITION:-"SELF_HOSTED"}
  QUEUES="$(default_queues_for_edition "${EDITION}")"

  echo "No queues specified, using edition-based defaults: ${QUEUES}"
else
  echo "Using specified queues: ${QUEUES}"
fi

echo "Starting Celery worker with:"
echo " Queues: ${QUEUES}"
echo " Concurrency: ${CONCURRENCY}"
echo " Pool: ${POOL}"
echo " Log Level: ${LOGLEVEL}"

# Each value is quoted so it reaches celery as exactly one argument, even if
# it contains spaces or glob characters (e.g. --queues "a, b").
uv --directory api run \
  celery -A app.celery worker \
  -P "${POOL}" -c "${CONCURRENCY}" --loglevel "${LOGLEVEL}" -Q "${QUEUES}"