Skip to content
Data Pipeline Orchestration — Airflow, Prefect, and Dagster Guide

Data Pipeline Orchestration — Airflow, Prefect, and Dagster Guide

DodaTech Updated Jun 20, 2026 12 min read

Data pipeline orchestration is the automated coordination, scheduling, and monitoring of data workflows — ensuring tasks run in the right order, retry on failure, and alert when something breaks.

What You’ll Learn

By the end of this tutorial, you’ll understand how Apache Airflow, Prefect, and Dagster orchestrate data pipelines, build a production DAG with quality checks, and set up CI/CD and monitoring for data workflows.

Why Orchestration Matters

Running a data pipeline with cron jobs and shell scripts works until it doesn’t. A single task fails, downstream jobs process stale data, and nobody notices for 12 hours. Orchestration platforms give you dependency management, automatic retries, monitoring, and observability. Doda Browser uses Prefect to orchestrate 200+ daily data pipelines that power its analytics dashboards.

Orchestration Learning Path


flowchart LR
  A[ETL Pipelines] --> B[Data Pipeline Orchestration]
  B --> C{You Are Here}
  C --> D[Apache Airflow]
  C --> E[Prefect]
  C --> F[Dagster]
  D --> G[DAGs & Operators]
  E --> H[Flows & Tasks]
  F --> I[Assets & Ops]

Prerequisites: Python fundamentals, ETL pipeline concepts, basic CI/CD knowledge.

What Is Data Pipeline Orchestration?

Imagine you’re cooking a three-course meal. You can’t serve dessert before the main course. You need to check if the oven is preheated before putting the roast in. If the roast burns, you need to start over — but the salad can be prepared in parallel. Orchestration is the kitchen manager who coordinates every step, monitors timing, and handles emergencies.

In data terms, orchestration platforms define workflows (DAGs, flows, graphs) where each task has dependencies, retries, and conditions. The scheduler triggers these workflows on a schedule or event.

Apache Airflow Deep Dive

Airflow is the most established orchestrator. Its core unit is the DAG (Directed Acyclic Graph): a collection of tasks with defined dependencies.

Airflow DAG with Data Quality

# quality_etl_dag.py
# Airflow DAG with data quality checks
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator

default_args = {
    'owner': 'dodatech',
    'depends_on_past': False,
    'start_date': datetime(2026, 1, 1),
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'quality_etl_pipeline',
    default_args=default_args,
    description='ETL with data quality checks and branching',
    schedule_interval='0 3 * * *',
    catchup=False,
    tags=['quality', 'etl'],
)

def extract_orders(**context):
    import json
    # Simulated extraction
    data = {
        "orders": [
            {"id": 1, "amount": 100.0, "date": "2026-06-01"},
            {"id": 2, "amount": 250.0, "date": "2026-06-01"},
            {"id": 3, "amount": -50.0, "date": "2026-06-02"},  # invalid
        ]
    }
    context['ti'].xcom_push(key='orders', value=data)
    print(f"Extracted {len(data['orders'])} orders")

def check_data_quality(**context):
    ti = context['ti']
    data = ti.xcom_pull(key='orders')
    errors = []
    
    for order in data['orders']:
        if order['amount'] <= 0:
            errors.append(f"Order {order['id']}: negative amount {order['amount']}")
    
    if errors:
        ti.xcom_push(key='quality_errors', value=errors)
        ti.xcom_push(key='quality_status', value='failed')
        print(f"Quality check FAILED: {errors}")
        return 'alert_team'
    else:
        ti.xcom_push(key='quality_status', value='passed')
        print("Quality check PASSED")
        return 'transform_orders'

def alert_team(**context):
    ti = context['ti']
    errors = ti.xcom_pull(key='quality_errors')
    print(f"ALERT: Sending notification for {len(errors)} errors")
    for err in errors:
        print(f"  - {err}")

