Analyze Celery task configurations for reliability, performance, and operational excellence. Reviews task definitions, retry policies, routing rules, serialization settings, result backend configuration, worker tuning, and beat schedules. Acts as a senior distributed systems engineer auditing your Celery deployment.
Invoke this skill when you need to review Celery task configurations, optimize worker performance, or ensure reliability best practices.
Basic invocation:
> Analyze the Celery tasks in /path/to/app/tasks/
> Review this Celery configuration for production readiness
> Check task reliability across the project
Focused analysis:
> Audit retry policies for all Celery tasks
> Review task routing configuration
> Check beat schedule for overlapping tasks
> Analyze worker configuration for memory leaks
The agent reads Celery task files, configuration modules, and beat schedules, then produces a comprehensive quality report.
The agent locates all Celery-related code:
# Find Celery app configuration
grep -rl "Celery(\|celery_app\|make_celery" /path/to/app/ --include="*.py"
# Find task definitions
grep -rl "@app.task\|@shared_task\|@celery.task" /path/to/app/ --include="*.py"
# Find beat schedule definitions
grep -rl "beat_schedule\|CELERYBEAT_SCHEDULE\|crontab\|solar" /path/to/app/ --include="*.py"
# Find configuration files
grep -rl "broker_url\|result_backend\|CELERY_" /path/to/app/ --include="*.py"
The agent parses each component to extract:
The agent checks each task's configuration:
Task decorator analysis:
# GOOD: Well-configured task
from celery import shared_task

@shared_task(
    bind=True,
    name="orders.process_payment",
    # retry_backoff and retry_jitter only take effect together with autoretry_for
    autoretry_for=(ConnectionError, TimeoutError),
    max_retries=3,
    default_retry_delay=60,
    retry_backoff=True,
    retry_backoff_max=600,
    retry_jitter=True,
    acks_late=True,
    reject_on_worker_lost=True,
    time_limit=300,
    soft_time_limit=270,
    rate_limit="100/m",
    track_started=True,
    ignore_result=False,
)
def process_payment(self, order_id: int) -> dict:
    ...
# PROBLEMS the agent detects:
FAIL: Task "send_email" — no retry configuration
@shared_task
def send_email(to, subject, body):
RISK: Transient SMTP failures cause permanent message loss
FIX: Add a retry policy. retry_backoff applies only to autoretry_for, not to manual self.retry() calls:
@shared_task(bind=True, autoretry_for=(SMTPException,), max_retries=3, retry_backoff=True)
def send_email(self, to, subject, body):
    ...
FAIL: Task "process_payment" — no time_limit
RISK: Stuck payment processing blocks worker indefinitely
FIX: Add time_limit=300, soft_time_limit=270
The soft limit raises SoftTimeLimitExceeded so you can clean up
FAIL: Task "generate_report" — acks_late=False (default)
RISK: If worker crashes mid-task, message is already acknowledged
The report generation is lost with no retry
FIX: acks_late=True, reject_on_worker_lost=True
WARN: Task "sync_inventory" — no rate_limit
This task calls external API with rate limits
RISK: Worker bursts can trigger API throttling for all consumers
FIX: rate_limit="30/m" (match API rate limit)
WARN: Task "cleanup_temp_files" — ignore_result not set
Results are stored but never read for this fire-and-forget task
FIX: ignore_result=True — saves result backend storage and bandwidth
WARN: Task "update_search_index" — no bind=True
Cannot access self.retry() or self.request without bind=True
FIX: @shared_task(bind=True) and add self as first parameter
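The findings above follow a small set of rules over each task's decorator options. As a rough sketch of how such rules might be encoded (the function name `audit_task_options` and the exact rule set are assumptions for illustration, and a static check like this cannot see retries issued from inside the task body):

```python
def audit_task_options(name: str, options: dict) -> list[str]:
    """Return findings for one task, given its decorator keyword options."""
    findings = []
    # Without autoretry_for or bind=True the task has no way to retry itself.
    if "autoretry_for" not in options and not options.get("bind", False):
        findings.append(f"FAIL: {name}: no retry configuration")
    # retry_backoff is only consulted for automatic retries.
    if options.get("retry_backoff") and "autoretry_for" not in options:
        findings.append(f"WARN: {name}: retry_backoff has no effect without autoretry_for")
    if "time_limit" not in options:
        findings.append(f"FAIL: {name}: no time_limit")
    elif options.get("soft_time_limit", 0) >= options["time_limit"]:
        findings.append(f"WARN: {name}: soft_time_limit should be below time_limit")
    if not options.get("acks_late", False):
        findings.append(f"FAIL: {name}: acks_late=False (default)")
    return findings


for finding in audit_task_options("send_email", {}):
    print(finding)
```

A bare `@shared_task` therefore produces three findings, while a task configured like the well-configured example produces none.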
Task naming audit:
Task Naming Analysis:
FAIL: Task uses auto-generated name
"app.tasks.send_email" — derived from module path
RISK: Renaming the module or moving the file breaks all pending tasks
FIX: Set explicit name: @shared_task(name="notifications.send_email")
FAIL: Inconsistent naming convention
"process_order" (no namespace)
"payments.charge_card" (dotted namespace)
"UserSync" (PascalCase)
FIX: Use consistent dotted namespace: "domain.action_verb"
"orders.process_order", "payments.charge_card", "users.sync_user"
WARN: Task name collision risk
"process" defined in both orders/tasks.py and payments/tasks.py
Auto-generated names differ but explicit names might collide
FIX: Always use namespaced explicit names
The agent evaluates retry configurations for reliability:
Retry Policy Analysis:
Task: "orders.process_payment"
max_retries: 3
retry_backoff: True (exponential: 1s, 2s, 4s; with jitter, each delay is randomized between 0 and that bound)
retry_backoff_max: 600s
retry_jitter: True
PASS: Well-configured — exponential backoff with jitter prevents thundering herd
Task: "integrations.sync_crm"
max_retries: 10
default_retry_delay: 5
retry_backoff: False
FAIL: Fixed 5-second delay with 10 retries = only 50 seconds of retry window
For CRM API issues that may last minutes, this is insufficient
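FIX: retry_backoff=5, retry_backoff_max=1800 (together with autoretry_for)
retry_backoff=True would use a 1-second base; passing 5 starts the sequence at 5 seconds
This gives up to 5s, 10s, 20s, 40s, ... capped at 30 min, which covers longer outages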
Task: "reports.generate_pdf"
max_retries: None (infinite retries!)
FAIL: Infinite retries — a poison message retries forever
RISK: Consumes worker slots indefinitely and keeps re-queuing the same message on the broker
FIX: Set finite max_retries (3-5 for most tasks)
Task: "notifications.send_push"
Retries on all exceptions (bare except)
FAIL: Retries on ValueError, TypeError — bugs never surface
FIX: Only retry on transient errors:
except (ConnectionError, TimeoutError) as exc:
    raise self.retry(exc=exc)
# Let ValueError, TypeError propagate as failures
Task: "etl.import_csv"
autoretry_for=(Exception,)
FAIL: autoretry_for too broad — retries on programming errors
FIX: autoretry_for=(IOError, ConnectionError, OperationalError)
Retry Summary:
Tasks with retry: 8/15
Tasks without retry: 5/15 (REVIEW NEEDED)
Tasks with bad retry: 2/15 (NEEDS FIX)
Infinite retry tasks: 1/15 (CRITICAL)
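The retry windows quoted in this analysis can be checked with a few lines of arithmetic. This sketch assumes the documented backoff rule for automatic retries (delay = factor × 2^n, capped at `retry_backoff_max`) with jitter disabled; `backoff_schedule` is a hypothetical helper name.

```python
def backoff_schedule(factor: int, max_retries: int, cap: int) -> list[int]:
    """Upper bound in seconds for each retry delay: min(cap, factor * 2**n)."""
    return [min(cap, factor * 2 ** n) for n in range(max_retries)]


fixed = [5] * 10  # default_retry_delay=5, retry_backoff=False
exponential = backoff_schedule(factor=5, max_retries=10, cap=1800)

print(sum(fixed))        # 50
print(exponential)       # [5, 10, 20, 40, 80, 160, 320, 640, 1280, 1800]
print(sum(exponential))  # 4355
```

With the same ten retries, the exponential schedule stretches the retry window from 50 seconds to roughly 72 minutes. With jitter enabled the actual delays fall somewhere between 0 and these bounds.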
The agent analyzes routing configuration:
Routing Analysis:
Queue Configuration:
Queues defined: ["default", "high_priority", "bulk", "notifications"]
FAIL: No task_routes configured
All 15 tasks go to "default" queue
RISK: Bulk import tasks starve payment processing tasks
FIX: Configure routing:
task_routes = {
    "orders.process_payment": {"queue": "high_priority"},
    "etl.import_*": {"queue": "bulk"},
    "notifications.*": {"queue": "notifications"},
}
FAIL: No dedicated workers per queue
Single worker process consumes all queues
RISK: Cannot scale queues independently, cannot set per-queue concurrency
FIX: Run separate workers:
celery -A app worker -Q high_priority -c 4
celery -A app worker -Q bulk -c 2
celery -A app worker -Q notifications -c 8
WARN: No task priorities configured
Messages within a queue are consumed in FIFO order, so urgent tasks wait behind older ones
FIX: For RabbitMQ: task_queue_max_priority=10
Then: process_payment.apply_async(priority=9)
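WARN: No dead letter exchange configured
Tasks that fail after max_retries leave no message behind for post-mortem analysis
FIX: Declare the queue with a dead letter exchange (RabbitMQ x-dead-letter-exchange queue argument)
and make workers reject failed messages instead of acknowledging them:
task_acks_late = True
task_reject_on_worker_lost = True
task_acks_on_failure_or_timeout = False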
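To see which queue each task would land on under a glob-style `task_routes` mapping, a small resolver is enough. This is only an approximation using the standard library's `fnmatch`, and the fallback queue name `"default"` is taken from the queue list above rather than from Celery's own defaults.

```python
from fnmatch import fnmatchcase

TASK_ROUTES = {
    "orders.process_payment": {"queue": "high_priority"},
    "etl.import_*": {"queue": "bulk"},
    "notifications.*": {"queue": "notifications"},
}


def resolve_queue(task_name: str, routes: dict, default: str = "default") -> str:
    """Return the queue of the first route pattern matching task_name."""
    for pattern, route in routes.items():
        if fnmatchcase(task_name, pattern):
            return route["queue"]
    return default


for name in ("orders.process_payment", "etl.import_csv", "reports.generate_pdf"):
    print(name, "->", resolve_queue(name, TASK_ROUTES))
```

Any task that resolves to the fallback queue is a candidate for an explicit route.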
The agent checks serialization settings:
Serialization Analysis:
FAIL: task_serializer = "pickle"
RISK: Pickle deserialization allows arbitrary code execution
Any message injected into the broker can execute code on workers
FIX: task_serializer = "json"
AND: accept_content = ["json"] (reject pickle messages)
WARN: result_serializer not explicitly set
Celery 4.0+ defaults to json regardless of task_serializer, but an explicit value guards against legacy or inherited settings
FIX: result_serializer = "json"
FAIL: accept_content = ["json", "pickle"]
Still accepts pickle — attacker can send pickle-serialized message
FIX: accept_content = ["json"] only
WARN: Task "ml.train_model" passes non-JSON-serializable argument
Argument: numpy array, pandas DataFrame
RISK: Will fail with json serializer
FIX: Serialize data before sending:
- Pass file path or S3 key instead of raw data
- Use msgpack serializer for binary data
- Convert to list/dict before sending
PASS: No task passes ORM model instances as arguments
(ORM objects are not serializable and create stale data issues)
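A quick way to spot arguments that will not survive a JSON serializer is to try encoding them. The sketch below uses the plain standard-library encoder, which is stricter than the serializer Celery ships (that one accepts a few extra types such as datetimes), so treat a hit as a prompt for review rather than a certain failure. `non_json_args` is an illustrative name.

```python
import json


def non_json_args(args: tuple, kwargs: dict) -> list[str]:
    """List the arguments that the standard json encoder cannot serialize."""
    labelled = [(f"args[{i}]", value) for i, value in enumerate(args)]
    labelled += [(f"kwargs[{key!r}]", value) for key, value in kwargs.items()]
    problems = []
    for label, value in labelled:
        try:
            json.dumps(value)
        except (TypeError, ValueError):
            problems.append(f"{label}: {type(value).__name__}")
    return problems


print(non_json_args((42, "order", [1, 2, 3]), {"note": None}))
print(non_json_args((b"raw bytes", {1, 2}), {"obj": object()}))
```

The first call reports nothing; the second flags the bytes, set, and object arguments.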
The agent evaluates result backend setup:
Result Backend Analysis:
Backend: Redis (redis://localhost:6379/1)
FAIL: result_expires not configured
Default: 24 hours — results accumulate in Redis
For 10,000 tasks/day = 10,000 keys consuming memory
FIX: result_expires = 3600 (1 hour, or match your read pattern)
FAIL: Same Redis instance for broker and results
redis://localhost:6379/0 (broker)
redis://localhost:6379/1 (results)
RISK: Result storage OOM can crash the broker
FIX: Use separate Redis instances for broker and results
WARN: result_extended = False (default)
Extended results add the task name, args, kwargs, worker hostname, retries, and queue
Valuable for debugging production issues
FIX: result_extended = True
WARN: 8 tasks have ignore_result=False but results are never read
These tasks store results that nobody fetches
FIX: Set ignore_result=True on fire-and-forget tasks:
send_email, update_cache, sync_analytics, log_event,
send_webhook, cleanup_files, update_counter, emit_metric
FAIL: No result backend retry or timeout tuning
Retry policy is unset and redis_socket_timeout is left at its 120-second default
FIX: result_backend_transport_options = {
    "retry_policy": {"max_retries": 3, "interval_start": 0.2},
}
redis_socket_timeout = 5
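The shared-instance finding comes down to comparing the host and port of two URLs. A minimal sketch, assuming standard `scheme://host:port/db` URLs and filling in well-known default ports when the URL omits one (`shares_server` and `DEFAULT_PORTS` are hypothetical names):

```python
from urllib.parse import urlsplit

# Assumed default ports for URLs that omit one.
DEFAULT_PORTS = {"redis": 6379, "rediss": 6379, "amqp": 5672, "amqps": 5671}


def endpoint(url: str) -> tuple:
    """Return (hostname, port) for a broker or result backend URL."""
    parts = urlsplit(url)
    return parts.hostname, parts.port or DEFAULT_PORTS.get(parts.scheme)


def shares_server(broker_url: str, result_backend: str) -> bool:
    """True when both URLs point at the same host and port."""
    return endpoint(broker_url) == endpoint(result_backend)


print(shares_server("redis://localhost:6379/0", "redis://localhost:6379/1"))
```

Different database numbers on the same server still count as shared, since they compete for the same memory.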
The agent reviews periodic task configuration:
Beat Schedule Analysis:
Schedule: 12 periodic tasks configured
FAIL: Overlapping schedules detected
"sync_all_users" runs every 5 minutes (crontab(*/5))
"sync_premium_users" runs every 3 minutes (crontab(*/3))
Both query user table — concurrent runs cause lock contention
FIX: Stagger schedules or merge into single task with priority flag
FAIL: Task "generate_daily_report" — no expires setting
Schedule: crontab(hour=2, minute=0) — runs at 2 AM
If worker is down at 2 AM, task queues and runs when worker recovers
By then the report may be stale or a new execution is already scheduled
FIX: Add "options": {"expires": 3600} to the schedule entry, so the task is discarded if not executed within 1 hour
FAIL: No task locking — concurrent executions possible
"sync_inventory" runs every 10 minutes, average runtime 12 minutes
Overlap: new execution starts before previous completes
FIX: Use celery-once or implement distributed lock:
from celery_once import QueueOnce
@app.task(base=QueueOnce, once={"graceful": True})
WARN: Beat schedule uses intervals instead of crontab for daily tasks
"cleanup" — schedule=timedelta(hours=24)
RISK: Interval drifts over time (runs at 2:00, then 2:01, 2:03...)
FIX: Use crontab for fixed-time tasks:
schedule=crontab(hour=2, minute=0)
WARN: No stagger on tasks that share a schedule
5 tasks fire on the same 5-minute boundary and all hit the broker simultaneously
FIX: Stagger start times:
crontab(minute="*/5") — :00, :05, :10...
crontab(minute="1-59/5") — :01, :06, :11... (offset by 1)
Beat Health:
Total periodic tasks: 12
Tasks with expires: 3/12 (NEEDS ATTENTION)
Tasks with lock protection: 1/12 (CRITICAL)
Overlapping schedules: 2 pairs detected
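Schedule collisions can be found by expanding each crontab minute field into the set of minutes on which it fires and intersecting the sets. The parser below is a simplified sketch that handles only the forms used in this section (`*`, `*/n`, `a-b/n`, single values, and comma lists); it is not Celery's own crontab parser.

```python
def fire_minutes(field: str) -> set[int]:
    """Expand a crontab minute field such as '*/5', '1-59/5', '0,30' or '7'."""
    minutes: set[int] = set()
    for part in field.split(","):
        span, _, step = part.partition("/")
        stride = int(step) if step else 1
        if span == "*":
            low, high = 0, 59
        elif "-" in span:
            low, high = (int(x) for x in span.split("-"))
        else:
            low = high = int(span)
        minutes.update(range(low, high + 1, stride))
    return minutes


collisions = sorted(fire_minutes("*/5") & fire_minutes("*/3"))
print(collisions)  # [0, 15, 30, 45]

staggered = fire_minutes("*/5") & fire_minutes("1-59/5")
print(staggered)   # set()
```

The every-5 and every-3 schedules coincide four times an hour, while offsetting one schedule by a minute removes every collision.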
The agent audits worker settings:
Worker Configuration Analysis:
FAIL: worker_prefetch_multiplier = 4 (default)
With long-running tasks, each worker process reserves 4 tasks but runs only 1 at a time
Other tasks wait in the prefetch buffer, so load is distributed poorly
FIX: worker_prefetch_multiplier = 1 for long tasks, ideally combined with acks_late=True
NOTE: Do not use worker_prefetch_multiplier = 0; it removes the prefetch limit instead of disabling prefetching
FAIL: No worker_max_tasks_per_child configured
RISK: Memory leaks in tasks accumulate until worker OOM
FIX: worker_max_tasks_per_child = 1000
Each child process is replaced after 1000 tasks, which releases leaked memory
FAIL: No worker_max_memory_per_child configured
RISK: Single memory-intensive task can crash the worker
FIX: worker_max_memory_per_child = 400000 (400 MB in KB)
The child process is replaced once the task that exceeded the limit has finished
WARN: worker_pool = "prefork" with I/O-bound tasks
Most tasks are API calls and database queries (I/O bound)
Prefork creates heavy processes for each worker
CONSIDER: worker_pool = "gevent" or "eventlet" for I/O-bound workloads
CAUTION: Not safe if tasks use non-greenlet-safe libraries
WARN: worker_concurrency not explicitly set
Defaults to CPU count — may be too high for memory-heavy tasks
FIX: Set based on workload:
CPU-bound: worker_concurrency = CPU_COUNT
I/O-bound (prefork): worker_concurrency = CPU_COUNT * 2
I/O-bound (gevent): worker_concurrency = 100-500
FAIL: No task_soft_time_limit default
Individual tasks can run forever if no per-task limit set
FIX: Set global defaults:
task_soft_time_limit = 300 (5 minutes)
task_time_limit = 330 (hard kill 30s after soft limit)
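The effect of the prefetch multiplier is easiest to see as arithmetic: a worker may reserve up to concurrency × multiplier messages, and everything beyond the number of running processes sits idle in its buffer. A small sketch for positive multipliers (the function name is illustrative):

```python
def prefetch_footprint(concurrency: int, prefetch_multiplier: int) -> tuple[int, int]:
    """Return (messages reserved, messages waiting behind running tasks)."""
    if concurrency < 1 or prefetch_multiplier < 1:
        raise ValueError("expects a positive concurrency and multiplier")
    reserved = concurrency * prefetch_multiplier
    return reserved, reserved - concurrency


print(prefetch_footprint(concurrency=4, prefetch_multiplier=4))  # (16, 12)
print(prefetch_footprint(concurrency=4, prefetch_multiplier=1))  # (4, 0)
```

With the default multiplier, a 4-process worker can hold 12 tasks that another idle worker could otherwise have picked up.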
The agent evaluates task design for correctness:
Task Design Analysis:
FAIL: Task "process_batch" accepts queryset as argument
def process_batch(self, queryset):
RISK: Django querysets are not serializable
RISK: Even if pickled, data is stale by the time task runs
FIX: Pass IDs, query in the task:
def process_batch(self, item_ids: list[int]):
    items = Item.objects.filter(id__in=item_ids)
FAIL: Task "update_dashboard" has database transaction spanning task
with transaction.atomic():
... (entire task body)
RISK: Long-running transaction holds locks, blocks other queries
FIX: Use smaller transactions within the task, or use bulk operations
FAIL: Task "send_notifications" iterates and sends one by one
for user in users:
    send_push(user.id)  # Synchronous call inside task
RISK: If the task fails at user #500 of 1000, the remaining 500 are never sent and a retry re-sends the first 500
FIX: Fan out to individual tasks:
group(send_push.s(user_id) for user_id in user_ids).apply_async()
WARN: Task "import_data" has no idempotency check
Re-running creates duplicate records
FIX: Use unique constraints and upsert logic, or check task ID:
if cache.get(f"task_done:{self.request.id}"):
    return  # Already processed
FAIL: Task "process_order" waits on another task synchronously
result = charge_card.delay(order_id).get(timeout=30)
RISK: .get() blocks the worker and can deadlock when every worker is waiting on a subtask
FIX: Use chain or callback:
chain(charge_card.s(order_id), fulfill_order.s()).apply_async()
PASS: Task arguments are JSON-serializable primitives
PASS: No global mutable state accessed between tasks
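The idempotency check recommended above is safest when the "seen" marker is set atomically before the work starts, rather than read first and written later. The following sketch shows the pattern with an in-memory stand-in for the shared cache; `SeenTaskIds` and `run_once` are hypothetical names, and a real deployment would need a store shared by all workers.

```python
class SeenTaskIds:
    """In-memory stand-in for a shared cache with an atomic add operation."""

    def __init__(self) -> None:
        self._seen: set[str] = set()

    def add(self, task_id: str) -> bool:
        """Record task_id; return False if it was already recorded."""
        if task_id in self._seen:
            return False
        self._seen.add(task_id)
        return True

    def discard(self, task_id: str) -> None:
        self._seen.discard(task_id)


def run_once(store: SeenTaskIds, task_id: str, work):
    """Run work() unless this task_id has already been processed."""
    if not store.add(task_id):
        return None  # duplicate delivery: skip
    try:
        return work()
    except Exception:
        store.discard(task_id)  # let a later retry run the work again
        raise


store = SeenTaskIds()
print(run_once(store, "abc", lambda: "imported"))
print(run_once(store, "abc", lambda: "imported"))
```

The second call returns `None` because the same task ID was already handled; a failed run clears its marker so that a retry is not mistaken for a duplicate.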
The agent generates a comprehensive report:
# Celery Task Analysis Report
# Project: /path/to/app/ | Date: April 30, 2026
## Overview
Tasks: 15
Periodic tasks: 12
Queues: 4 (default, high_priority, bulk, notifications)
Broker: Redis
Result backend: Redis
Worker pool: prefork
## Overall Health Score: 51/100
## Category Scores
Task Configuration: 5/10 (missing retries, no time limits)
Retry Policies: 4/10 (infinite retries, broad exception catch)
Routing & Queues: 4/10 (no routing, single worker)
Serialization: 3/10 (pickle in use — security risk)
Result Backend: 5/10 (no expiry, shared instance)
Beat Schedule: 5/10 (overlaps, no expires, no locks)
Worker Config: 4/10 (no memory limits, no prefetch tuning)
Task Design: 6/10 (queryset args, sync calls)
Monitoring: 5/10 (no Flower, no metrics)
## Critical Issues
1. Pickle serializer enabled — remote code execution risk
2. Infinite retry task — poison messages never cleared
3. No task time limits — stuck tasks block workers indefinitely
4. Synchronous .get() call inside task — deadlock risk
5. No worker memory limits — OOM crashes
## Recommendations Summary
Estimated effort: 3-5 days for critical + high priority fixes
Expected improvement: 51 -> 80 health score
Risk reduction: Eliminates security vulnerability and 3 crash scenarios
The agent produces:
| Scope | What It Covers |
|---|---|
| Full (default) | All tasks, config, beat, workers |
| Single task | Deep analysis of one task definition |
| Config only | Broker, backend, serialization, worker settings |
| Beat only | Periodic task schedule analysis |
| Retry audit | Retry policies across all tasks |
| Security | Serialization, auth, and message signing |
The agent adapts recommendations based on the broker detected:
| Feature | RabbitMQ | Redis | SQS |
|---|---|---|---|
| Priority queues | Native support | Limited (0-9) | Not supported |
| Dead letter exchange | Native DLX | Manual implementation | Native DLQ |
| Message persistence | Per-message | AOF/RDB | Always persistent |
| Task routing | Exchange/routing key | Queue name only | Queue URL |
| Visibility timeout | Not applicable (acks_late + prefetch) | visibility_timeout (default 1 hour) | visibility_timeout (Celery default 30 min) |
| Max message size | 128 MB default | 512 MB | 256 KB (use S3) |
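Broker detection can usually be done from the scheme of the broker URL alone. A minimal sketch, assuming the common transport aliases (the mapping and the function name `detect_broker` are illustrative and not exhaustive):

```python
from urllib.parse import urlsplit

# Assumed mapping of URL schemes to the broker families in the table above.
BROKER_BY_SCHEME = {
    "amqp": "RabbitMQ",
    "amqps": "RabbitMQ",
    "pyamqp": "RabbitMQ",
    "redis": "Redis",
    "rediss": "Redis",
    "sqs": "SQS",
}


def detect_broker(broker_url: str) -> str:
    """Return the broker family for a broker URL, or 'unknown'."""
    scheme = urlsplit(broker_url).scheme.lower()
    return BROKER_BY_SCHEME.get(scheme, "unknown")


print(detect_broker("amqp://guest@localhost:5672//"))
print(detect_broker("redis://localhost:6379/0"))
```

An unrecognized scheme yields `"unknown"`, in which case broker-specific recommendations are best skipped rather than guessed.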