Apache Airflow Guide — DAGs, Operators, and ETL Orchestration

DodaTech Updated Jun 15, 2026 8 min read

Apache Airflow is an open-source workflow orchestration platform that schedules, monitors, and manages complex data pipelines using directed acyclic graphs (DAGs) of tasks.

What You’ll Learn

By the end of this tutorial, you’ll understand Airflow’s core concepts — DAGs, operators, tasks, schedulers — and build a complete ETL DAG using PythonOperator and BashOperator.

Why Airflow Matters

Data pipelines have many steps: extract from APIs, validate data, transform with Spark, load into warehouses, send alerts. Without an orchestrator, you’d rely on cron jobs, manual scripts, and endless debugging. Airflow gives you scheduling, retries, monitoring, and dependency management in one platform. DodaTech uses Airflow to orchestrate Doda Browser analytics pipelines that process millions of events daily.

Apache Airflow Learning Path


flowchart LR
  A[ETL Pipelines] --> B[Apache Airflow]
  B --> C{You Are Here}
  C --> D[DAGs]
  C --> E[Operators]
  C --> F[Scheduler]
  D --> G[Tasks & Dependencies]
  E --> H[PythonOperator]
  E --> I[BashOperator]

Prerequisites: Python fundamentals, understanding of ETL pipelines. Basic command-line knowledge helps.

What Is Airflow?

Think of Airflow like a factory production line manager. Each machine on the line does one job (task). The manager ensures machines start in the right order, handles breakdowns (retries), and tracks progress. If Machine 3 depends on Machine 2’s output, the manager waits until Machine 2 finishes before starting Machine 3.

A DAG is the production line blueprint. Operators are the machines. Tasks are individual jobs run on machines. The Scheduler is the manager.

Core Concepts

Directed Acyclic Graph (DAG)

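A DAG defines the pipeline structure: tasks and their dependencies. “Directed” means every dependency points one way, from an upstream task to a downstream one. “Acyclic” means no task can depend on itself, directly or through a chain of other tasks, so a run always has a valid order and always terminates.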

# simple_dag.py
# Minimal Airflow DAG definition
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'dodatech',
    'start_date': datetime(2026, 1, 1),
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'simple_etl_dag',
    default_args=default_args,
    description='Simple ETL pipeline',
    schedule_interval='@daily',
    catchup=False,
    tags=['etl', 'tutorial'],
)
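
To make the "acyclic" requirement concrete, here is a minimal sketch that uses Python's standard-library graphlib module (Python 3.9+) rather than Airflow itself. The task names and the execution_order helper are illustrative assumptions; Airflow runs its own cycle check when it parses a DAG file.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(dependencies):
    """Return one valid run order for a {task: set_of_upstream_tasks} mapping."""
    return list(TopologicalSorter(dependencies).static_order())


# A valid pipeline: each task lists the tasks it depends on.
pipeline = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}
order = execution_order(pipeline)
print(order)

# An invalid pipeline: extract waits for load, which waits for extract.
cyclic = {
    "extract": {"load"},
    "transform": {"extract"},
    "load": {"transform"},
}
try:
    execution_order(cyclic)
    has_cycle = False
except CycleError:
    has_cycle = True
print("cycle detected:", has_cycle)
```

The first graph yields a run order; the second cannot be ordered at all, which is why a cyclic graph is rejected rather than scheduled.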

Operators

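Operators define what to do. Airflow ships with built-in operators for common work, and provider packages add many more for databases, cloud services, and messaging tools.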

Operator                 | Purpose
PythonOperator           | Execute Python functions
BashOperator             | Execute shell commands
PostgresOperator         | Run SQL queries on PostgreSQL
S3FileTransformOperator  | Transform files in S3
EmailOperator            | Send emails on success/failure
DummyOperator            | No-op for grouping tasks (renamed EmptyOperator in Airflow 2.3+)
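
The split between an operator (a reusable template) and a task (one configured instance of it) can be sketched without Airflow. The Toy* classes below are hypothetical stand-ins, not Airflow's real classes; the one detail borrowed from Airflow is that an operator does its work in an execute(context) method.

```python
import subprocess


class ToyOperator:
    """Base template: subclasses decide what execute() does."""

    def __init__(self, task_id):
        self.task_id = task_id

    def execute(self, context):
        raise NotImplementedError


class ToyPythonOperator(ToyOperator):
    """Runs a Python callable, passing the context as keyword arguments."""

    def __init__(self, task_id, python_callable):
        super().__init__(task_id)
        self.python_callable = python_callable

    def execute(self, context):
        return self.python_callable(**context)


class ToyBashOperator(ToyOperator):
    """Runs a shell command and returns its standard output."""

    def __init__(self, task_id, bash_command):
        super().__init__(task_id)
        self.bash_command = bash_command

    def execute(self, context):
        completed = subprocess.run(
            self.bash_command, shell=True, check=True,
            capture_output=True, text=True,
        )
        return completed.stdout.strip()


def greet(name, **_):
    return f"hello {name}"


# Two tasks: each is one operator class plus its own configuration.
say_hello = ToyPythonOperator(task_id="say_hello", python_callable=greet)
echo_done = ToyBashOperator(task_id="echo_done", bash_command="echo done")

python_result = say_hello.execute({"name": "airflow"})
bash_result = echo_done.execute({})
print(python_result, "|", bash_result)
```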

Tasks and Dependencies

Tasks are instances of operators. Dependencies define order: task1 >> task2 means task2 runs after task1.

# Defining tasks and dependencies
# (assumes dag, extract_function, and transform_function are defined earlier in the file)
task_extract = PythonOperator(
    task_id='extract_data',
    python_callable=extract_function,
    dag=dag,
)

task_transform = PythonOperator(
    task_id='transform_data',
    python_callable=transform_function,
    dag=dag,
)

task_load = BashOperator(
    task_id='load_to_warehouse',
    bash_command='dbt run --models sales',
    dag=dag,
)

# Define execution order
task_extract >> task_transform >> task_load
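
The >> syntax is ordinary Python operator overloading. The toy class below is a sketch of the idea, not Airflow's implementation: ToyTask and its upstream_ids/downstream_ids attributes are hypothetical names chosen for illustration.

```python
class ToyTask:
    """A task that records which tasks run before and after it."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream_ids = set()
        self.downstream_ids = set()

    def __rshift__(self, other):
        """self >> other means: other runs after self."""
        self.downstream_ids.add(other.task_id)
        other.upstream_ids.add(self.task_id)
        # Returning the right-hand task is what lets a >> b >> c chain.
        return other


extract = ToyTask("extract_data")
transform = ToyTask("transform_data")
load = ToyTask("load_to_warehouse")

extract >> transform >> load

for task in (extract, transform, load):
    print(task.task_id, "waits for", sorted(task.upstream_ids))
```

Because each >> returns its right-hand operand, the chain is evaluated left to right as two separate dependency declarations.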

Complete ETL DAG Example

# etl_dag.py
# Complete ETL DAG with PythonOperator and BashOperator
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'dodatech',
    'depends_on_past': False,
    'start_date': datetime(2026, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=10),
}

dag = DAG(
    'sales_etl_pipeline',
    default_args=default_args,
    description='Daily sales ETL: extract, transform, load',
    schedule_interval='0 2 * * *',  # Run daily at 2 AM
    catchup=False,
    tags=['sales', 'etl'],
)

def extract_sales(**context):
    """Extract sales data from API and write to staging."""
    # Simulate the API call with a hard-coded response; a real task would
    # fetch this over HTTP (for example with the requests library)
    api_response = {
        "orders": [
            {"order_id": 1001, "amount": 250.00, "status": "completed"},
            {"order_id": 1002, "amount": 45.50, "status": "pending"},
            {"order_id": 1003, "amount": 1200.00, "status": "completed"},
        ]
    }

    # Push to XCom for downstream tasks
    context['task_instance'].xcom_push(key='sales_data', value=api_response)
    print(f"Extracted {len(api_response['orders'])} orders")

def validate_sales(**context):
    """Validate extracted data before transformation."""
    ti = context['task_instance']
    data = ti.xcom_pull(task_ids='extract_sales', key='sales_data')
    errors = []

    for order in data['orders']:
        if order['amount'] <= 0:
            errors.append(f"Order {order['order_id']}: invalid amount")
        if order['status'] not in ('completed', 'pending', 'cancelled'):
            errors.append(f"Order {order['order_id']}: unknown status")

    if errors:
        raise ValueError(f"Validation errors: {errors}")
    print(f"Validation passed: {len(data['orders'])} orders OK")

def transform_sales(**context):
    """Clean and enrich sales data."""
    ti = context['task_instance']
    data = ti.xcom_pull(task_ids='extract_sales', key='sales_data')

    transformed = []
    for order in data['orders']:
        order['category'] = 'high_value' if order['amount'] > 100 else 'standard'
        order['processed_at'] = datetime.now().isoformat()
        transformed.append(order)

    ti.xcom_push(key='transformed_data', value=transformed)
    print(f"Transformed {len(transformed)} orders")

# Task definitions
# In Airflow 2+ the context is passed to the callable automatically,
# so the Airflow 1.x flag provide_context=True is no longer needed.
extract_task = PythonOperator(
    task_id='extract_sales',
    python_callable=extract_sales,
    dag=dag,
)

validate_task = PythonOperator(
    task_id='validate_sales',
    python_callable=validate_sales,
    dag=dag,
)

transform_task = PythonOperator(
    task_id='transform_sales',
    python_callable=transform_sales,
    dag=dag,
)

load_task = BashOperator(
    task_id='load_to_warehouse',
    bash_command='echo "Loading to warehouse... row count: {{ ti.xcom_pull(task_ids=\"transform_sales\", key=\"transformed_data\") | length }}" && echo "Load complete"',
    dag=dag,
)

notify_task = BashOperator(
    task_id='send_notification',
    bash_command='echo "Sales pipeline completed at $(date)"',
    dag=dag,
)

# Dependencies
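extract_task >> validate_task >> transform_task >> load_task >> notify_task

# airflow_simulator.py
# Simulate Airflow DAG execution locally (no Airflow needed).
# Assumes extract_sales, validate_sales, and transform_sales from etl_dag.py
# are pasted above this code, so that airflow itself is never imported.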
from datetime import datetime

class TaskInstance:
    def __init__(self):
        self.xcom_data = {}

    def xcom_push(self, key, value):
        self.xcom_data[key] = value

    def xcom_pull(self, task_ids, key):
        return self.xcom_data.get(key)

class DAGSimulator:
    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.tasks = []

    def run(self):
        print(f"╔══ Running DAG: {self.dag_id} ══╗\n")
        context = {"task_instance": TaskInstance(), "execution_date": datetime.now()}

        results = []
        for task_name, task_func in self.tasks:
            print(f"▶ Task: {task_name}")
            try:
                task_func(**context)
                print(f"  ✓ {task_name} succeeded\n")
                results.append({"task": task_name, "status": "success"})
            except Exception as e:
                print(f"  ✗ {task_name} FAILED: {e}\n")
                results.append({"task": task_name, "status": "failed"})
                # Stop here: by default Airflow does not run tasks downstream of a failure
                break

        print("╚══ DAG Complete ═══════════════╝")
        return results

# Build and run
sim = DAGSimulator("sales_etl_pipeline")
sim.tasks = [
    ("extract_sales", extract_sales),
    ("validate_sales", validate_sales),
    ("transform_sales", transform_sales),
]
results = sim.run()

print("\nExecution Summary:")
for r in results:
    print(f"  {r['task']}: {r['status']}")

Expected output:

╔══ Running DAG: sales_etl_pipeline ══╗

▶ Task: extract_sales
Extracted 3 orders
  ✓ extract_sales succeeded

▶ Task: validate_sales
Validation passed: 3 orders OK
  ✓ validate_sales succeeded

▶ Task: transform_sales
Transformed 3 orders
  ✓ transform_sales succeeded

╚══ DAG Complete ═══════════════╝

Execution Summary:
  extract_sales: success
  validate_sales: success
  transform_sales: success

Airflow Architecture


flowchart TB
  subgraph "Airflow Components"
    S[Scheduler] --> M[Metadata DB<br/>PostgreSQL/MySQL]
    S --> W[Workers<br/>Execute Tasks]
    W --> M
    W --> Q[Executor<br/>Celery/Kubernetes]
    Q --> W
    UI[Web UI<br/>Flask App] --> M
  end
  S -- "Triggers DAGs" --> DAG[DAG Files<br/>dags/ folder]
  DAG --> S

Common Airflow Mistakes

1. Not Setting catchup=False by Default

In Airflow 2.x, catchup defaults to True, so without catchup=False Airflow creates a run for every schedule interval between start_date and now. For a daily DAG with a start_date in 2020, that means thousands of backfill runs. Set catchup=False unless you explicitly want backfill.
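
As a rough illustration of the scale, the sketch below counts one run per day between a start date and a reference date. It is a simplification: daily_runs_to_backfill is a hypothetical helper, the dates are examples, and Airflow's real count depends on the schedule and on when the scheduler evaluates it.

```python
from datetime import date


def daily_runs_to_backfill(start_date, today):
    """Count whole days between start_date and today, one daily run each."""
    return max((today - start_date).days, 0)


runs = daily_runs_to_backfill(date(2020, 1, 1), date(2026, 1, 1))
print(f"Daily runs to backfill: {runs}")  # Daily runs to backfill: 2192
```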

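2. Long-Running Tasks Occupying Worker Slots

Tasks that poll APIs in a time.sleep() loop hold a worker slot for the whole wait. Use Airflow sensors (e.g., HttpSensor, S3KeySensor) instead, and set mode='reschedule' for long waits so the slot is released between checks. In the default poke mode a sensor still occupies its slot while it waits.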

3. Hard-Coding Connections in DAGs

Don’t put database passwords or API keys in DAG code. Store them as Airflow Connections (for example with the airflow connections add CLI command or in the web UI) and reference them by connection ID in your operators.
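
One way to keep secrets out of DAG files is to supply connections through the environment. Airflow can read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID> that holds a URI. The sketch below only imitates that lookup with the standard library; load_connection, the warehouse_db ID, and the example URI are hypothetical, and in a real DAG you would pass the connection ID to an operator or hook instead of parsing the URI yourself.

```python
import os
from urllib.parse import urlsplit


def load_connection(conn_id, environ=os.environ):
    """Read AIRFLOW_CONN_<CONN_ID> from the environment and split the URI."""
    var_name = f"AIRFLOW_CONN_{conn_id.upper()}"
    uri = environ.get(var_name)
    if uri is None:
        raise KeyError(f"{var_name} is not set")
    parts = urlsplit(uri)
    return {
        "conn_type": parts.scheme,
        "host": parts.hostname,
        "port": parts.port,
        "login": parts.username,
        "password": parts.password,
        "schema": parts.path.lstrip("/"),
    }


# Demo only: a stand-in mapping so the example runs without touching the
# real environment. In practice the variable is set outside the code.
demo_environ = {
    "AIRFLOW_CONN_WAREHOUSE_DB":
        "postgresql://etl_user:example-password@db.example.com:5432/sales",
}

conn = load_connection("warehouse_db", environ=demo_environ)
print(conn["host"], conn["port"], conn["schema"])
```

The DAG code names only the connection ID, so rotating the password never requires editing or redeploying the DAG file.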

4. Overloading DAGs with Too Many Tasks

A DAG with 200+ tasks is hard to debug. Break large pipelines into multiple DAGs, or group related tasks with TaskGroups. SubDAGs are deprecated as of Airflow 2.0 in favor of TaskGroups.

5. Ignoring Task Logging

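When a task fails, you need logs. In Python callables, log through Python’s standard logging module (for example, logging.getLogger(__name__).info(...)); self.log is only available inside operator and hook classes. Configure remote log storage (S3, GCS) so logs persist across worker restarts.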
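
A minimal sketch of logging from a Python callable with the standard logging module. The extract_orders function and the ListHandler class are hypothetical; the handler exists only so the example can show what was logged, whereas in Airflow the task log captures these records.

```python
import logging

logger = logging.getLogger(__name__)


def extract_orders(**context):
    """Pretend to extract orders and log how many were found."""
    orders = [{"order_id": 1001}, {"order_id": 1002}]
    logger.info("Extracted %d orders", len(orders))
    return orders


class ListHandler(logging.Handler):
    """Collects formatted log messages in a list (demo only)."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


handler = ListHandler()
logger.addHandler(handler)
logger.setLevel(logging.INFO)

extract_orders()
print(handler.messages)
```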

6. Not Handling Task Failures Gracefully

Always set retries and retry_delay for production DAGs. Use on_failure_callback to send notifications when retries are exhausted.
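
The interplay of retries, retry_delay, and a failure callback can be sketched in plain Python. This is a toy model, not Airflow's scheduler logic: run_with_retries is a hypothetical helper, and its callback receives the exception, whereas Airflow passes a context dictionary to on_failure_callback.

```python
import time


def run_with_retries(task, retries=2, retry_delay=0.0, on_failure_callback=None):
    """Call task(); retry up to `retries` times, then report the final failure."""
    for attempt in range(retries + 1):
        try:
            return task()
        except Exception as exc:
            if attempt == retries:
                # Retries are exhausted: notify once, then re-raise.
                if on_failure_callback is not None:
                    on_failure_callback(exc)
                raise
            time.sleep(retry_delay)


calls = {"count": 0}


def flaky():
    """Fails on the first two calls, succeeds on the third."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("temporary outage")
    return "ok"


def always_fails():
    raise RuntimeError("still down")


result = run_with_retries(flaky, retries=2)

alerts = []
try:
    run_with_retries(
        always_fails,
        retries=2,
        on_failure_callback=lambda exc: alerts.append(str(exc)),
    )
except RuntimeError:
    pass

print(result, calls["count"], alerts)
```

The flaky task recovers within its retry budget and triggers no alert; the permanently failing task triggers exactly one alert, after the last attempt.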

Practice Questions

1. What is a DAG in Airflow?

A Directed Acyclic Graph — a collection of tasks with defined dependencies. Tasks execute in order, and no cycles (loops) are allowed.

2. What is the difference between an Operator and a Task?

An Operator defines what type of action to perform (PythonOperator runs Python, BashOperator runs shell). A Task is an instance of an Operator added to a DAG.

3. How does Airflow schedule DAGs?

The Scheduler reads DAG files from the dags/ folder, checks the schedule_interval, and creates DAG Runs at the appropriate times. Workers execute the tasks.

4. What is XCom and when would you use it?

XCom (cross-communication) lets tasks exchange data. Task A pushes a value with xcom_push(key, value). Task B pulls it with xcom_pull(task_ids='A', key=key). Use it for passing small amounts of metadata.

5. Challenge: Design an Airflow DAG for a pipeline that extracts data from a REST API, validates it, runs a dbt transformation, and sends a Slack notification if rows < 100.

Use PythonOperator for extraction and validation, BashOperator for dbt run, PythonOperator with Slack SDK for notification. Add a BranchPythonOperator to conditionally skip the Slack alert when rows >= 100.
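
The branching decision in the challenge answer can be sketched as a plain function. With BranchPythonOperator, the callable returns the task_id of the branch to follow; the task IDs send_slack_alert and skip_alert and the threshold parameter are hypothetical names used for illustration.

```python
def choose_next_task(row_count, threshold=100):
    """Return the task_id to follow, as a branch callable would."""
    if row_count < threshold:
        return "send_slack_alert"
    return "skip_alert"


print(choose_next_task(42))
print(choose_next_task(100))
```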

Mini Project: Custom Airflow Sensor

# file_sensor.py
# Simulate an Airflow file sensor that waits for a file to appear
import time
import os
from datetime import datetime

class FileSensor:
    def __init__(self, filepath, timeout=60, poke_interval=5):
        self.filepath = filepath
        self.timeout = timeout
        self.poke_interval = poke_interval

    def poke(self):
        """Check if the file exists."""
        return os.path.exists(self.filepath)

    def run(self):
        print(f"[SENSOR] Waiting for {self.filepath}...")
        start = time.time()
        while time.time() - start < self.timeout:
            if self.poke():
                print(f"[SENSOR] File found after {time.time() - start:.1f}s")
                return True
            print(f"[SENSOR] Not yet... checking again in {self.poke_interval}s")
            time.sleep(self.poke_interval)
        print(f"[SENSOR] TIMEOUT after {self.timeout}s")
        return False

# Simulate file appearing after some time
import threading
def delayed_file_creation(path, delay=8):
    time.sleep(delay)
    with open(path, 'w') as f:
        f.write("ready")
    print(f"[SIM] Created file: {path}")

watch_file = "/tmp/dodatech_ready.txt"
threading.Thread(target=delayed_file_creation, args=(watch_file,), daemon=True).start()

sensor = FileSensor(watch_file, timeout=20, poke_interval=3)
result = sensor.run()
print(f"\nSensor result: {'SUCCESS' if result else 'FAILURE'}")

# Cleanup
if os.path.exists(watch_file):
    os.remove(watch_file)

Expected output (the elapsed time may differ by a fraction of a second):

[SENSOR] Waiting for /tmp/dodatech_ready.txt...
[SENSOR] Not yet... checking again in 3s
[SENSOR] Not yet... checking again in 3s
[SENSOR] Not yet... checking again in 3s
[SIM] Created file: /tmp/dodatech_ready.txt
[SENSOR] File found after 9.0s

Sensor result: SUCCESS

What’s Next

You now understand Airflow fundamentals! Next, learn dbt for SQL-first transformations, and then explore Apache Spark for processing large-scale data.

  • Practice daily — Create a DAG that runs a Python script daily
  • Build a project — Deploy Airflow locally with Docker and run your first real DAG
  • Explore related topics — Check out Dagster and Prefect as Airflow alternatives

Remember: every expert was once a beginner. Keep coding!