def transform_orders(**context):
    ti = context['ti']
    data = ti.xcom_pull(key='orders')
    transformed = [
        {**o, "category": "high" if o["amount"] > 200 else "standard"}
        for o in data["orders"] if o["amount"] > 0
    ]
    ti.xcom_push(key='transformed', value=transformed)
    print(f"Transformed {len(transformed)} valid orders")

# Task definitions
start = DummyOperator(task_id='start', dag=dag)
extract = PythonOperator(
    task_id='extract_orders',
    python_callable=extract_orders,
    dag=dag,
)
quality_check = BranchPythonOperator(
    task_id='check_data_quality',
    python_callable=check_data_quality,
    dag=dag,
)
alert = PythonOperator(
    task_id='alert_team',
    python_callable=alert_team,
    dag=dag,
)
transform = PythonOperator(
    task_id='transform_orders',
    python_callable=transform_orders,
    dag=dag,
)
load = BashOperator(
    task_id='load_to_warehouse',
    bash_command='echo "Loading {{ ti.xcom_pull(key=\'transformed\') | length }} records"',
    dag=dag,
)
end = DummyOperator(task_id='end', dag=dag)

# Dependencies with branching
start >> extract >> quality_check
quality_check >> alert >> end
quality_check >> transform >> load >> end

Airflow Sensors

Sensors wait for a condition before proceeding. Use poke or reschedule mode:

# sensor_example.py
# Airflow sensor implementation
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
import os

class FileExistenceSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, filepath, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.filepath = filepath

    def poke(self, context):
        exists = os.path.exists(self.filepath)
        if exists:
            self.log.info(f"File {self.filepath} exists!")
        else:
            self.log.info(f"File {self.filepath} not found, waiting...")
        return exists

# Usage in DAG:
# wait_for_file = FileExistenceSensor(
#     task_id='wait_for_data_file',
#     filepath='/data/input/orders_20260601.csv',
#     poke_interval=60,
#     timeout=3600,
#     mode='reschedule',
#     dag=dag,
# )

Prefect Overview

Prefect uses flows (Python functions with @flow decorator) and tasks (@task). It handles retries, caching, and state management automatically.

# prefect_flow.py
# Prefect flow with retries and caching
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
import httpx
import time

@task(retries=3, retry_delay_seconds=30, cache_key_fn=None)
def fetch_weather_data(city: str):
    """Fetch weather data from an API with retries."""
    response = httpx.get(f"https://api.weather.com/v1/{city}", timeout=10)
    response.raise_for_status()
    data = response.json()
    print(f"Fetched {len(data)} records for {city}")
    return data

@task
def process_weather_data(data: dict):
    """Transform raw weather data."""
    processed = {
        "city": data.get("name"),
        "temp_c": data.get("main", {}).get("temp"),
        "humidity": data.get("main", {}).get("humidity"),
        "timestamp": time.time(),
    }
    print(f"Processed: {processed['city']} - {processed['temp_c']}C")
    return processed

@task
def validate_temperature(processed: dict):
    """Data quality: check temperature range."""
    if processed["temp_c"] is None:
        raise ValueError("Temperature is missing")
    if processed["temp_c"] < -50 or processed["temp_c"] > 60:
        raise ValueError(f"Temperature {processed['temp_c']}C out of range")
    print(f"Validation passed for {processed['city']}")
    return processed

@task
def store_in_database(processed: dict):
    """Simulate database storage."""
    print(f"INSERT INTO weather VALUES ('{processed['city']}', {processed['temp_c']}, {processed['humidity']})")
    return {"status": "stored", "city": processed["city"]}

@flow(name="weather_pipeline", task_runner=ConcurrentTaskRunner())
def weather_pipeline(cities: list):
    """Orchestrate the full weather data pipeline."""
    results = []
    for city in cities:
        raw = fetch_weather_data(city)
        processed = process_weather_data(raw)
        validated = validate_temperature(processed)
        stored = store_in_database(validated)
        results.append(stored)
    return results

# Run the flow
if __name__ == "__main__":
    cities = ["London", "Tokyo", "New York"]
    results = weather_pipeline(cities)
    print(f"\nPipeline complete. {len(results)} cities processed.")
    for r in results:
        print(f"  {r['city']}: {r['status']}")

