Celery: Distributed Task Queue for Python — Complete Guide

DodaTech Updated Jun 20, 2026 9 min read

Celery is a distributed task queue for Python that executes asynchronous tasks outside the HTTP request-response cycle, enabling background processing, scheduled jobs, and real-time operations at scale.

What You’ll Learn

By the end of this tutorial, you’ll understand Celery’s architecture — brokers, workers, result backends — and build production-grade async task pipelines with scheduling, monitoring, and error handling.

Why Celery Matters

Web applications must respond quickly. A request that sends emails, processes images, or calls third-party APIs blocks the user for seconds. Celery moves these tasks to background workers. Doda Browser uses Celery to process analytics events, generate thumbnails, and send notification emails without degrading page load times.

Celery Architecture


flowchart TB
    subgraph "Celery System"
        A[Web App<br/>Producer] --> B[Message Broker<br/>RabbitMQ/Redis]
        B --> C[Worker 1]
        B --> D[Worker 2]
        B --> E[Worker N]
        C --> F[Result Backend<br/>Redis/DB]
        D --> F
        E --> F
        G[Celery Beat<br/>Scheduler] --> B
        H[Flower<br/>Monitor] -.-> C
        H -.-> D
        H -.-> E
    end
    style B fill:#f90,color:#fff
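To make the roles in the diagram concrete, here is a toy, in-process model built only on Python's standard library: a queue.Queue plays the broker, threads play the workers, and a dict plays the result backend. It is a sketch for intuition, not how Celery is implemented; the task registry and the helper names (send_task, worker_loop) are hypothetical.

```python
# A standard-library toy model of the Celery flow:
# producer -> broker (queue.Queue) -> workers (threads) -> result backend (dict).
# It illustrates the roles only; it is not how Celery is implemented.
import queue
import threading
import uuid

broker = queue.Queue()          # stands in for Redis/RabbitMQ
result_backend = {}             # stands in for the result store
backend_lock = threading.Lock()

TASKS = {"add": lambda x, y: x + y}  # hypothetical task registry


def send_task(name, *args):
    """Producer side: enqueue a message and return its task ID immediately."""
    task_id = str(uuid.uuid4())
    broker.put({"id": task_id, "task": name, "args": args})
    return task_id


def worker_loop():
    """Worker side: consume messages until a None shutdown sentinel arrives."""
    while True:
        message = broker.get()
        if message is None:
            broker.task_done()
            break
        result = TASKS[message["task"]](*message["args"])
        with backend_lock:
            result_backend[message["id"]] = result
        broker.task_done()


workers = [threading.Thread(target=worker_loop) for _ in range(3)]
for w in workers:
    w.start()

ids = [send_task("add", i, i) for i in range(5)]
broker.join()  # block until every queued message has been processed

for _ in workers:
    broker.put(None)  # one shutdown sentinel per worker
for w in workers:
    w.join()

print([result_backend[i] for i in ids])  # [0, 2, 4, 6, 8]
```

The producer never waits for the work itself: send_task returns an ID at once, and the result is looked up later by that ID, which is the same contract delay() and AsyncResult give you in Celery.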

Installing Celery

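pip install "celery[redis]" flower

This installs Celery with its Redis transport dependencies and the Flower monitoring tool. The quotes stop shells such as zsh from treating the brackets as a glob pattern.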

Basic Celery Setup

# tasks.py
# Celery app configuration
from celery import Celery

app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0',
    include=['tasks'],
)

app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
)
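Restricting serialization to JSON matters because unpickling runs code chosen by whoever produced the message. The standard-library sketch below pickles an object whose __reduce__ tells the unpickler to call eval(); a harmless expression stands in for an attacker's command. The JSON message shape is simplified for illustration and is not Celery's actual wire format.

```python
import json
import pickle


class Payload:
    """Any pickled object can tell the unpickler which callable to invoke."""

    def __reduce__(self):
        # Harmless stand-in for an attacker's command: evaluate an expression.
        return (eval, ("6 * 7",))


blob = pickle.dumps(Payload())
print(pickle.loads(blob))  # 42, because eval() ran during deserialization

# JSON only describes data (objects, arrays, strings, numbers, booleans, null),
# so decoding a message never calls arbitrary functions.
message = json.dumps(
    {"task": "tasks.send_welcome_email", "args": ["user@example.com", "Alice"]}
)
print(json.loads(message)["args"])  # ['user@example.com', 'Alice']
```

The trade-off is that task arguments must be JSON-friendly, which is why passing IDs and plain values is preferable to passing whole objects.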

Defining Your First Task

# tasks.py (continued)
# A simple Celery task

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def send_welcome_email(self, user_email, username):
    """Send a welcome email to a new user."""
    try:
        # Simulate email sending
        print(f"Sending welcome email to {user_email}...")
        # import requests
        # response = requests.post(
        #     "https://api.sendgrid.com/v3/mail/send",
        #     json={"to": user_email, "template": "welcome"},
        #     headers={"Authorization": "Bearer SG_API_KEY"}
        # )
        # response.raise_for_status()
        print(f"Welcome email sent to {username} at {user_email}")
        return {"status": "sent", "email": user_email}
    except Exception as exc:
        print(f"Failed to send email: {exc}")
        raise self.retry(exc=exc)

@app.task
def process_image(image_path):
    """Generate thumbnails for an uploaded image."""
    import time
    time.sleep(2)  # Simulate processing
    print(f"Processed image: {image_path}")
    return {"image": image_path, "thumbnail": image_path.replace('.jpg', '_thumb.jpg')}

Running the Worker

celery -A tasks worker --loglevel=info

Expected output:

 -------------- celery@hostname v5.4.0 (opalescent)
--- ***** -----
-- ******* ---- Linux-6.8.0-x86_64-with-glibc2.35 2026-06-20 10:00:00
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x7f8a1c2b3d00
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost:6379/0
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . tasks.send_welcome_email
  . tasks.process_image

[2026-06-20 10:00:01: INFO/MainProcess] Connected to redis://localhost:6379/0
[2026-06-20 10:00:01: INFO/MainProcess] mingle: searching for neighbors
[2026-06-20 10:00:02: INFO/MainProcess] mingle: all alone
[2026-06-20 10:00:02: INFO/MainProcess] ready.

Calling Tasks

# call_tasks.py
# Demonstrates sending tasks to the worker and fetching their results
from celery.result import AsyncResult
from tasks import app, send_welcome_email, process_image

# Async call: returns an AsyncResult immediately
result = send_welcome_email.delay('user@example.com', 'Alice')
print(f"Task ID: {result.id}")
print(f"Task status: {result.status}")

# Wait for the result (blocks until the worker finishes or 10 s pass)
print(f"Result: {result.get(timeout=10)}")

# Fire off another task without waiting for it
image_result = process_image.delay('/uploads/photo.jpg')
print(f"Image task ID: {image_result.id}")

# Check status later from just the task ID, without blocking
status = AsyncResult(image_result.id, app=app)
# Possible states: PENDING, STARTED, RETRY, SUCCESS, FAILURE
# (STARTED is only reported when task_track_started=True)
print(f"Image task status: {status.status}")

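Expected output (task IDs will differ):

Task ID: 550e8400-e29b-41d4-a716-446655440000
Task status: PENDING
Result: {'status': 'sent', 'email': 'user@example.com'}
Image task ID: 660e8400-e29b-41d4-a716-446655440001
Image task status: PENDING

The print() calls inside the tasks ("Sending welcome email to user@example.com..." and "Welcome email sent to Alice at user@example.com") run in the worker process, so they appear in the worker's log, not in this script's output.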

Task Chaining

# tasks.py (continued)
# Small tasks for demonstrating chains and groups; defining them here
# registers them with the worker started by `celery -A tasks worker`

@app.task
def add(x, y):
    result = x + y
    print(f"{x} + {y} = {result}")
    return result

@app.task
def multiply(x, y):
    result = x * y
    print(f"{x} * {y} = {result}")
    return result

@app.task
def format_result(value):
    formatted = f"Final result: {value}"
    print(formatted)
    return formatted

# chaining.py
# Chain tasks so each one receives the previous result as its first argument
from celery import chain, group
from tasks import add, multiply, format_result

# Chain: add(2, 2) → multiply(4, 10) → format_result(40)
task_chain = chain(
    add.s(2, 2),
    multiply.s(10),
    format_result.s(),
)
result = task_chain()
print(f"Chain result: {result.get()}")

# Group: run tasks in parallel; get() returns results in signature order
parallel = group(
    add.s(1, 2),
    add.s(3, 4),
    add.s(5, 6),
)
group_result = parallel()
print(f"Group results: {group_result.get()}")

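Expected output from chaining.py:

Chain result: Final result: 40
Group results: [3, 7, 11]

The print() calls run inside the worker, so these lines appear in the worker's log instead (the three group tasks may finish in any order):

2 + 2 = 4
4 * 10 = 40
Final result: 40
1 + 2 = 3
3 + 4 = 7
5 + 6 = 11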
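The argument-passing rules can be checked without a broker. This plain-Python model is not Celery's implementation (sig, run_chain and run_group are hypothetical helpers): it prepends each result to the next signature's arguments, the way multiply.s(10) became multiply(4, 10) above, and runs group members concurrently while keeping results in signature order.

```python
# Plain-Python model of chain() and group() argument passing (no Celery needed).
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    return x + y


def multiply(x, y):
    return x * y


def format_result(value):
    return f"Final result: {value}"


def sig(func, *args):
    """Stand-in for func.s(*args): remember the function and its partial args."""
    return (func, args)


def run_chain(*signatures):
    """Call signatures in order, prepending each result to the next call's args."""
    func, args = signatures[0]
    result = func(*args)
    for func, args in signatures[1:]:
        result = func(result, *args)  # parent result goes first
    return result


def run_group(*signatures):
    """Run signatures concurrently; results come back in signature order."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(func, *args) for func, args in signatures]
        return [f.result() for f in futures]


print(run_chain(sig(add, 2, 2), sig(multiply, 10), sig(format_result)))
# Final result: 40
print(run_group(sig(add, 1, 2), sig(add, 3, 4), sig(add, 5, 6)))
# [3, 7, 11]
```

The ordering guarantee comes from collecting futures in submission order, not completion order, which mirrors why group_result.get() above returns [3, 7, 11] even when the worker finishes the tasks out of order.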

Celery Beat — Scheduled Tasks

# tasks.py (continued)
# Periodic tasks: beat sends these to the broker on schedule.
# Times are UTC because the app sets timezone='UTC'.
from celery.schedules import crontab

app.conf.beat_schedule = {
    'cleanup-expired-sessions': {
        'task': 'tasks.cleanup_sessions',
        'schedule': crontab(hour=3, minute=0),  # Daily at 03:00
    },
    'generate-daily-report': {
        'task': 'tasks.generate_report',
        'schedule': crontab(hour=8, minute=30, day_of_week='1-5'),  # Mon-Fri at 08:30
    },
    'health-check-every-5-min': {
        'task': 'tasks.health_check',
        'schedule': 300.0,  # Every 5 minutes (in seconds)
    },
}

@app.task
def cleanup_sessions():
    print("Cleaning up expired sessions...")
    # Database cleanup logic goes here
    return "Cleaned 150 expired sessions"

@app.task
def generate_report():
    # Placeholder: build and store the daily report
    return "Report generated"

@app.task
def health_check():
    # Placeholder: ping the database and external services
    return "OK"

# Start beat alongside a worker: celery -A tasks beat --loglevel=info

Expected output (from beat logs):

celery beat v5.4.0 (opalescent) is starting.
__    -    ... __   -        _
LocalTime -> 2026-06-20 03:00:00
Configuration ->
    . broker -> redis://localhost:6379/0
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 minutes (300s)

[2026-06-20 03:00:00: INFO/MainProcess] Scheduler: Sending due task cleanup-expired-sessions (tasks.cleanup_sessions)

Beat only enqueues the task; the completion line appears in the worker's log:

[2026-06-20 03:00:00: INFO/ForkPoolWorker-1] Task tasks.cleanup_sessions[<task-id>] succeeded in 0.01s: 'Cleaned 150 expired sessions'
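To reason about when a crontab entry fires next, here is a small standard-library model for entries with a single hour, a single minute, and a set of weekdays. It is a simplified sketch, not celery.schedules.crontab's implementation; next_run is a hypothetical helper. Note that Celery numbers day_of_week from 0 = Sunday, while Python's weekday() uses 0 = Monday.

```python
from datetime import datetime, timedelta


def next_run(now, hour, minute, days_of_week):
    """Return the first datetime strictly after `now` at hour:minute on one of
    `days_of_week` (Celery numbering: 0=Sunday ... 6=Saturday)."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)
    for _ in range(8):  # seven consecutive days cover every weekday
        celery_dow = (candidate.weekday() + 1) % 7  # convert from Python's 0=Monday
        if celery_dow in days_of_week:
            return candidate
        candidate += timedelta(days=1)
    raise ValueError("days_of_week must contain at least one day 0-6")


WEEKDAYS = {1, 2, 3, 4, 5}  # equivalent of day_of_week='1-5'

# Friday 2026-06-19 09:00 UTC: today's 08:30 has passed, so the next run is Monday.
print(next_run(datetime(2026, 6, 19, 9, 0), 8, 30, WEEKDAYS))
# 2026-06-22 08:30:00
```

Mixing up the two weekday conventions is an easy way to end up with a "weekday" job that runs on Saturday, so it is worth checking a schedule against a calendar like this before deploying it.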

Monitoring with Flower

# Start the Flower dashboard (the broker URL comes from the app in tasks.py)
celery -A tasks flower --port=5555

Expected output:

[I 2026-06-20 10:00:00] Starting Flower web server on http://0.0.0.0:5555
[I 2026-06-20 10:00:01] Visit me at http://localhost:5555

Flower provides a web dashboard at http://localhost:5555 showing worker status, task history, queue lengths, and the ability to revoke or retry tasks.

Error Handling and Retries

# tasks.py (continued)
# Retries with exponential backoff, plus hard and soft time limits
import random
import time

from celery.exceptions import SoftTimeLimitExceeded

@app.task(bind=True, max_retries=5, default_retry_delay=30, acks_late=True)
def process_payment(self, order_id, amount):
    """Process a payment, retrying with exponential backoff on connection errors."""
    try:
        # Simulate a flaky payment gateway (30% failure rate)
        if random.random() < 0.3:
            raise ConnectionError("Payment gateway timeout")

        print(f"Payment {order_id}: ${amount} processed successfully")
        return {"order_id": order_id, "status": "completed", "amount": amount}

    except ConnectionError as exc:
        # Exponential backoff: 30s, 60s, 120s, 240s, 480s
        countdown = 30 * (2 ** self.request.retries)
        print(f"Retry {self.request.retries + 1}/{self.max_retries} "
              f"for order {order_id} in {countdown}s")
        # Once max_retries is reached, retry() re-raises exc and the task fails
        raise self.retry(exc=exc, countdown=countdown)

    except Exception as exc:
        # Non-retriable error: log it and fail the task (the message is still acknowledged)
        print(f"Fatal error processing payment {order_id}: {exc}")
        # In production: send to a dead-letter queue or alert
        raise

# Hard and soft time limits
@app.task(time_limit=30, soft_time_limit=25, acks_late=True)
def process_large_file(file_path):
    """Process a large file with timeout protection."""
    start = time.time()
    try:
        time.sleep(10)  # Simulate processing
    except SoftTimeLimitExceeded:
        # Raised inside the task at 25 s; clean up before the hard kill at 30 s
        print(f"Soft time limit hit for {file_path}, cleaning up")
        raise
    elapsed = time.time() - start
    print(f"Processed {file_path} in {elapsed:.2f}s")
    return f"Completed: {file_path}"
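The countdown formula in process_payment doubles the delay on each retry. Below is a standalone sketch of the same schedule with a cap and optional "full jitter" (a random delay between 0 and the computed backoff), which spreads retries out when many tasks fail at the same moment. Celery also offers built-in options for this (autoretry_for together with retry_backoff, retry_backoff_max and retry_jitter); backoff_countdown is a hypothetical helper, not Celery's internal code, and the 600-second cap is an illustrative choice.

```python
import random


def backoff_countdown(retries, base=30, cap=600, jitter=False, rng=random):
    """Seconds to wait before retry number `retries` (0-based).

    Doubles from `base` on each attempt and never exceeds `cap`.
    With jitter=True, returns a random whole number of seconds in [0, delay].
    """
    delay = min(cap, base * (2 ** retries))
    if jitter:
        return rng.randint(0, delay)
    return delay


print([backoff_countdown(n) for n in range(5)])  # [30, 60, 120, 240, 480]
print([backoff_countdown(n) for n in range(7)])  # [30, 60, 120, 240, 480, 600, 600]
```

Inside the task it would replace the inline formula, e.g. raise self.retry(exc=exc, countdown=backoff_countdown(self.request.retries, jitter=True)).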

Common Errors

1. Forgetting to Start the Worker

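Calling task.delay() without a running worker still puts the message in the broker, but nothing executes it. The task reports PENDING until a worker starts and consumes it (PENDING is also what Celery reports for a task ID it knows nothing about).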

2. Using Pickle Serializer in Production

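Before Celery 4.0 the default serializer was pickle, and unpickling a message taken from the broker can execute arbitrary code. Keep task_serializer='json' and, just as important, accept_content=['json'] so workers refuse pickle messages instead of deserializing them.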

3. Not Setting Task Time Limits

Tasks that hang due to network issues or infinite loops occupy worker processes indefinitely. Set soft_time_limit and time_limit on individual tasks, or task_soft_time_limit and task_time_limit as app-wide defaults.

4. Ignoring the Result Backend

Without a result backend (backend='redis://...'), result.get() raises NotImplementedError. Stored results are also temporary: by default they expire after one day (result_expires), so keep important data in your own persistent storage.

5. RabbitMQ Queues Without Expiry

If you use RabbitMQ as the broker and never configure queue expiry, idle queues accumulate. Set the x-expires argument on queues that can go unused, or use Redis for simpler setups.

6. Prefetch Count Misconfiguration

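Each worker reserves up to worker_prefetch_multiplier × concurrency messages ahead of time (the multiplier defaults to 4). With long-running tasks, one worker can sit on reserved tasks it won't start for minutes while other workers are idle. Set worker_prefetch_multiplier=1, often together with acks_late, for fairer distribution across workers.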
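The serializer, time-limit and prefetch advice above can also be applied as app-wide defaults. A sketch of such a configuration; the values are illustrative and should be tuned for your workload, and per-task options such as time_limit still override them.

```python
# Worker-wide defaults addressing the common errors above (illustrative values).
from celery import Celery

# Creating the app object does not connect to the broker yet.
app = Celery('tasks', broker='redis://localhost:6379/0')

app.conf.update(
    task_serializer='json',
    accept_content=['json'],       # workers reject pickle and other non-JSON messages
    result_serializer='json',
    task_soft_time_limit=25,       # seconds: SoftTimeLimitExceeded is raised in the task
    task_time_limit=30,            # seconds: the pool process is killed after this
    worker_prefetch_multiplier=1,  # reserve one message per pool process at a time
    task_acks_late=True,           # acknowledge only after the task finishes
)
```

Keeping the soft limit a few seconds below the hard limit leaves the task time to clean up before the worker kills it.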

Practice Questions

1. What is the difference between a Celery broker and a result backend?

The broker transports task messages from producers to workers (e.g., Redis, RabbitMQ). The result backend stores task results so they can be retrieved later with result.get().

2. How do you call a task asynchronously vs synchronously?

Async: task.delay(arg1, arg2) returns immediately with an AsyncResult. Sync: task(arg1, arg2) calls the function directly in the current process (no worker needed).

3. What does acks_late=True do?

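It tells the worker to acknowledge the task message only after the task finishes, instead of just before it starts. If the worker dies mid-task, the unacknowledged message is redelivered, possibly to another worker. This prevents task loss on worker failure, but it also means the task may run twice.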

4. How do you schedule a task to run every weekday at 9 AM?

Use crontab(hour=9, minute=0, day_of_week='1-5') in your beat_schedule.

5. Challenge: Build an order processing pipeline with Celery that: (1) validates the order, (2) charges the payment gateway (with 3 retries), (3) sends a confirmation email, (4) updates inventory. Use chaining for sequential steps and error handling at each stage.

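Implement with chain(validate_order.s(), charge_payment.s(), send_confirmation.s(), update_inventory.s()) and start it with the order as the first task's argument. Use bind=True on the tasks that need self.retry() (at least charge_payment, with max_retries=3). Attach a fallback task that notifies the support team as an error callback, for example charge_payment.s().on_error(notify_support.s()).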

Mini Project: Task Queue Monitor

# monitor.py
# Simple Celery queue monitor for the Redis broker
import time
from datetime import datetime

import redis

from tasks import app

def monitor_queues():
    """Monitor Celery queue lengths in real time."""
    r = redis.Redis(host='localhost', port=6379, db=0)

    print("═" * 50)
    print("Celery Queue Monitor — Press Ctrl+C to exit")
    print("═" * 50)
    print()

    try:
        while True:
            # The default 'celery' queue is a Redis list of waiting messages
            queued = r.llen('celery')

            # Ask running workers what they hold (waits about 1 s for replies)
            inspector = app.control.inspect()
            active = inspector.active() or {}
            reserved = inspector.reserved() or {}
            scheduled = inspector.scheduled() or {}  # tasks held with an ETA/countdown
            worker_count = len(active)  # workers that replied

            total_active = sum(len(task_list) for task_list in active.values())
            total_reserved = sum(len(task_list) for task_list in reserved.values())
            total_scheduled = sum(len(task_list) for task_list in scheduled.values())

            print(f"[{datetime.now().strftime('%H:%M:%S')}] "
                  f"Workers: {worker_count} | "
                  f"Queued: {queued} | "
                  f"Active: {total_active} | "
                  f"Reserved: {total_reserved} | "
                  f"Scheduled: {total_scheduled}")

            time.sleep(5)
    except KeyboardInterrupt:
        print("\nMonitoring stopped.")

if __name__ == '__main__':
    monitor_queues()

Expected output:

══════════════════════════════════════════════════
Celery Queue Monitor — Press Ctrl+C to exit
══════════════════════════════════════════════════

[10:00:00] Workers: 2 | Queued: 0 | Active: 3 | Reserved: 1 | Scheduled: 0
[10:00:05] Workers: 2 | Queued: 0 | Active: 2 | Reserved: 0 | Scheduled: 0
[10:00:10] Workers: 2 | Queued: 0 | Active: 0 | Reserved: 0 | Scheduled: 0

FAQ

What is the difference between Celery and RQ?
RQ (Redis Queue) is simpler and Redis-only. Celery supports multiple brokers (RabbitMQ, Redis, SQS), advanced routing, periodic scheduling with beat, and more monitoring options. RQ suits projects that only need straightforward background jobs on Redis; Celery suits systems that need several brokers, routing, or built-in scheduling.
Can Celery run tasks in a specific order?
Yes. Use chains (chain(task1.s(), task2.s())) for sequential execution, groups (group(task1.s(), task2.s())) for parallel execution, and chords (chord(group(...), callback.s())) to run a callback once every task in a group has finished. Chords require a result backend.
How do I monitor Celery in production?
Use Flower for real-time monitoring, enable task events on workers (celery -A tasks worker -E), and integrate with Prometheus/Grafana via celery-exporter. Log task results to your logging system.
Does Celery guarantee exactly-once execution?
No. With acks_late=True, Celery gives at-least-once delivery, so a task can occasionally run more than once. Combine it with idempotent task design and a deduplication mechanism to get effectively-once processing.
What happens if the worker crashes mid-task?
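With acks_late=True, a message the worker never acknowledged is redelivered by the broker, possibly to another worker. Without it, the message is acknowledged when the task starts, so a crash mid-task loses the task. One caveat: if only a pool process is killed (for example by the OOM killer), the message is still acknowledged unless task_reject_on_worker_lost=True. Use acks_late=True for critical tasks, and make those tasks idempotent.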
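Because acks_late gives at-least-once delivery, a critical task must tolerate running twice. A minimal sketch of idempotent task logic keyed on the order ID, assuming an in-memory set stands in for a shared store; charge_once is a hypothetical function. In production the "check and record" step must be atomic across workers, for example Redis SET with the NX option.

```python
# Idempotent task logic for at-least-once delivery (sketch).
# Assumption: `processed` stands in for a shared store such as Redis;
# a real store needs an atomic "set if absent" instead of check-then-add.
processed = set()
charges = []  # records the side effect we must not repeat


def charge_once(order_id, amount):
    """Charge an order at most once, even if the message is delivered twice."""
    key = f"charge:{order_id}"  # idempotency key derived from the order
    if key in processed:
        return {"order_id": order_id, "status": "duplicate-skipped"}
    charges.append((order_id, amount))
    processed.add(key)
    return {"order_id": order_id, "status": "charged"}


# Simulate redelivery after a worker crash: the same message arrives twice.
print(charge_once("A-100", 25.0)["status"])  # charged
print(charge_once("A-100", 25.0)["status"])  # duplicate-skipped
print(len(charges))                          # 1
```

Deriving the key from the business identifier (the order), rather than from the Celery task ID, also catches duplicates caused by the producer sending the same order twice.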

What’s Next

You now understand Celery fundamentals! Next, learn about RabbitMQ as a message broker to supercharge your task queues, then explore background job patterns for different async processing approaches.

  • Practice daily — Convert a slow endpoint in your app to use Celery
  • Build a project — Build an order processing system with chained tasks and retries
  • Explore related topics — Check out task prioritization, routing, and canvas primitives (group, chord, map)

Built by the developers of Doda Browser, DodaZIP, and Durga Antivirus Pro. Updated 2026-06-20.
