Celery: Distributed Task Queue for Python — Complete Guide
Celery is a distributed task queue for Python that executes asynchronous tasks outside the HTTP request-response cycle, enabling background processing, scheduled jobs, and real-time operations at scale.
What You’ll Learn
By the end of this tutorial, you’ll understand Celery’s architecture — brokers, workers, result backends — and build production-grade async task pipelines with scheduling, monitoring, and error handling.
Why Celery Matters
Web applications must respond quickly. A request that sends emails, processes images, or calls third-party APIs blocks the user for seconds. Celery moves these tasks to background workers. Doda Browser uses Celery to process analytics events, generate thumbnails, and send notification emails without degrading page load times.
Celery Architecture
flowchart TB
    subgraph "Celery System"
        A[Web App<br/>Producer] --> B[Message Broker<br/>RabbitMQ/Redis]
        B --> C[Worker 1]
        B --> D[Worker 2]
        B --> E[Worker N]
        C --> F[Result Backend<br/>Redis/DB]
        D --> F
        E --> F
        G[Celery Beat<br/>Scheduler] --> B
        H[Flower<br/>Monitor] -.-> C
        H -.-> D
        H -.-> E
    end
    style B fill:#f90,color:#fff
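To make the roles in the diagram concrete, here is a minimal in-process sketch that uses only the standard library. It is not how Celery is implemented: a `queue.Queue` stands in for the broker, threads stand in for workers, and a dict stands in for the result backend. The names `TASKS`, `delay`, and `worker` are hypothetical and exist only for this illustration.

```python
import queue
import threading

broker = queue.Queue()            # stands in for Redis/RabbitMQ
result_backend = {}               # stands in for the result store
backend_lock = threading.Lock()


def send_welcome_email(email):
    """The work a background worker would perform."""
    return {"status": "sent", "email": email}


# Registry mapping task names to functions, as a worker would keep.
TASKS = {"send_welcome_email": send_welcome_email}


def delay(task_id, name, *args):
    """Producer side: publish a task message and return immediately."""
    broker.put((task_id, name, args))


def worker():
    """Worker side: consume messages until a shutdown sentinel arrives."""
    while True:
        message = broker.get()
        if message is None:       # shutdown sentinel
            break
        task_id, name, args = message
        outcome = TASKS[name](*args)
        with backend_lock:
            result_backend[task_id] = outcome


workers = [threading.Thread(target=worker) for _ in range(2)]
for w in workers:
    w.start()

delay("t1", "send_welcome_email", "alice@example.com")
delay("t2", "send_welcome_email", "bob@example.com")

for _ in workers:                 # one sentinel per worker
    broker.put(None)
for w in workers:
    w.join()

print(result_backend["t1"])  # {'status': 'sent', 'email': 'alice@example.com'}
```

The producer never waits for the work to finish; it only publishes a message. That separation is what lets a web request return quickly while a worker does the slow part.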
Installing Celery
pip install "celery[redis]" flower
This installs Celery with Redis support and the Flower monitoring tool.
Basic Celery Setup
# tasks.py
# Celery app configuration
from celery import Celery

app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0',
    include=['tasks'],
)
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
)
Defining Your First Task
# tasks.py (continued)
# A simple Celery task
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def send_welcome_email(self, user_email, username):
    """Send a welcome email to a new user."""
    try:
        # Simulate email sending
        print(f"Sending welcome email to {user_email}...")
        # import requests
        # response = requests.post(
        #     "https://api.sendgrid.com/v3/mail/send",
        #     json={"to": user_email, "template": "welcome"},
        #     headers={"Authorization": "Bearer SG_API_KEY"}
        # )
        # response.raise_for_status()
        print(f"Welcome email sent to {username} at {user_email}")
        return {"status": "sent", "email": user_email}
    except Exception as exc:
        print(f"Failed to send email: {exc}")
        raise self.retry(exc=exc)

@app.task
def process_image(image_path):
    """Generate thumbnails for an uploaded image."""
    import time
    time.sleep(2)  # Simulate processing
    print(f"Processed image: {image_path}")
    return {"image": image_path, "thumbnail": image_path.replace('.jpg', '_thumb.jpg')}
Running the Worker
celery -A tasks worker --loglevel=info
Expected output:
-------------- celery@hostname v5.4.0 (dawn-chorus)
--- ***** -----
-- ******* ---- Linux-6.8.0-x86_64-with-glibc2.35 2026-06-20 10:00:00
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x7f8a1c2b3d00
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: redis://localhost:6379/0
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.send_welcome_email
. tasks.process_image
[2026-06-20 10:00:01: INFO/MainProcess] Connected to redis://localhost:6379/0
[2026-06-20 10:00:01: INFO/MainProcess] mingle: searching for neighbors
[2026-06-20 10:00:02: INFO/MainProcess] mingle: all alone
[2026-06-20 10:00:02: INFO/MainProcess] ready.
Calling Tasks
# call_tasks.py
# Demonstrates queuing Celery tasks and reading their results
from celery.result import AsyncResult
from tasks import app, send_welcome_email, process_image

# Async call: returns an AsyncResult immediately
result = send_welcome_email.delay('user@example.com', 'Alice')
print(f"Task ID: {result.id}")
print(f"Task status: {result.status}")
# Wait for the result (blocks for up to 10 seconds)
print(f"Result: {result.get(timeout=10)}")
# Queue a second task
image_result = process_image.delay('/uploads/photo.jpg')
print(f"Image task ID: {image_result.id}")
# Check status by task ID without blocking
status = AsyncResult(image_result.id, app=app)
print(f"Pending: {status.status}")  # PENDING, STARTED, SUCCESS, FAILURE
Expected output:
Task ID: 550e8400-e29b-41d4-a716-446655440000
Task status: PENDING
Sending welcome email to user@example.com...
Welcome email sent to Alice at user@example.com
Result: {'status': 'sent', 'email': 'user@example.com'}
Image task ID: 660e8400-e29b-41d4-a716-446655440001
Pending: PENDING
Task Chaining
# chaining.py
# Chain tasks so each one receives the previous task's result as its first argument
# These tasks must live in a module the worker imports; otherwise the worker
# rejects them as unregistered.
from celery import chain, group
from tasks import app

@app.task
def add(x, y):
    result = x + y
    print(f"{x} + {y} = {result}")
    return result

@app.task
def multiply(x, y):
    result = x * y
    print(f"{x} * {y} = {result}")
    return result

@app.task
def format_result(value):
    formatted = f"Final result: {value}"
    print(formatted)
    return formatted

# Chain: add(2, 2) → multiply(4, 10) → format_result(40)
task_chain = chain(
    add.s(2, 2),
    multiply.s(10),
    format_result.s()
)
result = task_chain()
print(f"Chain result: {result.get()}")

# Group: run tasks in parallel
parallel = group(
    add.s(1, 2),
    add.s(3, 4),
    add.s(5, 6),
)
group_result = parallel()
print(f"Group results: {group_result.get()}")
Expected output:
2 + 2 = 4
4 * 10 = 40
Final result: 40
Chain result: Final result: 40
1 + 2 = 3
3 + 4 = 7
5 + 6 = 11
Group results: [3, 7, 11]
Celery Beat — Scheduled Tasks
# celerybeat_schedule.py
# Add to your Celery app configuration (tasks.py) for periodic tasks
from celery.schedules import crontab

app.conf.beat_schedule = {
    'cleanup-expired-sessions': {
        'task': 'tasks.cleanup_sessions',
        'schedule': crontab(hour=3, minute=0),  # Daily at 3 AM
    },
    'generate-daily-report': {
        'task': 'tasks.generate_report',
        'schedule': crontab(hour=8, minute=30, day_of_week='1-5'),  # Weekdays at 8:30
    },
    'health-check-every-5-min': {
        'task': 'tasks.health_check',
        'schedule': 300.0,  # Every 5 minutes (in seconds)
    },
}

# Each 'task' name above must match a registered task. generate_report and
# health_check are assumed to be defined the same way as cleanup_sessions.
@app.task
def cleanup_sessions():
    print("Cleaning up expired sessions...")
    # Database cleanup logic
    return "Cleaned 150 expired sessions"

# Start beat: celery -A tasks beat --loglevel=info
Expected output (from beat logs):
celery beat v5.4.0 is starting.
__ - ... __ - _
LocalTime -> 2026-06-20 03:00:00
Configuration ->
. broker -> redis://localhost:6379/0
. schedule -> crontab(hour=3, minute=0)
. tasks:
. cleanup-expired-sessions: crontab(hour=3, minute=0)
. generate-daily-report: crontab(hour=8, minute=30, day_of_week='1-5')
. health-check-every-5-min: 300.0s
[2026-06-20 03:00:00: INFO/Beat] Scheduler: Sending due task cleanup-expired-sessions
[2026-06-20 03:00:00: INFO/MainProcess] Task tasks.cleanup_sessions succeeded
Monitoring with Flower
# Start Flower dashboard (the broker URL is read from the app given with -A)
celery -A tasks flower --port=5555
Expected output:
[I 2026-06-20 10:00:00] Starting Flower web server on http://0.0.0.0:5555
[I 2026-06-20 10:00:01] Visit me at http://localhost:5555
Flower provides a web dashboard at http://localhost:5555 showing worker status, task history, queue lengths, and the ability to revoke or retry tasks.
Error Handling and Retries
# error_handling.py
# Robust task with retry logic and dead-letter handling
import random
import time

from celery.exceptions import SoftTimeLimitExceeded
from tasks import app

@app.task(bind=True, max_retries=5, default_retry_delay=30, acks_late=True)
def process_payment(self, order_id, amount):
    """Process payment with exponential backoff retry."""
    try:
        # Simulate flaky payment gateway
        if random.random() < 0.3:  # 30% failure rate
            raise ConnectionError("Payment gateway timeout")
        print(f"Payment {order_id}: ${amount} processed successfully")
        return {"order_id": order_id, "status": "completed", "amount": amount}
    except ConnectionError as exc:
        # Exponential backoff: 30s, 60s, 120s, 240s, 480s
        countdown = 30 * (2 ** self.request.retries)
        print(f"Retry {self.request.retries + 1}/{self.max_retries} "
              f"for order {order_id} in {countdown}s")
        raise self.retry(exc=exc, countdown=countdown)
    except Exception as exc:
        # Non-retriable error: log and re-raise so the task is marked FAILURE
        print(f"Fatal error processing payment {order_id}: {exc}")
        # In production: send to dead-letter queue or alert
        raise

# Hard and soft time limits (in seconds)
@app.task(time_limit=30, soft_time_limit=25, acks_late=True)
def process_large_file(file_path):
    """Process a large file with timeout protection."""
    start = time.time()
    try:
        # Simulate processing
        time.sleep(10)
    except SoftTimeLimitExceeded:
        # Raised inside the task after 25s; clean up before the 30s hard limit
        print(f"Soft time limit hit while processing {file_path}")
        raise
    elapsed = time.time() - start
    print(f"Processed {file_path} in {elapsed:.2f}s")
    return f"Completed: {file_path}"
Common Errors
1. Forgetting to Start the Worker
Calling task.delay() without a running worker queues the task, but nothing executes it. The task stays in PENDING until a worker starts and consumes the message.
2. Using Pickle Serializer in Production
Before Celery 4.0 the default serializer was pickle, which can execute arbitrary code when a malicious message is deserialized. Set task_serializer='json' and accept_content=['json'] so that workers reject pickled messages.
3. Not Setting Task Time Limits
Tasks that hang due to network issues or infinite loops consume workers indefinitely. Always set soft_time_limit and time_limit.
4. Ignoring the Result Backend
Without a result backend (backend='redis://...'), result.get() raises NotImplementedError. Results are stored temporarily (by default they expire after one day, controlled by result_expires), so write important data to persistent storage from inside the task.
5. RabbitMQ Broker Without Queue Expiry
If you use RabbitMQ as a broker and don’t configure queue expiry, idle queues accumulate. Set x-expires on queues or use Redis for simpler setups.
6. Prefetch Count Misconfiguration
With a high worker_prefetch_multiplier (the default is 4), a worker can reserve many tasks at once and leave other workers idle, which hurts most when tasks are long-running. Set worker_prefetch_multiplier=1 for fairer distribution across workers.
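Several of the errors above come down to configuration. The sketch below collects the relevant settings in one plain dict. The setting names are Celery's lowercase configuration keys; the values are illustrative assumptions rather than recommendations, and `check_settings` is a hypothetical helper written for this example, not part of Celery.

```python
# Settings addressing errors 2, 3, 4, and 6 above. Values are examples only.
HARDENED_SETTINGS = {
    "task_serializer": "json",        # error 2: never send pickled tasks
    "result_serializer": "json",
    "accept_content": ["json"],       # error 2: reject non-JSON messages
    "task_soft_time_limit": 25,       # error 3: raise inside the task at 25s
    "task_time_limit": 30,            # error 3: kill the task at 30s
    "result_expires": 3600,           # error 4: keep results for one hour
    "worker_prefetch_multiplier": 1,  # error 6: reserve one task at a time
}


def check_settings(settings):
    """Return a list of human-readable problems found in the settings."""
    problems = []
    if "pickle" in settings.get("accept_content", []):
        problems.append("accept_content allows pickle")
    if settings["task_soft_time_limit"] >= settings["task_time_limit"]:
        problems.append("soft time limit must be below the hard time limit")
    if settings["worker_prefetch_multiplier"] < 1:
        problems.append("worker_prefetch_multiplier must be at least 1")
    return problems


print(check_settings(HARDENED_SETTINGS))  # []
```

With a real app these would be applied with app.conf.update(**HARDENED_SETTINGS), using the app object from tasks.py.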
Practice Questions
1. What is the difference between a Celery broker and a result backend?
The broker transports task messages from producers to workers (e.g., Redis, RabbitMQ). The result backend stores task results so they can be retrieved later with result.get().
2. How do you call a task asynchronously vs synchronously?
Async: task.delay(arg1, arg2) returns immediately with an AsyncResult. Sync: task(arg1, arg2) calls the function directly in the current process (no worker needed).
3. What does acks_late=True do?
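It tells the worker to acknowledge the task message after the task completes rather than just before it starts. If the worker crashes mid-task, the message is redelivered to another worker. This prevents task loss on worker failure, but the task may run more than once, so it should be idempotent.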
4. How do you schedule a task to run every weekday at 9 AM?
Use crontab(hour=9, minute=0, day_of_week='1-5') in your beat_schedule.
5. Challenge: Build an order processing pipeline with Celery that: (1) validates the order, (2) charges the payment gateway (with 3 retries), (3) sends a confirmation email, (4) updates inventory. Use chaining for sequential steps and error handling at each stage.
Implement with chain(validate_order.s(), charge_payment.s(), send_confirmation.s(), update_inventory.s()). Use bind=True on each task to access self.retry(). Add a fallback task for failed payments that notifies the support team.
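The control flow of that pipeline can be sketched without a broker, using plain functions. This is only an outline of the logic under stated assumptions: in a Celery version each function would be decorated with @app.task, the retry loop would become self.retry(countdown=...), and the steps would be joined with chain(...). The `FlakyGateway` class, the `retry_delays` field, and the 30-second base delay are hypothetical choices made for this illustration.

```python
def validate_order(order):
    """Step 1: reject obviously invalid orders before charging."""
    if order["amount"] <= 0:
        raise ValueError("amount must be positive")
    return order


def charge_payment(order, gateway, max_retries=3, base_delay=30):
    """Step 2: charge, retrying up to max_retries times on ConnectionError.

    Delays are recorded rather than slept so the sketch runs instantly.
    """
    delays = []
    for attempt in range(max_retries + 1):
        try:
            gateway(order)
            return {**order, "paid": True, "retry_delays": delays}
        except ConnectionError:
            if attempt == max_retries:
                raise  # retries exhausted: let a fallback handler take over
            delays.append(base_delay * 2 ** attempt)  # 30, 60, 120


def send_confirmation(order):
    """Step 3: record that the confirmation email was sent."""
    return {**order, "email_sent": True}


def update_inventory(order):
    """Step 4: record that stock was decremented."""
    return {**order, "inventory_updated": True}


def run_pipeline(order, gateway):
    """Each step receives the previous step's return value, as in a chain."""
    order = validate_order(order)
    order = charge_payment(order, gateway)
    order = send_confirmation(order)
    return update_inventory(order)


class FlakyGateway:
    """Test double that fails a fixed number of times, then succeeds."""

    def __init__(self, failures):
        self.failures = failures

    def __call__(self, order):
        if self.failures > 0:
            self.failures -= 1
            raise ConnectionError("gateway timeout")


result = run_pipeline({"order_id": 42, "amount": 19.99}, FlakyGateway(failures=2))
print(result["retry_delays"])  # [30, 60]
```

If the gateway fails on all four attempts, the ConnectionError propagates out of run_pipeline. That is the point at which the fallback task that notifies the support team would run.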
Mini Project: Task Queue Monitor
# monitor.py
# Simple Celery task monitor using Redis
import time
from datetime import datetime

import redis
from tasks import app

def monitor_queues():
    """Monitor Celery queue lengths in real time."""
    r = redis.Redis(host='localhost', port=6379, db=0)
    print("═" * 50)
    print("Celery Queue Monitor — Press Ctrl+C to exit")
    print("═" * 50)
    try:
        while True:
            # Length of the default 'celery' queue (a Redis list)
            celery_queue = r.llen('celery')
            # Get worker stats
            inspector = app.control.inspect()
            active = inspector.active() or {}
            reserved = inspector.reserved() or {}
            # ETA/countdown tasks are held by the workers, so ask them
            # instead of reading the 'celery' key as a sorted set
            scheduled_by_worker = inspector.scheduled() or {}
            worker_count = len(active)
            total_active = sum(len(tasks) for tasks in active.values())
            total_reserved = sum(len(tasks) for tasks in reserved.values())
            scheduled = sum(len(tasks) for tasks in scheduled_by_worker.values())
            print(f"\n[{datetime.now().strftime('%H:%M:%S')}] "
                  f"Workers: {worker_count} | "
                  f"Queued: {celery_queue} | "
                  f"Active: {total_active} | "
                  f"Reserved: {total_reserved} | "
                  f"Scheduled: {scheduled}")
            time.sleep(5)
    except KeyboardInterrupt:
        print("\nMonitoring stopped.")

if __name__ == '__main__':
    monitor_queues()
Expected output:
══════════════════════════════════════════════════
Celery Queue Monitor — Press Ctrl+C to exit
══════════════════════════════════════════════════
[10:00:00] Workers: 2 | Queued: 0 | Active: 3 | Reserved: 1 | Scheduled: 0
[10:00:05] Workers: 2 | Queued: 0 | Active: 2 | Reserved: 0 | Scheduled: 0
[10:00:10] Workers: 2 | Queued: 0 | Active: 0 | Reserved: 0 | Scheduled: 0
What’s Next
You now understand Celery fundamentals! Next, learn about RabbitMQ as a message broker to supercharge your task queues, then explore background job patterns for different async processing approaches.
- Practice daily — Convert a slow endpoint in your app to use Celery
- Build a project — Build an order processing system with chained tasks and retries
- Explore related topics — Check out task prioritization, routing, and canvas primitives (group, chord, map)
Built by the developers of Doda Browser, DodaZIP, and Durga Antivirus Pro. Updated 2026-06-20.