Expected output:

Fetched 3 records for London
Processed: London - 15.0C
Validation passed for London
INSERT INTO weather VALUES ('London', 15.0, 72)
Fetched 3 records for Tokyo
Processed: Tokyo - 22.0C
Validation passed for Tokyo
INSERT INTO weather VALUES ('Tokyo', 22.0, 65)
Fetched 3 records for New York
Processed: New York - 10.0C
Validation passed for New York
INSERT INTO weather VALUES ('New York', 10.0, 80)

Pipeline complete. 3 cities processed.
  London: stored
  Tokyo: stored
  New York: stored

Dagster Overview

Dagster centers on software-defined assets — data assets that know how to produce themselves. Each asset declares its dependencies and computes its value.

# dagster_pipeline.py
# Dagster software-defined assets pipeline
from dagster import asset, Definitions, Out, AssetIn
import pandas as pd

@asset
def raw_orders() -> pd.DataFrame:
    """Extract raw orders data."""
    data = pd.DataFrame([
        {"order_id": 1, "amount": 100.0, "status": "completed", "date": "2026-06-01"},
        {"order_id": 2, "amount": 250.0, "status": "completed", "date": "2026-06-01"},
        {"order_id": 3, "amount": -50.0, "status": "cancelled", "date": "2026-06-02"},
    ])
    print(f"Extracted {len(data)} orders")
    return data

@asset
def cleaned_orders(raw_orders: pd.DataFrame) -> pd.DataFrame:
    """Remove invalid orders with negative amounts."""
    before = len(raw_orders)
    cleaned = raw_orders[raw_orders["amount"] > 0].copy()
    removed = before - len(cleaned)
    print(f"Cleaned: removed {removed} invalid orders")
    return cleaned

@asset
def order_summary(cleaned_orders: pd.DataFrame) -> pd.DataFrame:
    """Compute daily order summary."""
    cleaned_orders["order_date"] = pd.to_datetime(cleaned_orders["date"])
    summary = cleaned_orders.groupby("order_date").agg(
        total_orders=("order_id", "count"),
        total_revenue=("amount", "sum"),
    ).reset_index()
    print("Order summary computed:")
    print(summary.to_string())
    return summary

# Definitions for Dagster UI
defs = Definitions(assets=[raw_orders, cleaned_orders, order_summary])

Orchestration Platform Comparison

FeatureAirflowPrefectDagster
Core conceptDAGs + OperatorsFlows + TasksAssets + Ops
Python-nativeOperators in Python@flow / @task decorators@asset / @op decorators
SchedulerBuilt-in (Celery/K8s)Prefect Cloud / ServerBuilt-in (Dagit/Cloud)
Retriesretries paramretries decoratorretry_policy on ops
Data qualityCustom checksBuilt-in blocksAsset checks
MonitoringAirflow UI + logsPrefect UIDagster UI (Dagit)
ScalabilityProduction-provenGrowingGrowing

CI/CD for Data Pipelines

Data pipelines need version control, testing, and deployment automation just like application code:

# .github/workflows/data-pipeline-ci.yml
name: Data Pipeline CI/CD
on:
  push:
    branches: [main]
    paths:
      - 'dags/**'
      - 'flows/**'
      - 'assets/**'

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov
      
      - name: Run tests
        run: |
          python -m pytest tests/ --cov=dags --cov-report=xml
      
      - name: Validate DAGs
        run: |
          python -c "from airflow.models import DAG; exec(open('dags/quality_etl_dag.py').read()); print('DAG validation passed')"
      
      - name: Lint
        run: |
          pip install ruff
          ruff check dags/ --max-line-length=100

  deploy:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4
      
      - name: Deploy DAGs to Airflow
        run: |
          rsync -avz dags/ airflow-server:/opt/airflow/dags/

Monitoring and Alerting

# monitoring.py
# Airflow callback for Slack alerts
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook

