Apache Airflow Guide — DAGs, Operators, and ETL Orchestration
Apache Airflow is an open-source workflow orchestration platform that schedules, monitors, and manages complex data pipelines using directed acyclic graphs (DAGs) of tasks.
What You’ll Learn
By the end of this tutorial, you’ll understand Airflow’s core concepts — DAGs, operators, tasks, schedulers — and build a complete ETL DAG using PythonOperator and BashOperator.
Why Airflow Matters
Data pipelines have many steps: extract from APIs, validate data, transform with Spark, load into warehouses, send alerts. Without an orchestrator, you’d rely on cron jobs, manual scripts, and endless debugging. Airflow gives you scheduling, retries, monitoring, and dependency management in one platform. DodaTech uses Airflow to orchestrate Doda Browser analytics pipelines that process millions of events daily.
Apache Airflow Learning Path
flowchart LR
A[ETL Pipelines] --> B[Apache Airflow]
B --> C{You Are Here}
C --> D[DAGs]
C --> E[Operators]
C --> F[Scheduler]
D --> G[Tasks & Dependencies]
E --> H[PythonOperator]
E --> I[BashOperator]
What Is Airflow?
Think of Airflow like a factory production line manager. Each machine on the line does one job (task). The manager ensures machines start in the right order, handles breakdowns (retries), and tracks progress. If Machine 3 depends on Machine 2’s output, the manager waits until Machine 2 finishes before starting Machine 3.
A DAG is the production line blueprint. Operators are the machines. Tasks are individual jobs run on machines. The Scheduler is the manager.
Core Concepts
Directed Acyclic Graph (DAG)
A DAG defines the pipeline structure — tasks and their dependencies. “Directed” means execution flows one way. “Acyclic” means no loops — you can’t create infinite cycles.
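Both properties can be checked with plain Python before any Airflow code is involved. The sketch below is an illustration, not Airflow's internals. It stores a pipeline as "task: its upstream tasks" and uses the standard library's `graphlib` (Python 3.9+) to get one valid run order. Adding a dependency that closes a loop makes the graph cyclic, and the sorter rejects it. The task names are hypothetical.

```python
# Sketch: a DAG as "task -> set of upstream tasks", checked with graphlib.
# Illustration only; Airflow performs its own cycle checks when it loads DAGs.
from graphlib import CycleError, TopologicalSorter


def execution_order(upstream: dict[str, set[str]]) -> list[str]:
    """Return one valid run order, or raise CycleError if the graph has a loop."""
    return list(TopologicalSorter(upstream).static_order())


# extract -> validate -> transform -> load
pipeline = {
    "validate": {"extract"},
    "transform": {"validate"},
    "load": {"transform"},
}
print(execution_order(pipeline))  # ['extract', 'validate', 'transform', 'load']

# Making "extract" depend on "load" creates a cycle, so no valid order exists.
cyclic = dict(pipeline, extract={"load"})
try:
    execution_order(cyclic)
except CycleError as err:
    print("Rejected:", err.args[0])
```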
# simple_dag.py
# Minimal Airflow DAG definition
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
default_args = {
    'owner': 'dodatech',
    'start_date': datetime(2026, 1, 1),
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG(
    'simple_etl_dag',
    default_args=default_args,
    description='Simple ETL pipeline',
    schedule='@daily',  # called schedule_interval before Airflow 2.4
    catchup=False,
    tags=['etl', 'tutorial'],
)
Operators
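Operators define what a task does. Airflow ships core operators such as PythonOperator and BashOperator, and provider packages add operators for databases, cloud storage, messaging, and other services.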
| Operator | Purpose |
|---|---|
| PythonOperator | Execute Python functions |
| BashOperator | Execute shell commands |
| PostgresOperator | Run SQL queries on PostgreSQL (newer provider releases use SQLExecuteQueryOperator) |
| S3FileTransformOperator | Transform files in S3 |
| EmailOperator | Send an email, for example a success or failure report |
| EmptyOperator (formerly DummyOperator) | No-op placeholder for grouping or joining tasks |
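In Airflow, operators subclass `BaseOperator` and put their work in an `execute(context)` method. Each instance you create with a `task_id` becomes a task. The toy classes below (`ToyBaseOperator`, `ToyPythonOperator` and `greet` are hypothetical, not Airflow's classes) sketch that split in plain Python: one operator class, two separate tasks.

```python
# Toy model of the operator/task split. These classes are hypothetical
# stand-ins and are not Airflow's BaseOperator or PythonOperator.
from typing import Any, Callable


class ToyBaseOperator:
    """An operator type: subclasses decide what execute() does."""

    def __init__(self, task_id: str):
        self.task_id = task_id  # each instance (a task) has its own id

    def execute(self, context: dict[str, Any]) -> Any:
        raise NotImplementedError


class ToyPythonOperator(ToyBaseOperator):
    """Runs a Python callable, passing the run context as keyword arguments."""

    def __init__(self, task_id: str, python_callable: Callable[..., Any]):
        super().__init__(task_id)
        self.python_callable = python_callable

    def execute(self, context: dict[str, Any]) -> Any:
        return self.python_callable(**context)


def greet(**context: Any) -> str:
    return f"hello from {context['run_id']}"


# Two tasks built from the same operator class.
task_a = ToyPythonOperator(task_id="greet_a", python_callable=greet)
task_b = ToyPythonOperator(task_id="greet_b", python_callable=lambda **ctx: len(ctx))
print(task_a.execute({"run_id": "manual_1"}))  # hello from manual_1
print(task_b.execute({"run_id": "manual_1"}))  # 1
```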
Tasks and Dependencies
Tasks are instances of operators. Dependencies define order: task1 >> task2 means task2 runs after task1 has succeeded (under the default all_success trigger rule).
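The `>>` syntax is ordinary Python operator overloading: Airflow operators implement `__rshift__`, which records the right-hand task as downstream and returns it, so `a >> b >> c` chains left to right. The `ToyTask` class below is a hypothetical, minimal sketch of that mechanism, not Airflow's implementation.

```python
# Toy sketch of how "a >> b >> c" can be built on __rshift__.
# ToyTask is hypothetical; Airflow's operators expose the same syntax.
class ToyTask:
    def __init__(self, task_id: str):
        self.task_id = task_id
        self.downstream: list["ToyTask"] = []

    def __rshift__(self, other: "ToyTask") -> "ToyTask":
        self.downstream.append(other)  # other runs after self
        return other                   # returning the right side enables chaining

    def __repr__(self) -> str:
        return f"ToyTask({self.task_id!r})"


extract, transform, load = ToyTask("extract"), ToyTask("transform"), ToyTask("load")
extract >> transform >> load  # evaluated as (extract >> transform) >> load

print(extract.downstream)    # [ToyTask('transform')]
print(transform.downstream)  # [ToyTask('load')]
```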
# Defining tasks and dependencies
# Assumes `dag` from simple_dag.py, plus extract_function and
# transform_function defined elsewhere in the same file.
task_extract = PythonOperator(
    task_id='extract_data',
    python_callable=extract_function,
    dag=dag,
)
task_transform = PythonOperator(
    task_id='transform_data',
    python_callable=transform_function,
    dag=dag,
)
task_load = BashOperator(
    task_id='load_to_warehouse',
    bash_command='dbt run --models sales',
    dag=dag,
)
# Define execution order
task_extract >> task_transform >> task_load
Complete ETL DAG Example
# etl_dag.py
# Complete ETL DAG with PythonOperator and BashOperator
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
default_args = {
    'owner': 'dodatech',
    'depends_on_past': False,
    'start_date': datetime(2026, 1, 1),
    'email_on_failure': True,  # only sends if SMTP and an 'email' list are configured
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=10),
}
dag = DAG(
    'sales_etl_pipeline',
    default_args=default_args,
    description='Daily sales ETL: extract, validate, transform, load',
    schedule='0 2 * * *',  # daily at 02:00, UTC by default (schedule_interval before Airflow 2.4)
    catchup=False,
    tags=['sales', 'etl'],
)
def extract_sales(**context):
    """Extract sales data (simulated API response) and push it to XCom."""
    # Simulate API call
    api_response = {
        "orders": [
            {"order_id": 1001, "amount": 250.00, "status": "completed"},
            {"order_id": 1002, "amount": 45.50, "status": "pending"},
            {"order_id": 1003, "amount": 1200.00, "status": "completed"},
        ]
    }
    # Push to XCom for downstream tasks
    context['task_instance'].xcom_push(key='sales_data', value=api_response)
    print(f"Extracted {len(api_response['orders'])} orders")
def validate_sales(**context):
    """Validate extracted data before transformation."""
    ti = context['task_instance']
    data = ti.xcom_pull(task_ids='extract_sales', key='sales_data')
    errors = []
    for order in data['orders']:
        if order['amount'] <= 0:
            errors.append(f"Order {order['order_id']}: invalid amount")
        if order['status'] not in ('completed', 'pending', 'cancelled'):
            errors.append(f"Order {order['order_id']}: unknown status")
    if errors:
        # Raising marks the task as failed, which triggers the retry settings
        raise ValueError(f"Validation errors: {errors}")
    print(f"Validation passed: {len(data['orders'])} orders OK")
def transform_sales(**context):
    """Clean and enrich sales data."""
    ti = context['task_instance']
    data = ti.xcom_pull(task_ids='extract_sales', key='sales_data')
    transformed = []
    for order in data['orders']:
        order['category'] = 'high_value' if order['amount'] > 100 else 'standard'
        order['processed_at'] = datetime.now().isoformat()
        transformed.append(order)
    ti.xcom_push(key='transformed_data', value=transformed)
    print(f"Transformed {len(transformed)} orders")
# Task definitions
# Airflow 2+ passes the context to **kwargs automatically, so the old
# provide_context=True flag is gone (passing it now raises an error).
extract_task = PythonOperator(
    task_id='extract_sales',
    python_callable=extract_sales,
    dag=dag,
)
validate_task = PythonOperator(
    task_id='validate_sales',
    python_callable=validate_sales,
    dag=dag,
)
transform_task = PythonOperator(
    task_id='transform_sales',
    python_callable=transform_sales,
    dag=dag,
)
load_task = BashOperator(
    task_id='load_to_warehouse',
    # Jinja pulls the transformed list from XCom and renders its length
    bash_command=(
        'echo "Loading to warehouse... row count: '
        "{{ ti.xcom_pull(task_ids='transform_sales', key='transformed_data') | length }}"
        '" && echo "Load complete"'
    ),
    dag=dag,
)
notify_task = BashOperator(
    task_id='send_notification',
    bash_command='echo "Sales pipeline completed at $(date)"',
    dag=dag,
)
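# Dependencies
extract_task >> validate_task >> transform_task >> load_task >> notify_task
To exercise the three Python callables without installing Airflow, you can run them through a small local simulator:
# airflow_simulator.py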
# Simulate Airflow DAG execution locally (no Airflow needed).
# Paste extract_sales, validate_sales and transform_sales from etl_dag.py
# into this file (without the airflow imports) before running it.
from datetime import datetime
class TaskInstance:
    """Minimal stand-in for the XCom methods of Airflow's TaskInstance."""
    def __init__(self):
        self.xcom_data = {}
    def xcom_push(self, key, value):
        self.xcom_data[key] = value
    def xcom_pull(self, task_ids, key):
        # Simplification: task_ids is ignored and values are looked up by key only
        return self.xcom_data.get(key)
class DAGSimulator:
    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.tasks = []  # (task_name, callable) pairs, run in list order
    def run(self):
        print(f"╔══ Running DAG: {self.dag_id} ══╗\n")
        context = {"task_instance": TaskInstance(), "execution_date": datetime.now()}
        results = []
        upstream_failed = False
        for task_name, task_func in self.tasks:
            if upstream_failed:
                # Like Airflow's default all_success rule: downstream tasks do not run
                results.append({"task": task_name, "status": "upstream_failed"})
                continue
            print(f"▶ Task: {task_name}")
            try:
                task_func(**context)
                print(f" ✓ {task_name} succeeded\n")
                results.append({"task": task_name, "status": "success"})
            except Exception as e:
                print(f" ✗ {task_name} FAILED: {e}\n")
                results.append({"task": task_name, "status": "failed"})
                upstream_failed = True
        print("╚══ DAG Complete ═══════════════╝")
        return results
# Build and run
sim = DAGSimulator("sales_etl_pipeline")
sim.tasks = [
    ("extract_sales", extract_sales),
    ("validate_sales", validate_sales),
    ("transform_sales", transform_sales),
]
results = sim.run()
print("\nExecution Summary:")
for r in results:
    print(f" {r['task']}: {r['status']}")
Expected output:
╔══ Running DAG: sales_etl_pipeline ══╗

▶ Task: extract_sales
Extracted 3 orders
 ✓ extract_sales succeeded

▶ Task: validate_sales
Validation passed: 3 orders OK
 ✓ validate_sales succeeded

▶ Task: transform_sales
Transformed 3 orders
 ✓ transform_sales succeeded

╚══ DAG Complete ═══════════════╝

Execution Summary:
 extract_sales: success
 validate_sales: success
 transform_sales: success
Airflow Architecture
flowchart TB
subgraph "Airflow Components"
S[Scheduler] --> M[Metadata DB<br/>PostgreSQL/MySQL]
S --> Q[Executor<br/>Celery/Kubernetes]
Q --> W[Workers<br/>Execute Tasks]
W --> M
UI[Web UI<br/>Flask App] --> M
end
DAG[DAG Files<br/>dags/ folder] -- "Parsed by" --> S
Common Airflow Mistakes
1. Not Setting catchup=False by Default
With catchup enabled (the default in Airflow 2.x), the scheduler creates a run for every schedule interval between the start_date and now. For a daily DAG with a start_date in 2020, that means thousands of backfill runs. Set catchup=False unless you explicitly want a backfill.
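For a daily schedule, the backlog is roughly one run per elapsed day. The quick estimate below assumes one run per whole day and uses 2026-01-01 as a stand-in for "now"; the exact count depends on the schedule and timetable boundaries. The helper name is hypothetical.

```python
# Rough size of the backlog a daily DAG queues when catchup is enabled.
# Assumption: one run per whole day; exact boundaries depend on the timetable.
from datetime import date


def approx_daily_backfill_runs(start: date, today: date) -> int:
    """One run per whole day between start_date and today."""
    return (today - start).days


print(approx_daily_backfill_runs(date(2020, 1, 1), date(2026, 1, 1)))  # 2192
```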
2. Long-Running Polling Tasks Hogging Worker Slots
Tasks that poll APIs with time.sleep() hold a worker slot for the entire wait. Use Airflow sensors (e.g., HttpSensor, S3KeySensor) with mode='reschedule', or deferrable operators, so the slot is released between checks.
3. Hard-Coding Connections in DAGs
Don’t put database passwords or API keys in DAG code. Store them as Airflow Connections (created in the UI or with airflow connections add) and reference them from your operators by connection ID.
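Airflow can also read a connection from an environment variable named `AIRFLOW_CONN_` followed by the upper-case connection ID, holding a connection URI. The parser below is not Airflow's code; it is a sketch, assuming a hypothetical `warehouse_db` connection, that shows what such a URI carries and why the DAG file itself never needs the password. Special characters in the URI are percent-encoded, which is why `unquote` is applied.

```python
# Sketch: credentials live in an environment variable, not in the DAG file.
# Airflow reads AIRFLOW_CONN_<CONN_ID>; this parser is only an illustration.
import os
from urllib.parse import unquote, urlsplit


def describe_connection(conn_id: str) -> dict:
    """Split the URI stored in AIRFLOW_CONN_<CONN_ID> into its parts."""
    uri = os.environ[f"AIRFLOW_CONN_{conn_id.upper()}"]
    parts = urlsplit(uri)
    return {
        "conn_type": parts.scheme,
        "host": parts.hostname,
        "port": parts.port,
        "login": unquote(parts.username or ""),
        "password": unquote(parts.password or ""),  # "%40" decodes to "@"
        "schema": parts.path.lstrip("/"),
    }


# Demo only: a real deployment sets this variable (or uses a secrets backend).
# The connection id "warehouse_db" and these credentials are hypothetical.
os.environ["AIRFLOW_CONN_WAREHOUSE_DB"] = "postgres://etl_user:s%40cret@db.internal:5432/sales"
conn = describe_connection("warehouse_db")
print(conn["host"], conn["port"], conn["schema"])  # db.internal 5432 sales
```

In the DAG, only the ID (`warehouse_db` here) appears, passed to the operator's connection-ID argument.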
4. Overloading DAGs with Too Many Tasks
A DAG with 200+ tasks is hard to debug. Break large pipelines into multiple DAGs, or group related tasks with TaskGroups (SubDAGs are deprecated and were removed in Airflow 3).
5. Ignoring Task Logging
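When a task fails, you need logs. Python callables are plain functions with no self.log (that attribute exists only inside custom operator classes), so use Python's logging module, for example logging.getLogger("airflow.task"), or print(); both end up in the task log. Configure remote log storage (S3, GCS) so logs persist across worker restarts.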
6. Not Handling Task Failures Gracefully
Always set retries and retry_delay for production DAGs. Use on_failure_callback to send notifications when retries are exhausted.
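The semantics are easy to get off by one: retries=2 means up to three attempts in total, and on_failure_callback fires only after the last one fails. The simulation below is a hypothetical helper (`run_with_retries`, `flaky_api_call`), not how Airflow is implemented; Airflow tracks attempts per task instance itself, and the retry_delay waits are omitted here.

```python
# Simulation of retry semantics: retries=2 means up to 3 attempts in total,
# and the failure callback runs only once every attempt has failed.
# Hypothetical helper; retry_delay waits are left out for brevity.
from typing import Callable


def run_with_retries(task: Callable[[], None], retries: int,
                     on_failure_callback: Callable[[Exception], None]) -> bool:
    last_error: Exception | None = None
    for attempt in range(1, retries + 2):  # first try plus `retries` retries
        try:
            task()
            print(f"attempt {attempt}: success")
            return True
        except Exception as exc:
            print(f"attempt {attempt}: failed ({exc})")
            last_error = exc
    on_failure_callback(last_error)  # retries exhausted
    return False


def flaky_api_call() -> None:
    raise ConnectionError("API timed out")


alerts: list[Exception] = []
ok = run_with_retries(flaky_api_call, retries=2, on_failure_callback=alerts.append)
print(ok, len(alerts))  # False 1
```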
Practice Questions
1. What is a DAG in Airflow?
A Directed Acyclic Graph — a collection of tasks with defined dependencies. Tasks execute in order, and no cycles (loops) are allowed.
2. What is the difference between an Operator and a Task?
An Operator defines what type of action to perform (PythonOperator runs Python, BashOperator runs shell). A Task is an instance of an Operator added to a DAG.
3. How does Airflow schedule DAGs?
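The Scheduler parses DAG files in the dags/ folder, checks each DAG's schedule (the schedule argument, called schedule_interval before Airflow 2.4), and creates DAG Runs at the appropriate times. The executor then hands the tasks to workers, which run them.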
4. What is XCom and when would you use it?
XCom (cross-communication) lets tasks exchange data. Task A pushes a value with xcom_push(key, value). Task B pulls it with xcom_pull(task_ids='A', key=key). Use it for passing small amounts of metadata.
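XCom values are looked up by the pushing task's ID and a key, and a PythonOperator callable's return value is stored under the default key `return_value`. The `ToyXCom` class below is a hypothetical in-memory sketch of that lookup; real XComs are stored in the metadata database by default, which is why payloads should stay small.

```python
# Toy XCom store keyed by (task_id, key), mirroring how
# xcom_pull(task_ids=..., key=...) finds a value. ToyXCom is hypothetical.
class ToyXCom:
    def __init__(self) -> None:
        self._store: dict[tuple[str, str], object] = {}

    def push(self, task_id: str, key: str, value: object) -> None:
        self._store[(task_id, key)] = value

    def pull(self, task_id: str, key: str = "return_value") -> object:
        return self._store.get((task_id, key))


xcom = ToyXCom()
xcom.push("extract_sales", "sales_data", {"orders": 3})
print(xcom.pull("extract_sales", "sales_data"))    # {'orders': 3}
print(xcom.pull("transform_sales", "sales_data"))  # None (wrong task id)
```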
5. Challenge: Design an Airflow DAG for a pipeline that extracts data from a REST API, validates it, runs a dbt transformation, and sends a Slack notification if rows < 100.
Use PythonOperator for extraction and validation, BashOperator for dbt run, PythonOperator with Slack SDK for notification. Add a BranchPythonOperator to conditionally skip the Slack alert when rows >= 100.
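The callable given to a BranchPythonOperator returns the task_id (or a list of task_ids) to follow, and Airflow skips the other downstream branches. A sketch of that decision function, assuming hypothetical task IDs `send_slack_alert` and `skip_alert`:

```python
# Decision function a BranchPythonOperator could call: it returns the task_id
# of the branch to follow. Task ids here are hypothetical.
ROW_THRESHOLD = 100


def choose_branch(row_count: int) -> str:
    """Alert only when the load looks suspiciously small."""
    return "send_slack_alert" if row_count < ROW_THRESHOLD else "skip_alert"


print(choose_branch(42))   # send_slack_alert
print(choose_branch(500))  # skip_alert
```

In the DAG, the callable would read the row count from XCom (for example via the task instance in its context) instead of taking it as a plain argument.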
Mini Project: Custom Airflow Sensor
# file_sensor.py
# Simulate an Airflow file sensor that waits for a file to appear
import os
import threading
import time
class FileSensor:
    def __init__(self, filepath, timeout=60, poke_interval=5):
        self.filepath = filepath
        self.timeout = timeout
        self.poke_interval = poke_interval
    def poke(self):
        """Check if the file exists."""
        return os.path.exists(self.filepath)
    def run(self):
        print(f"[SENSOR] Waiting for {self.filepath}...")
        start = time.time()
        while time.time() - start < self.timeout:
            if self.poke():
                print(f"[SENSOR] File found after {time.time() - start:.1f}s")
                return True
            print(f"[SENSOR] Not yet... checking again in {self.poke_interval}s")
            time.sleep(self.poke_interval)
        print(f"[SENSOR] TIMEOUT after {self.timeout}s")
        return False
# Simulate the file appearing after some time (background thread)
def delayed_file_creation(path, delay=8):
    time.sleep(delay)
    with open(path, 'w') as f:
        f.write("ready")
    print(f"[SIM] Created file: {path}")
watch_file = "/tmp/dodatech_ready.txt"
# Remove leftovers from an earlier run so the sensor really has to wait
if os.path.exists(watch_file):
    os.remove(watch_file)
threading.Thread(target=delayed_file_creation, args=(watch_file,), daemon=True).start()
sensor = FileSensor(watch_file, timeout=20, poke_interval=3)
result = sensor.run()
print(f"\nSensor result: {'SUCCESS' if result else 'FAILURE'}")
# Cleanup
if os.path.exists(watch_file):
    os.remove(watch_file)
Expected output:
[SENSOR] Waiting for /tmp/dodatech_ready.txt...
[SENSOR] Not yet... checking again in 3s
[SENSOR] Not yet... checking again in 3s
[SENSOR] Not yet... checking again in 3s
[SIM] Created file: /tmp/dodatech_ready.txt
[SENSOR] File found after 9.0s

Sensor result: SUCCESS
What’s Next
You now understand Airflow fundamentals! Next, learn dbt for SQL-first transformations, and then explore Apache Spark for processing large-scale data.
- Practice daily — Create a DAG that runs a Python script daily
- Build a project — Deploy Airflow locally with Docker and run your first real DAG
- Explore related topics — Check out Dagster and Prefect as Airflow alternatives
Remember: every expert was once a beginner. Keep coding!
Built by the developers of DodaTech
Doda Browser, DodaZIP & Durga Antivirus Pro