Data Pipeline Orchestration — Airflow, Prefect, and Dagster Guide
Data pipeline orchestration is the automated coordination, scheduling, and monitoring of data workflows — ensuring tasks run in the right order, retry on failure, and alert when something breaks.
What You’ll Learn
By the end of this tutorial, you’ll understand how Apache Airflow, Prefect, and Dagster orchestrate data pipelines, and you’ll be able to build a production DAG with quality checks and set up CI/CD and monitoring for data workflows.
Why Orchestration Matters
Running a data pipeline with cron jobs and shell scripts works until it doesn’t. A single task fails, downstream jobs process stale data, and nobody notices for 12 hours. Orchestration platforms give you dependency management, automatic retries, monitoring, and observability. Doda Browser uses Prefect to orchestrate 200+ daily data pipelines that power its analytics dashboards.
Orchestration Learning Path
```mermaid
flowchart LR
    A[ETL Pipelines] --> B[Data Pipeline Orchestration]
    B --> C{You Are Here}
    C --> D[Apache Airflow]
    C --> E[Prefect]
    C --> F[Dagster]
    D --> G[DAGs & Operators]
    E --> H[Flows & Tasks]
    F --> I[Assets & Ops]
```
What Is Data Pipeline Orchestration?
Imagine you’re cooking a three-course meal. You can’t serve dessert before the main course. You need to check if the oven is preheated before putting the roast in. If the roast burns, you need to start over — but the salad can be prepared in parallel. Orchestration is the kitchen manager who coordinates every step, monitors timing, and handles emergencies.
In data terms, orchestration platforms define workflows (DAGs, flows, graphs) where each task has dependencies, retries, and conditions. The scheduler triggers these workflows on a schedule or event.
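To make that concrete, here is a minimal sketch of the two core jobs an orchestrator performs: running tasks in dependency order and retrying failures. It uses only Python’s standard library. It is not how Airflow, Prefect, or Dagster work internally; run_pipeline, the task names, and the state labels are hypothetical.

```python
from graphlib import TopologicalSorter
from typing import Callable


def run_pipeline(
    tasks: dict[str, Callable[[], None]],
    deps: dict[str, set[str]],
    retries: int = 2,
) -> dict[str, str]:
    """Run every task after its upstream tasks, retrying failures.

    Returns a final state per task: "success", "failed", or "upstream_failed".
    """
    graph = {name: deps.get(name, set()) for name in tasks}
    states: dict[str, str] = {}
    # static_order() yields each task only after all of its dependencies
    for name in TopologicalSorter(graph).static_order():
        if any(states[up] != "success" for up in graph[name]):
            states[name] = "upstream_failed"  # never run on missing inputs
            continue
        for attempt in range(1, retries + 2):  # first try + `retries` retries
            try:
                tasks[name]()
                states[name] = "success"
                break
            except Exception as exc:  # a real scheduler would also log and alert
                print(f"{name}: attempt {attempt} failed ({exc})")
                states[name] = "failed"
    return states


# Usage: extract fails once, succeeds on retry, then downstream tasks run
calls = []


def flaky_extract() -> None:
    calls.append("extract")
    if len(calls) == 1:
        raise ConnectionError("source timed out")


states = run_pipeline(
    tasks={"extract": flaky_extract, "transform": lambda: None, "load": lambda: None},
    deps={"transform": {"extract"}, "load": {"transform"}},
)
print(states)  # {'extract': 'success', 'transform': 'success', 'load': 'success'}
```

Downstream tasks are marked upstream_failed instead of being run. This mirrors the choice real orchestrators make: a transform should never run on data that was never extracted.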
Apache Airflow Deep Dive
Airflow is the most established orchestrator. Its core unit is the DAG (Directed Acyclic Graph): a collection of tasks with defined dependencies.
Airflow DAG with Data Quality
```python
# quality_etl_dag.py
# Airflow DAG with data quality checks and branching (Airflow 2.4+ syntax)
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator  # replaces the deprecated DummyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator

default_args = {
    'owner': 'dodatech',
    'depends_on_past': False,
    'start_date': datetime(2026, 1, 1),
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'quality_etl_pipeline',
    default_args=default_args,
    description='ETL with data quality checks and branching',
    schedule='0 3 * * *',  # daily at 03:00; `schedule_interval` is deprecated
    catchup=False,
    tags=['quality', 'etl'],
)


def extract_orders(**context):
    # Simulated extraction; order 3 is deliberately invalid, so this sample
    # data always sends the run down the alert branch
    data = {
        "orders": [
            {"id": 1, "amount": 100.0, "date": "2026-06-01"},
            {"id": 2, "amount": 250.0, "date": "2026-06-01"},
            {"id": 3, "amount": -50.0, "date": "2026-06-02"},  # invalid
        ]
    }
    context['ti'].xcom_push(key='orders', value=data)
    print(f"Extracted {len(data['orders'])} orders")


def check_data_quality(**context):
    ti = context['ti']
    data = ti.xcom_pull(task_ids='extract_orders', key='orders')
    errors = [
        f"Order {order['id']}: non-positive amount {order['amount']}"
        for order in data['orders']
        if order['amount'] <= 0
    ]
    if errors:
        ti.xcom_push(key='quality_errors', value=errors)
        ti.xcom_push(key='quality_status', value='failed')
        print(f"Quality check FAILED: {errors}")
        return 'alert_team'  # task_id of the branch to run; the other is skipped
    ti.xcom_push(key='quality_status', value='passed')
    print("Quality check PASSED")
    return 'transform_orders'


def alert_team(**context):
    errors = context['ti'].xcom_pull(task_ids='check_data_quality', key='quality_errors')
    print(f"ALERT: Sending notification for {len(errors)} errors")
    for err in errors:
        print(f" - {err}")


def transform_orders(**context):
    ti = context['ti']
    data = ti.xcom_pull(task_ids='extract_orders', key='orders')
    transformed = [
        {**o, "category": "high" if o["amount"] > 200 else "standard"}
        for o in data["orders"]
        if o["amount"] > 0
    ]
    ti.xcom_push(key='transformed', value=transformed)
    print(f"Transformed {len(transformed)} valid orders")


# Task definitions
start = EmptyOperator(task_id='start', dag=dag)
extract = PythonOperator(task_id='extract_orders', python_callable=extract_orders, dag=dag)
quality_check = BranchPythonOperator(
    task_id='check_data_quality', python_callable=check_data_quality, dag=dag
)
alert = PythonOperator(task_id='alert_team', python_callable=alert_team, dag=dag)
transform = PythonOperator(
    task_id='transform_orders', python_callable=transform_orders, dag=dag
)
load = BashOperator(
    task_id='load_to_warehouse',
    bash_command=(
        'echo "Loading '
        "{{ ti.xcom_pull(task_ids='transform_orders', key='transformed') | length }}"
        ' records"'
    ),
    dag=dag,
)
# One branch is always skipped, so `end` must not use the default
# all_success trigger rule (which would skip `end` as well)
end = EmptyOperator(task_id='end', trigger_rule='none_failed_min_one_success', dag=dag)

# Dependencies with branching
start >> extract >> quality_check
quality_check >> alert >> end
quality_check >> transform >> load >> end
```
Airflow Sensors
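Sensors wait for a condition before proceeding. In poke mode (the default), a sensor occupies a worker slot for its entire wait. In reschedule mode, it releases the slot between checks, which suits long waits. Airflow ships a built-in FileSensor for exactly this case; the custom sensor below shows how one works: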
```python
# sensor_example.py
# Custom Airflow sensor (Airflow 2.x); the built-in FileSensor covers this case too
import os

from airflow.sensors.base import BaseSensorOperator


class FileExistenceSensor(BaseSensorOperator):
    """Succeeds once `filepath` exists on the worker's filesystem."""

    # No @apply_defaults needed: it is deprecated and a no-op since Airflow 2.0
    def __init__(self, filepath, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.filepath = filepath

    def poke(self, context):
        # Called every poke_interval seconds until it returns True or timeout expires
        exists = os.path.exists(self.filepath)
        if exists:
            self.log.info("File %s exists!", self.filepath)
        else:
            self.log.info("File %s not found, waiting...", self.filepath)
        return exists


# Usage in DAG:
# wait_for_file = FileExistenceSensor(
#     task_id='wait_for_data_file',
#     filepath='/data/input/orders_20260601.csv',
#     poke_interval=60,    # seconds between checks
#     timeout=3600,        # fail after one hour
#     mode='reschedule',   # free the worker slot between checks
#     dag=dag,
# )
```
Prefect Overview
Prefect uses flows (Python functions decorated with @flow) and tasks (functions decorated with @task). Retries and caching are opt-in through decorator arguments, and Prefect automatically tracks the state of every flow and task run.
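Input-based caching is the idea behind Prefect’s cache_key_fn option. A task’s result is stored under a key derived from its inputs. A repeat call with the same inputs inside the expiration window returns the stored result instead of re-running the task. Prefect ships such a key function as prefect.tasks.task_input_hash.

The stdlib sketch here only illustrates the concept. cached_task is a hypothetical decorator, not Prefect’s implementation, and it caches in memory rather than in Prefect’s result storage.

```python
import hashlib
import json
import time
from functools import wraps
from typing import Any, Callable


def cached_task(expiration_s: float) -> Callable:
    """Hypothetical decorator: reuse a result when the same inputs recur."""

    def decorator(fn: Callable) -> Callable:
        cache: dict[str, tuple[float, Any]] = {}

        @wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            # Cache key: hash of the function name plus its JSON-encoded inputs
            raw = json.dumps([fn.__name__, args, kwargs], sort_keys=True, default=str)
            key = hashlib.sha256(raw.encode()).hexdigest()
            entry = cache.get(key)
            if entry is not None and time.monotonic() - entry[0] < expiration_s:
                return entry[1]  # still fresh: skip the work entirely
            result = fn(*args, **kwargs)
            cache[key] = (time.monotonic(), result)
            return result

        return wrapper

    return decorator


calls = []


@cached_task(expiration_s=3600)
def fetch(city: str) -> dict:
    calls.append(city)  # records every real (uncached) execution
    return {"city": city, "temp_c": 15.0}


fetch("London")
fetch("London")  # same input within the hour: served from the cache
fetch("Tokyo")
print(calls)  # ['London', 'Tokyo']
```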
```python
# prefect_flow.py
# Prefect flow with retries, caching, and concurrent tasks
import time
from datetime import timedelta

import httpx
from prefect import flow, task
from prefect.tasks import task_input_hash


@task(
    retries=3,
    retry_delay_seconds=30,
    cache_key_fn=task_input_hash,         # same city within the hour -> cached result
    cache_expiration=timedelta(hours=1),
)
def fetch_weather_data(city: str) -> dict:
    """Fetch weather data from an API with retries."""
    # Placeholder URL: substitute a real weather API (and its auth) here
    response = httpx.get(f"https://api.weather.com/v1/{city}", timeout=10)
    response.raise_for_status()  # an HTTP error raises, which triggers a retry
    data = response.json()
    print(f"Fetched data for {city}")
    return data


@task
def process_weather_data(data: dict) -> dict:
    """Transform raw weather data."""
    processed = {
        "city": data.get("name"),
        "temp_c": data.get("main", {}).get("temp"),
        "humidity": data.get("main", {}).get("humidity"),
        "timestamp": time.time(),
    }
    print(f"Processed: {processed['city']} - {processed['temp_c']}C")
    return processed


@task
def validate_temperature(processed: dict) -> dict:
    """Data quality: check temperature range."""
    if processed["temp_c"] is None:
        raise ValueError("Temperature is missing")
    if not -50 <= processed["temp_c"] <= 60:
        raise ValueError(f"Temperature {processed['temp_c']}C out of range")
    print(f"Validation passed for {processed['city']}")
    return processed


@task
def store_in_database(processed: dict) -> dict:
    """Simulate database storage (real code should use parameterized queries)."""
    print(
        f"INSERT INTO weather VALUES ('{processed['city']}', "
        f"{processed['temp_c']}, {processed['humidity']})"
    )
    return {"status": "stored", "city": processed["city"]}


@flow(name="weather_pipeline")
def weather_pipeline(cities: list[str]) -> list[dict]:
    """Orchestrate the full weather data pipeline."""
    futures = []
    for city in cities:
        # .submit() hands each task to the flow's default (concurrent) task runner;
        # calling a task directly would run it synchronously instead
        raw = fetch_weather_data.submit(city)
        processed = process_weather_data.submit(raw)  # futures resolve automatically
        validated = validate_temperature.submit(processed)
        futures.append(store_in_database.submit(validated))
    return [future.result() for future in futures]


# Run the flow
if __name__ == "__main__":
    cities = ["London", "Tokyo", "New York"]
    results = weather_pipeline(cities)
    print(f"\nPipeline complete. {len(results)} cities processed.")
    for r in results:
        print(f"  {r['city']}: {r['status']}")
```
Example output (Prefect's own log lines are omitted, the temperatures are illustrative, and because cities run concurrently, their lines may interleave):
```
Fetched data for London
Processed: London - 15.0C
Validation passed for London
INSERT INTO weather VALUES ('London', 15.0, 72)
Fetched data for Tokyo
Processed: Tokyo - 22.0C
Validation passed for Tokyo
INSERT INTO weather VALUES ('Tokyo', 22.0, 65)
Fetched data for New York
Processed: New York - 10.0C
Validation passed for New York
INSERT INTO weather VALUES ('New York', 10.0, 80)

Pipeline complete. 3 cities processed.
  London: stored
  Tokyo: stored
  New York: stored
```
Dagster Overview
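Dagster centers on software-defined assets: data assets that know how to produce themselves. Each asset declares its upstream dependencies and computes its value. For an @asset function, the dependencies are declared through parameter names that match other assets.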
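The parameter-name convention is what lets Dagster assemble the asset graph without explicit wiring. This stdlib sketch imitates that one idea so the mechanism is visible. register_asset, ASSETS, and materialize_all are hypothetical stand-ins; real Dagster adds storage through IO managers, partitions, and much more.

```python
import inspect
from graphlib import TopologicalSorter
from typing import Any, Callable

ASSETS: dict[str, Callable[..., Any]] = {}


def register_asset(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical decorator: record the function as an asset named after itself."""
    ASSETS[fn.__name__] = fn
    return fn


def materialize_all() -> dict[str, Any]:
    """Compute every asset, feeding upstream values in by parameter name."""
    # Each parameter name is treated as the name of an upstream asset
    deps = {name: set(inspect.signature(fn).parameters) for name, fn in ASSETS.items()}
    values: dict[str, Any] = {}
    for name in TopologicalSorter(deps).static_order():
        values[name] = ASSETS[name](**{param: values[param] for param in deps[name]})
    return values


@register_asset
def raw_orders() -> list[dict]:
    return [{"id": 1, "amount": 100.0}, {"id": 3, "amount": -50.0}]


@register_asset
def cleaned_orders(raw_orders: list[dict]) -> list[dict]:
    return [o for o in raw_orders if o["amount"] > 0]


@register_asset
def revenue(cleaned_orders: list[dict]) -> float:
    return sum(o["amount"] for o in cleaned_orders)


print(materialize_all()["revenue"])  # 100.0
```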
```python
# dagster_pipeline.py
# Dagster software-defined assets pipeline
import pandas as pd
from dagster import Definitions, asset


@asset
def raw_orders() -> pd.DataFrame:
    """Extract raw orders data."""
    data = pd.DataFrame([
        {"order_id": 1, "amount": 100.0, "status": "completed", "date": "2026-06-01"},
        {"order_id": 2, "amount": 250.0, "status": "completed", "date": "2026-06-01"},
        {"order_id": 3, "amount": -50.0, "status": "cancelled", "date": "2026-06-02"},
    ])
    print(f"Extracted {len(data)} orders")
    return data


@asset
def cleaned_orders(raw_orders: pd.DataFrame) -> pd.DataFrame:
    """Remove invalid orders with non-positive amounts."""
    # The parameter name `raw_orders` declares the dependency on that asset
    cleaned = raw_orders[raw_orders["amount"] > 0].copy()
    removed = len(raw_orders) - len(cleaned)
    print(f"Cleaned: removed {removed} invalid orders")
    return cleaned


@asset
def order_summary(cleaned_orders: pd.DataFrame) -> pd.DataFrame:
    """Compute daily order summary."""
    # assign() builds a new frame instead of mutating the upstream asset's data
    summary = (
        cleaned_orders.assign(order_date=pd.to_datetime(cleaned_orders["date"]))
        .groupby("order_date")
        .agg(total_orders=("order_id", "count"), total_revenue=("amount", "sum"))
        .reset_index()
    )
    print("Order summary computed:")
    print(summary.to_string())
    return summary


# Register the assets so the Dagster UI and CLI can load them
defs = Definitions(assets=[raw_orders, cleaned_orders, order_summary])
```
Orchestration Platform Comparison
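| Feature | Airflow | Prefect | Dagster |
|---|---|---|---|
| Core concept | DAGs + Operators | Flows + Tasks | Assets + Ops |
| Python API | Operator classes, or TaskFlow @dag / @task decorators | @flow / @task decorators | @asset / @op decorators |
| Scheduling | Built-in scheduler; executors (Celery, Kubernetes) run the tasks | Prefect server (self-hosted) or Prefect Cloud | Dagster daemon (self-hosted) or Dagster+ (hosted) |
| Retries | retries / retry_delay on any operator | retries / retry_delay_seconds on @task and @flow | RetryPolicy via retry_policy on ops and assets |
| Data quality | Custom checks or SQL check operators | Custom validation tasks | Asset checks (@asset_check) |
| Monitoring | Airflow UI + task logs | Prefect UI | Dagster UI (formerly Dagit) |
| Maturity | Oldest of the three; largest provider ecosystem | Newer; growing adoption | Newer; growing adoption |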
CI/CD for Data Pipelines
Data pipelines need version control, testing, and deployment automation just like application code:
```yaml
# .github/workflows/data-pipeline-ci.yml
name: Data Pipeline CI/CD

on:
  push:
    branches: [main]
    paths:
      - 'dags/**'
      - 'flows/**'
      - 'assets/**'
  pull_request:
    branches: [main]
    paths:
      - 'dags/**'
      - 'flows/**'
      - 'assets/**'

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov
      - name: Run tests
        run: |
          python -m pytest tests/ --cov=dags --cov-report=xml
      - name: Validate DAGs
        # DagBag parses every DAG file and collects import errors (needs Airflow in requirements.txt)
        run: |
          python -c "from airflow.models import DagBag; bag = DagBag('dags/', include_examples=False); assert not bag.import_errors, bag.import_errors; print(f'{len(bag.dags)} DAGs loaded')"
      - name: Lint
        run: |
          pip install ruff
          ruff check dags/ --line-length=100

  deploy:
    needs: test
    runs-on: ubuntu-latest
    # Deploy only on pushes to main, never from pull requests
    if: github.event_name == 'push' && github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4
      - name: Deploy DAGs to Airflow
        # Assumes SSH access to airflow-server is already configured (e.g. a deploy key)
        run: |
          rsync -avz dags/ airflow-server:/opt/airflow/dags/
```
Monitoring and Alerting
```python
# monitoring.py
# Airflow failure callback that posts to Slack (apache-airflow-providers-slack 8.x+)
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook


def task_failure_alert(context):
    """Send a Slack alert when a task fails."""
    ti = context['task_instance']
    dag_id = context['dag'].dag_id
    message = (
        "🔴 Pipeline Failed\n"
        f"DAG: {dag_id}\n"
        f"Task: {ti.task_id}\n"
        f"Run: {context['logical_date']}\n"  # replaces the deprecated execution_date
        f"Logs: {ti.log_url}"
    )
    # The webhook URL lives in an Airflow connection, never in code;
    # 'slack_alerts' is an example connection id
    hook = SlackWebhookHook(slack_webhook_conn_id='slack_alerts')
    hook.send(text=message)
    print(f"Alert sent for {dag_id}.{ti.task_id}")


# Add to DAG default_args:
# 'on_failure_callback': task_failure_alert,
```
Orchestration Architecture
```mermaid
flowchart TB
    subgraph "Orchestration Layer"
        S[Scheduler] --> Q[Queue / Message Broker]
        Q --> W1[Worker 1]
        Q --> W2[Worker 2]
        Q --> W3[Worker N]
    end
    subgraph "Data Sources"
        API[REST APIs]
        DB[(Databases)]
        FS[File System / S3]
    end
    subgraph "Transforms"
        SP[Apache Spark]
        DBT[dbt]
        PY[Python Scripts]
    end
    subgraph "Destinations"
        WH[(Data Warehouse)]
        DL[(Data Lake)]
        DASH[Dashboards]
    end
    S --> API
    S --> DB
    S --> FS
    W1 --> SP
    W2 --> DBT
    W3 --> PY
    SP --> WH
    DBT --> DL
    PY --> DASH
    MK[(MetaDB: PostgreSQL/MySQL)] --- S
    MK --- W1
```
Common Orchestration Mistakes
1. Hard-Coding Connections and Secrets
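Never put database passwords or API keys in DAG or flow code. Use Airflow Connections, Prefect Blocks, or environment variables populated from a secrets manager. Credentials committed to a Git repository are a common cause of breaches, and they remain in the repository history even after the file is fixed.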
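Here is a minimal sketch of the environment-variable approach. The code names the secret it needs and fails fast if it is missing. The value itself is injected at runtime by the deployment, for example from a CI secret or a secrets manager. Airflow applies the same idea to connections: it reads any environment variable named AIRFLOW_CONN_<CONN_ID> as a connection definition. The variable name WAREHOUSE_PASSWORD and the get_secret helper are hypothetical.

```python
import os


class MissingSecretError(RuntimeError):
    """Raised when a required secret is not present in the environment."""


def get_secret(name: str) -> str:
    """Read a secret from the environment instead of hard-coding it."""
    value = os.environ.get(name)
    if not value:
        # Fail fast with a clear message that names the secret but never its value
        raise MissingSecretError(f"Required secret {name!r} is not set")
    return value


# Usage: the deployment sets WAREHOUSE_PASSWORD; it never appears in the repo
try:
    password = get_secret("WAREHOUSE_PASSWORD")
except MissingSecretError as exc:
    print(exc)
```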
2. Not Setting catchup=False in Airflow
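Without catchup=False, a new DAG with start_date=2020-01-01 triggers a run for every schedule interval missed since that date. For a daily schedule, that means thousands of backfill runs. In Airflow 2.x, catchup defaults to True (controlled by the catchup_by_default setting), so always set catchup=False unless backfill is intentional.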
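The size of an accidental backfill is simple arithmetic: catchup creates one run per missed interval between start_date and now. Here is a quick stdlib check, assuming a cutoff date of 2026-01-01. backfill_runs is a hypothetical helper that ignores time-of-day offsets such as a 03:00 cron time.

```python
from datetime import date


def backfill_runs(start: date, now: date, runs_per_day: int = 1) -> int:
    """Approximate catch-up runs: one per schedule interval elapsed since start."""
    return (now - start).days * runs_per_day


print(backfill_runs(date(2020, 1, 1), date(2026, 1, 1)))                   # 2192 (daily)
print(backfill_runs(date(2020, 1, 1), date(2026, 1, 1), runs_per_day=24))  # 52608 (hourly)
```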
3. Tasks That Are Too Large
A single task that runs for 6 hours is hard to debug and expensive to retry, because one failure near the end reruns all of it. Break long tasks into smaller steps (extract, validate, transform, load), each retriable independently.
4. Ignoring Idempotency
Running a pipeline twice for the same interval should produce the same result. Use upserts (MERGE INTO) instead of plain inserts. Partition data by date so that a re-run overwrites its own partition instead of appending duplicates.
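Here is a minimal sketch of the partition-overwrite pattern using Python’s built-in sqlite3 module. Each run deletes and rewrites its date’s partition inside one transaction, so running it twice leaves the same rows. The table name and schema are illustrative. In a warehouse, the same idea is typically expressed with MERGE INTO or a partition overwrite.

```python
import sqlite3


def load_partition(conn: sqlite3.Connection, run_date: str, rows: list[tuple[int, float]]) -> None:
    """Idempotently replace all orders for run_date."""
    with conn:  # one transaction: commit on success, roll back on error
        conn.execute("DELETE FROM orders WHERE order_date = ?", (run_date,))
        conn.executemany(
            "INSERT INTO orders (order_id, amount, order_date) VALUES (?, ?, ?)",
            [(order_id, amount, run_date) for order_id, amount in rows],
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER, amount REAL, order_date TEXT)")
rows = [(1, 100.0), (2, 250.0)]
load_partition(conn, "2026-06-01", rows)
load_partition(conn, "2026-06-01", rows)  # re-run: same result, no duplicates
print(conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0])  # 2
```

A plain INSERT would have left four rows after the second run.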
5. No Monitoring or Alerting
Without alerts, a broken pipeline runs silently for hours. Set on_failure_callback, Slack/email alerts, and track metrics like task duration, success rate, and queue depth.
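Tracking duration and success rate can start as a thin wrapper around each task. This sketch keeps metrics in memory with the standard library. A real deployment would ship them to a metrics backend; Airflow, for example, can emit StatsD metrics. The tracked decorator, METRICS store, and success_rate helper are hypothetical.

```python
import time
from collections import defaultdict
from functools import wraps

# Per-task counters; a real setup would export these instead of keeping them in memory
METRICS: dict[str, dict[str, float]] = defaultdict(
    lambda: {"runs": 0, "failures": 0, "total_seconds": 0.0}
)


def tracked(fn):
    """Record run count, failures, and wall-clock duration for a task."""

    @wraps(fn)
    def wrapper(*args, **kwargs):
        stats = METRICS[fn.__name__]
        stats["runs"] += 1
        start = time.perf_counter()
        try:
            return fn(*args, **kwargs)
        except Exception:
            stats["failures"] += 1
            raise  # still fail the task so the orchestrator can retry or alert
        finally:
            stats["total_seconds"] += time.perf_counter() - start

    return wrapper


def success_rate(task_name: str) -> float:
    stats = METRICS[task_name]
    return 1 - stats["failures"] / stats["runs"] if stats["runs"] else 1.0


@tracked
def transform(values: list[float]) -> float:
    if not values:
        raise ValueError("no input rows")
    return sum(values)


transform([1.0, 2.0])
try:
    transform([])
except ValueError:
    pass
print(f"success rate: {success_rate('transform'):.0%}")  # success rate: 50%
```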
6. Overcomplicating Dependencies
A DAG with 50 tasks and complex branching is hard to maintain. Group related steps into TaskGroups (Airflow) or subflows (Prefect). Prefer linear pipelines where possible.
7. Testing Only in Production
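Data pipelines are hard to test locally, but skipping tests entirely pushes every bug into production. Use Airflow’s DAG.test() (Airflow 2.5+) to run a whole DAG in a single process. Call Prefect flows directly as Python functions; prefect_test_harness from prefect.testing.utilities isolates them from your real Prefect backend. Then run these checks in GitHub Actions on every commit.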
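The cheapest tests target the logic inside tasks rather than the orchestrator. As a sketch, keep the transformation as a plain function and test it with pytest-style assertions, which run in milliseconds and need no scheduler. Here the function is a hypothetical categorize_orders that mirrors the transform step of the Airflow DAG earlier in this guide.

```python
def categorize_orders(orders: list[dict]) -> list[dict]:
    """Drop non-positive amounts and tag each order as 'high' or 'standard'."""
    return [
        {**o, "category": "high" if o["amount"] > 200 else "standard"}
        for o in orders
        if o["amount"] > 0
    ]


# In a real repo these tests would live under tests/ so `pytest tests/` collects them
def test_invalid_orders_are_dropped():
    assert categorize_orders([{"id": 3, "amount": -50.0}]) == []


def test_categories_split_at_200():
    result = categorize_orders([{"id": 1, "amount": 200.0}, {"id": 2, "amount": 250.0}])
    assert [o["category"] for o in result] == ["standard", "high"]


if __name__ == "__main__":
    # pytest discovers test_* functions automatically; calling them directly also works
    test_invalid_orders_are_dropped()
    test_categories_split_at_200()
    print("all tests passed")
```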
Practice Questions
1. What is a DAG in Airflow and why must it be acyclic?
A Directed Acyclic Graph defines task execution order. “Directed” means dependencies point one way, from upstream to downstream. “Acyclic” means there are no loops. If task A waited on B and B waited on A, there would be no valid order in which to run them, so neither could ever start.
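Python’s standard library can show why cycles are rejected. graphlib.TopologicalSorter orders an acyclic graph but raises CycleError when tasks wait on each other. Orchestrators reject cyclic graphs in a similar way when they load them; this is an illustration, not Airflow’s own code.

```python
from graphlib import CycleError, TopologicalSorter

# Each task maps to the set of tasks it depends on
acyclic = {"transform": {"extract"}, "load": {"transform"}}
print(list(TopologicalSorter(acyclic).static_order()))  # ['extract', 'transform', 'load']

cyclic = {"extract": {"load"}, "transform": {"extract"}, "load": {"transform"}}
try:
    list(TopologicalSorter(cyclic).static_order())
except CycleError as exc:
    # exc.args[1] lists the nodes forming the cycle (first and last are the same)
    print("Cycle detected:", exc.args[1])
```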
2. How does Prefect differ from Airflow in defining workflows?
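Airflow’s classic style builds workflows from Python classes (DAG, operators), although its TaskFlow API also offers @dag and @task decorators. Prefect uses decorators (@flow, @task) throughout. Prefect flows are plain Python functions with automatic state tracking, which makes them easy to run and test locally.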
3. What are Airflow sensors and when would you use one?
Sensors are operators that wait for a condition — file arrival, API availability, database record. Use them when a downstream task depends on something external that may arrive asynchronously.
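Stripped of Airflow specifics, a sensor in poke mode is a loop that checks a condition every poke_interval seconds until it succeeds or timeout elapses. This stdlib sketch shows that loop; wait_for is a hypothetical helper. In reschedule mode, Airflow instead ends the task between checks and schedules it again, freeing the worker slot.

```python
import time
from typing import Callable


def wait_for(condition: Callable[[], bool], poke_interval: float, timeout: float) -> float:
    """Call condition() every poke_interval seconds until it returns True.

    Returns the seconds waited; raises TimeoutError once timeout has elapsed.
    """
    start = time.monotonic()
    while not condition():
        if time.monotonic() - start >= timeout:
            raise TimeoutError(f"condition not met within {timeout}s")
        time.sleep(poke_interval)  # poke mode: this process keeps its slot while sleeping
    return time.monotonic() - start


checks = []


def file_arrived() -> bool:
    """Stand-in for os.path.exists(...): becomes true on the third check."""
    checks.append(time.monotonic())
    return len(checks) >= 3


waited = wait_for(file_arrived, poke_interval=0.01, timeout=1.0)
print(f"condition met after {len(checks)} checks")  # condition met after 3 checks
```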
4. How does Dagster’s asset-based approach differ from Airflow’s DAG approach?
Dagster treats data as assets that know how to produce themselves, while Airflow focuses on task execution order. Dagster automatically tracks data lineage and materializations. In Airflow, lineage needs extra work, such as Datasets (Airflow 2.4+) or the OpenLineage provider.
5. Challenge: Build a pipeline that ingests hourly CSV files from S3, validates schema, transforms with dbt, and loads to Snowflake, with Slack alerts on failure and a dashboard that tracks pipeline health.
Use Airflow with an S3KeySensor to wait for new files, a PythonOperator for schema validation, a BashOperator for dbt run, and a SnowflakeOperator (or SQLExecuteQueryOperator in newer provider versions) for loading. Add an on_failure_callback that posts to Slack. Export metrics such as task duration and row counts to Prometheus to feed a Grafana dashboard.
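For the schema-validation step of that challenge, here is a stdlib sketch of what the PythonOperator callable might check: required columns are present and each value parses as its expected type. The column names and EXPECTED_SCHEMA are hypothetical. In the real pipeline, the CSV text would come from the S3 object the sensor found.

```python
import csv
import io

# Hypothetical schema: column name -> parser that raises on bad input
EXPECTED_SCHEMA = {"order_id": int, "amount": float, "order_date": str}


def validate_csv(text: str) -> list[str]:
    """Return a list of schema errors (empty if the file is valid)."""
    reader = csv.DictReader(io.StringIO(text))
    missing = set(EXPECTED_SCHEMA) - set(reader.fieldnames or [])
    if missing:
        return [f"missing columns: {sorted(missing)}"]
    errors = []
    for row in reader:
        for column, parse in EXPECTED_SCHEMA.items():
            try:
                parse(row[column])
            except (ValueError, TypeError):  # TypeError: field missing from a short row
                errors.append(f"line {reader.line_num}: bad {column} {row[column]!r}")
    return errors


sample = "order_id,amount,order_date\n1,100.0,2026-06-01\n2,abc,2026-06-01\n"
print(validate_csv(sample))  # ["line 3: bad amount 'abc'"]
```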
Mini Project: Custom Prefect Block
```python
# custom_block.py
# Custom Prefect block for API polling
import time

import httpx
from prefect.blocks.core import Block
from pydantic import Field


class ApiPoller(Block):
    """Block that polls an API endpoint until data is ready."""

    endpoint: str = Field(description="API endpoint URL")
    poll_interval: int = Field(default=30, description="Seconds between polls")
    timeout: int = Field(default=600, description="Max seconds to wait")

    def poll(self) -> dict:
        """Poll the API until it reports ready, or raise TimeoutError."""
        start = time.time()  # start the clock per call, not at construction
        while time.time() - start < self.timeout:
            response = httpx.get(self.endpoint, timeout=10)
            # Parse the body once; the readiness check and the return share it
            payload = response.json() if response.status_code == 200 else {}
            if payload.get("ready"):
                print(f"Data ready after {time.time() - start:.1f}s")
                return payload
            print(f"Waiting... ({time.time() - start:.0f}s elapsed)")
            time.sleep(self.poll_interval)
        raise TimeoutError(f"API not ready after {self.timeout}s")


# Usage:
# poller = ApiPoller(endpoint="https://api.example.com/status")
# result = poller.poll()

if __name__ == "__main__":
    # Simulate usage offline: mock the HTTP call and use a fake clock
    # so the demo runs instantly instead of sleeping for a minute
    from unittest.mock import patch

    class FakeClock:
        def __init__(self):
            self.now = 0.0

        def time(self):
            return self.now

        def sleep(self, seconds):
            self.now += seconds

    clock = FakeClock()
    poller = ApiPoller(endpoint="https://api.example.com/status")
    with patch.object(httpx, "get") as mock_get, \
            patch.object(time, "time", clock.time), \
            patch.object(time, "sleep", clock.sleep):
        mock_response = mock_get.return_value
        mock_response.status_code = 200
        mock_response.json.side_effect = [
            {"ready": False, "data": None},
            {"ready": False, "data": None},
            {"ready": True, "data": {"records": 100}},
        ]
        result = poller.poll()
    print(f"Polling result: {result}")
```
Expected output:
```
Waiting... (0s elapsed)
Waiting... (30s elapsed)
Data ready after 60.0s
Polling result: {'ready': True, 'data': {'records': 100}}
```
What’s Next
You now understand data pipeline orchestration across Airflow, Prefect, and Dagster. Next, explore dbt transformations for SQL-based data modeling and stream processing for real-time pipelines.
- Practice daily — Convert a cron-based ETL to an Airflow DAG
- Build a project — Deploy a Prefect flow on Prefect Cloud that ingests API data daily
- Explore alternatives — Compare orchestration costs and features for your specific workload
Remember: every expert was once a beginner. Keep orchestrating!
Built by the developers of DodaTech
Doda Browser, DodaZIP & Durga Antivirus Pro