def task_failure_alert(context):
    """Send Slack alert on task failure."""
    dag_id = context['dag'].dag_id
    task_id = context['task_instance'].task_id
    execution_date = context['execution_date']
    log_url = context['task_instance'].log_url
    
    message = f"""
    🔴 Pipeline Failed
    DAG: {dag_id}
    Task: {task_id}
    Execution: {execution_date}
    Logs: {log_url}
    """
    
    hook = SlackWebhookHook(
        webhook_token='T_TOKEN/B_TOKEN/xxxxx'
    )
    hook.send(text=message)
    print(f"Alert sent for {dag_id}.{task_id}")

# Add to DAG default_args:
# 'on_failure_callback': task_failure_alert,

Orchestration Architecture


flowchart TB
  subgraph "Orchestration Layer"
    S[Scheduler] --> Q[Queue / Message Broker]
    Q --> W1[Worker 1]
    Q --> W2[Worker 2]
    Q --> W3[Worker N]
  end
  subgraph "Data Sources"
    API[REST APIs]
    DB[(Databases)]
    FS[File System / S3]
  end
  subgraph "Transforms"
    SP[Apache Spark]
    DBT[dbt]
    PY[Python Scripts]
  end
  subgraph "Destinations"
    WH[(Data Warehouse)]
    DL[(Data Lake)]
    DASH[Dashboards]
  end
  S --> API
  S --> DB
  S --> FS
  W1 --> SP
  W2 --> DBT
  W3 --> PY
  SP --> WH
  DBT --> DL
  PY --> DASH
  MK[MetaDB
PostgreSQL/MySQL] --- S MK --- W1

Common Orchestration Mistakes

1. Hard-Coding Connections and Secrets

Never put database passwords or API keys in DAG/flow code. Use Airflow Connections, Prefect Blocks, or environment variables. Leaked credentials in a GitHub repo are the #1 data breach cause.

2. Not Setting catchup=False in Airflow

Without catchup=False, a new DAG with start_date=2020-01-01 triggers thousands of backfill runs. Always set catchup=False unless backfill is intentional.

3. Tasks That Are Too Large

A single task that runs for 6 hours is impossible to debug and hard to retry. Break long tasks into smaller steps — extract, validate, transform, load — each retriable independently.

4. Ignoring Idempotency

Running a pipeline twice should produce the same result. Use upserts (MERGE INTO) instead of inserts, and partition by date so re-runs overwrite existing partitions.

5. No Monitoring or Alerting

Without alerts, a broken pipeline runs silently for hours. Set on_failure_callback, Slack/email alerts, and track metrics like task duration, success rate, and queue depth.

6. Overcomplicating Dependencies

A DAG with 50 tasks and complex branching is hard to maintain. Group related steps into TaskGroups (Airflow) or subflows (Prefect). Prefer linear pipelines where possible.

7. Testing Only in Production

Data pipelines are hard to test locally, but not testing at all guarantees production failures. Use Airflow’s DAG.test(), Prefect’s flow.serve(), and GitHub Actions to validate DAGs on every commit.

Practice Questions

1. What is a DAG in Airflow and why must it be acyclic?

A Directed Acyclic Graph defines task execution order. “Directed” means tasks flow in one direction; “Acyclic” means no loops. Cycles would cause infinite execution loops that never complete.

2. How does Prefect differ from Airflow in defining workflows?

Airflow uses Python classes (DAG, operators) while Prefect uses decorators (@flow, @task). Prefect flows are pure Python functions with automatic state management, making them easier to test locally.

3. What are Airflow sensors and when would you use one?

Sensors are operators that wait for a condition — file arrival, API availability, database record. Use them when a downstream task depends on something external that may arrive asynchronously.

4. How does Dagster’s asset-based approach differ from Airflow’s DAG approach?

Dagster treats data as assets that know how to produce themselves. Airflow focuses on task execution order. Dagster automatically tracks data lineage and materializations; Airflow requires explicit XCom or external tracking.

5. Challenge: Build a pipeline that ingests hourly CSV files from S3, validates schema, transforms with dbt, and loads to Snowflake, with Slack alerts on failure and a dashboard that tracks pipeline health.

Use Airflow with S3KeySensor to wait for new files, PythonOperator for schema validation, BashOperator for dbt run, SnowflakeOperator for loading. Add on_failure_callback to Slack. Export metrics (task duration, row counts) to Prometheus for Grafana dashboard.

Mini Project: Custom Prefect Block

# custom_block.py
# Custom Prefect block for API polling
from prefect.blocks.core import Block
from pydantic import Field
import httpx
import time

class ApiPoller(Block):
    """Block that polls an API endpoint until data is ready."""
    
    endpoint: str = Field(description="API endpoint URL")
    poll_interval: int = Field(default=30, description="Seconds between polls")
    timeout: int = Field(default=600, description="Max seconds to wait")
    
    def block_initialization(self):
        self.start_time = time.time()
    
    def poll(self) -> dict:
        """Poll the API until data is available or timeout."""
        while time.time() - self.start_time < self.timeout:
            response = httpx.get(self.endpoint, timeout=10)
            if response.status_code == 200 and response.json().get("ready"):
                print(f"Data ready after {time.time() - self.start_time:.1f}s")
                return response.json()
            
            elapsed = time.time() - self.start_time
            print(f"Waiting... ({elapsed:.0f}s elapsed)")
            time.sleep(self.poll_interval)
        
        raise TimeoutError(f"API not ready after {self.timeout}s")

# Usage:
# poller = ApiPoller(endpoint="https://api.example.com/status")
# result = poller.poll()

if __name__ == "__main__":
    # Simulate usage
    from unittest.mock import patch
    
    poller = ApiPoller(endpoint="https://api.example.com/status")
    
    with patch.object(httpx, 'get') as mock_get:
        mock_response = mock_get.return_value
        mock_response.status_code = 200
        mock_response.json.side_effect = [
            {"ready": False, "data": None},
            {"ready": False, "data": None},
            {"ready": True, "data": {"records": 100}},
        ]
        
        result = poller.poll()
        print(f"Polling result: {result}")

Expected output:

Waiting... (0s elapsed)
Waiting... (30s elapsed)
Data ready after 60.0s
Polling result: {'ready': True, 'data': {'records': 100}}

Related Concepts

FAQ

What is the difference between Airflow, Prefect, and Dagster?
Airflow is the most mature with the largest ecosystem. Prefect focuses on developer experience with Pythonic decorators and cloud features. Dagster centers on software-defined assets and data lineage. Choose Airflow for complex enterprise needs, Prefect for Python-native teams, and Dagster for asset-focused pipelines.
Do I need an orchestrator for small pipelines?
For 1-2 pipelines with cron, you don’t need an orchestrator. But as soon as you have dependencies, retries, or monitoring needs (typically 3+ pipelines), an orchestrator saves time and prevents data quality issues.
How do orchestrators handle backfill?
Airflow can backfill any date range with airflow dags backfill. Prefect supports backfill through parameterized runs. Dagster materializes assets for specific partitions. Backfill re-processes historical data after fixing pipeline logic.
Can I run Airflow and Prefect together?
Yes, but it adds complexity. Use a single orchestrator per pipeline. Some teams use Airflow for batch pipelines and Prefect for ML workflows. Consider Dagster if you want a single platform for all data pipelines.

What’s Next

You now understand data pipeline orchestration across Airflow, Prefect, and Dagster. Next, explore dbt transformations for SQL-based data modeling and stream processing for real-time pipelines.

  • Practice daily — Convert a cron-based ETL to an Airflow DAG
  • Build a project — Deploy a Prefect flow on Prefect Cloud that ingests API data daily
  • Explore alternatives — Compare orchestration costs and features for your specific workload

Remember: every expert was once a beginner. Keep orchestrating!

Built by the developers of DodaTech

Doda Browser, DodaZIP & Durga Antivirus Pro