Building Data Pipelines — End-to-End Design, Monitoring, and Orchestration

DodaTech · Updated Jun 15, 2026 · 10 min read

A data pipeline is an end-to-end system that ingests data from sources, processes it through transformations, and delivers it to consumers — with monitoring, alerting, quality checks, and orchestration ensuring reliability at scale.

What You’ll Learn

By the end of this tutorial, you’ll understand how to design end-to-end data pipelines, implement monitoring and alerting, run data quality checks, handle schema evolution, and orchestrate pipelines with Airflow and Dagster.

Why Data Pipelines Matter

A pipeline that works in development often fails in production — source schemas change, data volumes spike, network partitions happen, and bad data sneaks in. Building pipelines that are reliable, observable, and maintainable is the core skill of a senior data engineer. DodaTech’s analytics platform runs 50+ production pipelines processing over 10TB daily for Doda Browser and Durga Antivirus Pro.

Data Pipelines Learning Path


flowchart LR
  A[ETL Pipelines] --> B[Apache Airflow]
  C[Stream Processing] --> D[Data Pipelines]
  B --> D
  D --> E{You Are Here}
  E --> F[Design]
  E --> G[Monitoring]
  E --> H[Quality]
  E --> I[Orchestration]

Prerequisites: ETL fundamentals, Apache Airflow basics, SQL proficiency. Understanding of data warehousing and stream processing helps.

What Is a Production Data Pipeline?

Think of a data pipeline like a city’s public transit system. Trains (data) arrive from many lines (sources). The system has schedules (orchestration), monitors for delays (alerting), inspects tickets (quality checks), handles track changes (schema evolution), and communicates with passengers (notifications).

A production pipeline is more than just code — it’s a system designed to run reliably, alert when things go wrong, and recover automatically.

End-to-End Pipeline Architecture


flowchart TB
  subgraph "Stage 1: Ingestion"
    S1[Source APIs] --> Q[Kafka / Event Hub]
    DB[(Databases)] --> CDC[Debezium CDC]
    CDC --> Q
  end
  subgraph "Stage 2: Processing"
    Q --> ETL[Spark / Flink]
    ETL --> DL[(Data Lake Raw)]
    DL --> T[dbt Transform]
    T --> DW[(Warehouse)]
  end
  subgraph "Stage 3: Serving"
    DW --> BI[BI Tools]
    DW --> ML[ML Pipelines]
    DW --> API[Serving API]
  end
  subgraph "Observability"
    M[Metrics] -.-> Alert[Alerting]
    M -.-> Dash[Dashboards]
    M -.-> Log[Logs]
  end

Pipeline Design Patterns

1. Medallion Architecture (Bronze → Silver → Gold)

| Layer | Purpose | Data Quality |
| --- | --- | --- |
| Bronze | Raw ingested data, append-only | None (accept everything) |
| Silver | Cleaned, deduplicated, validated | Required fields, type checks |
| Gold | Aggregated, business-ready | Business rules, referential integrity |
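
A minimal sketch of the three layers, using in-memory lists in place of real lake tables. The function names (`to_silver`, `to_gold`) and the sample records are illustrative assumptions, not part of any framework.

```python
from collections import defaultdict

# Bronze: raw records exactly as ingested (append-only, nothing rejected)
bronze = [
    {"order_id": "1001", "amount": "250.00", "region": "US"},
    {"order_id": "1001", "amount": "250.00", "region": "US"},  # duplicate
    {"order_id": "1002", "amount": "oops", "region": "EU"},    # bad type
    {"order_id": None, "amount": "10.00", "region": "EU"},     # missing key
    {"order_id": "1003", "amount": "45.50", "region": "EU"},
]

def to_silver(records):
    """Silver: enforce required fields and types, then deduplicate by order_id."""
    seen, silver = set(), []
    for r in records:
        if r.get("order_id") is None:
            continue  # required field missing
        try:
            row = {
                "order_id": int(r["order_id"]),
                "amount": float(r["amount"]),
                "region": r["region"],
            }
        except (ValueError, TypeError, KeyError):
            continue  # type check failed
        if row["order_id"] in seen:
            continue  # duplicate
        seen.add(row["order_id"])
        silver.append(row)
    return silver

def to_gold(silver_rows):
    """Gold: aggregate silver rows into revenue per region."""
    totals = defaultdict(float)
    for row in silver_rows:
        totals[row["region"]] += row["amount"]
    return dict(totals)

silver = to_silver(bronze)
gold = to_gold(silver)
print(silver)
print(gold)
```

In this sketch rejected rows are simply skipped; a real Silver step would more likely route them to a dead letter queue so they can be inspected.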

2. Starvation Pattern

If a downstream consumer (a dashboard, an ML model) only reads the data once every 24 hours, a daily batch job is a better fit than streaming. Always match processing latency to business requirements.

3. Dead Letter Queue

Invalid records that can’t be processed should go to a dead letter queue for later inspection, not block the entire pipeline.
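
A minimal sketch of the idea, assuming newline-delimited JSON input and a plain Python list standing in for the dead letter queue (in production this would more likely be a Kafka topic or a quarantine table). The function name `process_batch` and the required field `order_id` are illustrative assumptions.

```python
import json

def process_batch(raw_lines):
    """Parse each line; route failures to a dead letter list instead of raising."""
    good, dead_letters = [], []
    for line in raw_lines:
        try:
            record = json.loads(line)
            if not isinstance(record, dict) or "order_id" not in record:
                raise ValueError("missing required field: order_id")
            good.append(record)
        except ValueError as exc:  # json.JSONDecodeError is a ValueError subclass
            # Keep the raw payload and the reason so the record can be replayed later
            dead_letters.append({"raw": line, "error": str(exc)})
    return good, dead_letters

lines = ['{"order_id": 1, "amount": 20}', '{"amount": 5}', '{not json']
good, dead = process_batch(lines)
print(f"processed={len(good)} dead_lettered={len(dead)}")
for entry in dead:
    print(entry)
```

One bad line no longer fails the batch: the valid record is processed and the two invalid ones are kept with their error messages.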

Data Quality Checks

# data_quality.py
# Run data quality checks on pipeline output
from datetime import datetime

class DataQualityCheck:
    def __init__(self, name):
        self.name = name
        self.checks = []

    def add_check(self, description, check_func, severity="error"):
        self.checks.append({
            "description": description,
            "func": check_func,
            "severity": severity,
        })

    def run(self, data):
        results = {"check_name": self.name, "timestamp": datetime.now().isoformat(), "pass": True, "results": []}
        for check in self.checks:
            severity = check["severity"].upper()
            try:
                passed = check["func"](data)
            except Exception as e:
                # The check function itself crashed (e.g. a None compared to a number)
                results["pass"] = False
                results["results"].append(f"  ✗ [ERROR] {check['description']}: {e}")
                continue
            if passed:
                results["results"].append(f"  ✓ {check['description']}")
            elif check["severity"] == "error":
                # Error-severity failures fail the whole report
                results["pass"] = False
                results["results"].append(f"  ✗ [{severity}] {check['description']}")
            else:
                # Lower severities are reported but do not fail the report
                results["results"].append(f"  ⚠ [{severity}] {check['description']}")
        return results

# Simulate pipeline output
pipeline_output = [
    {"order_id": 1001, "amount": 250.00, "status": "completed", "date": "2026-06-01"},
    {"order_id": 1002, "amount": 45.50, "status": "pending", "date": "2026-06-02"},
    {"order_id": 1003, "amount": -10.00, "status": "completed", "date": "2026-06-03"},
    {"order_id": None, "amount": 100.00, "status": "completed", "date": "2026-06-04"},
    {"order_id": 1005, "amount": 200.00, "status": "unknown_status", "date": "2026-06-05"},
]

qc = DataQualityCheck("sales_pipeline_qc")
# "is not None" so that a legitimate ID of 0 is not treated as missing
qc.add_check("No null order IDs", lambda d: all(r.get("order_id") is not None for r in d))
qc.add_check("No negative amounts", lambda d: all(r.get("amount", 0) >= 0 for r in d))
qc.add_check("Valid status values", lambda d: all(r.get("status") in ("completed", "pending", "cancelled") for r in d))
qc.add_check("Row count >= 3", lambda d: len(d) >= 3)
qc.add_check("No empty dates", lambda d: all(r.get("date") for r in d))

print("=== Pipeline Data Quality Report ===\n")
results = qc.run(pipeline_output)
for line in results["results"]:
    print(line)
print(f"\nOverall: {'✓ PASS' if results['pass'] else '✗ FAIL'}")

Expected output:

=== Pipeline Data Quality Report ===

  ✗ [ERROR] No null order IDs
  ✗ [ERROR] No negative amounts
  ✗ [ERROR] Valid status values
  ✓ Row count >= 3
  ✓ No empty dates

Overall: ✗ FAIL

Schema Evolution

Sources change schemas over time. A production pipeline must handle:

| Change | Strategy |
| --- | --- |
| Adding a column | Backward compatible: pipeline should pass through new columns |
| Removing a column | Fail gracefully: log and use default/null |
| Renaming a column | Create a mapping layer or use schema registry |
| Changing data type | Cast to common type, log warnings |
| Adding a NOT NULL constraint | Clean data upstream or provide defaults |

# schema_evolution.py
# Handle schema evolution in pipelines

class SchemaHandler:
    def __init__(self, version: int = 1):
        self.version = version
        self.columns = {
            1: ["order_id", "amount", "status"],
            2: ["order_id", "amount", "status", "region"],
            3: ["order_id", "total", "status", "region", "currency"],
        }
        self.type_map = {
            1: {"order_id": int, "amount": float, "status": str},
            2: {"order_id": int, "amount": float, "status": str, "region": str},
            3: {"order_id": int, "total": float, "status": str, "region": str, "currency": str},
        }
        # Renamed columns: current name -> name used by older schema versions
        self.renames = {"total": "amount"}

    def resolve(self, record: dict) -> dict:
        """Resolve a record against the target schema version."""
        current_columns = self.columns[self.version]
        current_types = self.type_map[self.version]
        resolved = {}

        for col in current_columns:
            value = record.get(col)
            # Older records carry the pre-rename column (v1/v2 "amount" -> v3 "total")
            if value is None and col in self.renames:
                value = record.get(self.renames[col])

            if value is not None:
                try:
                    # Cast to the target type for this column
                    resolved[col] = current_types.get(col, str)(value)
                except (ValueError, TypeError):
                    resolved[col] = None
                    print(f"  [WARN] Column {col}: type conversion failed for {value}")
            elif col in ("region", "currency"):
                # Columns added in later versions get an explicit default
                resolved[col] = "unknown"
            else:
                resolved[col] = None
        return resolved

handler = SchemaHandler(version=3)

records = [
    {"order_id": "1001", "amount": "250.00", "status": "completed"},
    {"order_id": "1002", "amount": "45.50", "status": "pending", "region": "US"},
    {"order_id": "1003", "total": "120.00", "status": "shipped", "currency": "USD"},
]

print("=== Schema Evolution Resolution (v3 target) ===")
for r in records:
    resolved = handler.resolve(r)
    print(f"  Input: {r}")
    print(f"  Output: {resolved}\n")

Expected output:

=== Schema Evolution Resolution (v3 target) ===
  Input: {'order_id': '1001', 'amount': '250.00', 'status': 'completed'}
  Output: {'order_id': 1001, 'total': 250.0, 'status': 'completed', 'region': 'unknown', 'currency': 'unknown'}

  Input: {'order_id': '1002', 'amount': '45.50', 'status': 'pending', 'region': 'US'}
  Output: {'order_id': 1002, 'total': 45.5, 'status': 'pending', 'region': 'US', 'currency': 'unknown'}

  Input: {'order_id': '1003', 'total': '120.00', 'status': 'shipped', 'currency': 'USD'}
  Output: {'order_id': 1003, 'total': 120.0, 'status': 'shipped', 'region': 'unknown', 'currency': 'USD'}

Monitoring and Alerting

# pipeline_monitor.py
# Full pipeline monitoring with metrics
import time
import random
from datetime import datetime
from collections import deque

class PipelineMonitor:
    def __init__(self, pipeline_name):
        self.name = pipeline_name
        self.runs = deque(maxlen=100)
        self.current_run = None

    def start_run(self):
        self.current_run = {"start": datetime.now(), "status": "running"}
        print(f"[{self.name}] Run started at {self.current_run['start']}")

    def end_run(self, status, rows_processed=0, errors=0):
        if self.current_run:
            self.current_run["end"] = datetime.now()
            self.current_run["duration_seconds"] = (self.current_run["end"] - self.current_run["start"]).total_seconds()
            self.current_run["status"] = status
            self.current_run["rows_processed"] = rows_processed
            self.current_run["errors"] = errors
            self.runs.append(self.current_run)

            duration = self.current_run["duration_seconds"]
            print(f"[{self.name}] Run completed: {status}, {rows_processed} rows, {errors} errors, {duration:.1f}s")

            # Alert on long runs
            if duration > 300:
                print(f"[ALERT] Pipeline {self.name}: run exceeded 5 minutes ({duration:.1f}s)")

            # Alert on high error rate
            if rows_processed > 0 and errors / rows_processed > 0.05:
                print(f"[ALERT] Pipeline {self.name}: error rate {(errors/rows_processed)*100:.1f}% > 5%")

    def health(self):
        if len(self.runs) < 5:
            return "insufficient data"
        recent = list(self.runs)[-5:]
        failure_rate = sum(1 for r in recent if r["status"] == "failed") / len(recent)
        if failure_rate > 0.2:
            return "unhealthy"
        return "healthy"

monitor = PipelineMonitor("daily_sales_etl")

print("=== Pipeline Monitoring Dashboard ===\n")

for i in range(1, 11):
    monitor.start_run()
    rows = random.randint(1000, 10000)
    errors = 0 if random.random() > 0.2 else random.randint(1, 500)
    status = "success" if errors < rows * 0.1 else "failed"
    time.sleep(0.1)
    monitor.end_run(status, rows, errors)

print(f"\n=== Pipeline Health: {monitor.health().upper()} ===")

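Sample output (row counts, error counts, and the final health status vary between runs because the inputs are random):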

=== Pipeline Monitoring Dashboard ===

[daily_sales_etl] Run started at ...
[daily_sales_etl] Run completed: success, 5432 rows, 0 errors, 0.1s
...

=== Pipeline Health: HEALTHY ===

Orchestration with Airflow vs Dagster

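| Feature | Airflow | Dagster |
| --- | --- | --- |
| Pipeline definition | Python DAG objects or @dag / @task decorators | Python @asset, @op, and @job decorators |
| Testing | Separate test files (DAG import and task-level tests) | Ops and assets are plain Python functions; jobs can run in-process with execute_in_process() |
| Data awareness | Limited (XCom for passing small values) | Native (type system, asset lineage) |
| UI | Grid / Graph view (Grid replaced the Tree view in Airflow 2.3) | Dagster UI, formerly Dagit (rich asset graph) |
| Best for | Production ETL at scale | Data platform teams, ML pipelines |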

Common Data Pipeline Mistakes

1. No Dead Letter Queue

When a bad record breaks the pipeline, the entire batch fails. Implement a dead letter queue so you can inspect bad records later without blocking good data.

2. Not Monitoring Data Freshness

The pipeline runs, but data is stale (source was down). Monitor row counts, freshness timestamps, and compare against expected values.
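
A minimal sketch of a freshness check and a row-count check. The function names, the one-hour freshness limit, and the 20% tolerance are illustrative assumptions; real thresholds depend on each dataset's SLA.

```python
from datetime import datetime, timedelta, timezone

def check_freshness(last_loaded_at, max_age, now=None):
    """Return True if the newest loaded record is no older than max_age."""
    now = now or datetime.now(timezone.utc)
    return (now - last_loaded_at) <= max_age

def check_row_count(actual, expected, tolerance=0.2):
    """Return True if actual is within +/- tolerance (a fraction) of expected."""
    if expected == 0:
        return actual == 0
    return abs(actual - expected) / expected <= tolerance

# Fixed "now" so the example is reproducible
now = datetime(2026, 6, 15, 12, 0, tzinfo=timezone.utc)
fresh = check_freshness(datetime(2026, 6, 15, 11, 30, tzinfo=timezone.utc), timedelta(hours=1), now)
stale = check_freshness(datetime(2026, 6, 14, 9, 0, tzinfo=timezone.utc), timedelta(hours=1), now)

print(f"fresh table ok: {fresh}")
print(f"stale table ok: {stale}")
print(f"9,000 rows vs 10,000 expected ok: {check_row_count(9000, 10000)}")
print(f"4,000 rows vs 10,000 expected ok: {check_row_count(4000, 10000)}")
```

A run can report success and still fail both checks, which is exactly the case a status-only monitor misses.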

3. Manual Recovery

If a pipeline fails and requires manual restart every time, your pager will never stop ringing. Build automatic retries with exponential backoff.
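
A minimal sketch of retries with exponential backoff and jitter. The helper `retry_with_backoff` and the `flaky_extract` function are hypothetical; the `sleep` parameter is injectable only so the example runs without real waiting.

```python
import random
import time

def retry_with_backoff(func, max_attempts=4, base_delay=1.0, max_delay=30.0, sleep=time.sleep):
    """Call func(); on failure wait base_delay * 2**attempt (plus jitter) and retry."""
    for attempt in range(max_attempts):
        try:
            return func()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error to the orchestrator
            delay = min(base_delay * 2 ** attempt, max_delay)
            # Up to 10% jitter so many pipelines do not retry in lockstep
            sleep(delay + random.uniform(0, delay * 0.1))

calls = {"n": 0}

def flaky_extract():
    """Fails twice, then succeeds (simulates a source that recovers)."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("source temporarily unavailable")
    return "ok"

delays = []  # record the waits instead of actually sleeping
result = retry_with_backoff(flaky_extract, base_delay=0.01, sleep=delays.append)
print(f"result={result} attempts={calls['n']} waits={len(delays)}")
```

Orchestrators usually offer this without custom code; Airflow tasks, for example, accept `retries`, `retry_delay`, and `retry_exponential_backoff`.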

4. No Schema Evolution Plan

Source teams add columns without telling you. Your pipeline breaks. Use schema-on-read or a schema registry to handle evolution gracefully.
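
A minimal sketch of drift detection against an expected column set. `EXPECTED_COLUMNS`, `detect_drift`, and `apply_contract` are illustrative names; a schema registry would replace the hard-coded set in a real system.

```python
EXPECTED_COLUMNS = frozenset({"order_id", "amount", "status"})  # assumed contract

def detect_drift(record, expected=EXPECTED_COLUMNS):
    """Compare a record's keys with the expected columns."""
    actual = set(record)
    return {
        "added": sorted(actual - expected),
        "missing": sorted(expected - actual),
    }

def apply_contract(record, expected=EXPECTED_COLUMNS):
    """Pass new columns through, default missing ones to None, and report both."""
    drift = detect_drift(record, expected)
    resolved = dict(record)        # added columns pass through unchanged
    for col in drift["missing"]:
        resolved[col] = None       # removed columns degrade to None instead of crashing
    return resolved, drift

incoming = {"order_id": 1, "status": "completed", "region": "EU"}
resolved, drift = apply_contract(incoming)
print(drift)
print(resolved)
```

Logging the `drift` dictionary on every batch turns a silent upstream change into a visible signal before it breaks a downstream query.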

5. Over-Complicated Dependencies

A pipeline DAG with branches, joins, and 100+ tasks is hard to reason about. Simplify: split it into smaller DAGs, group related tasks (for example with Airflow TaskGroups, which replace the deprecated SubDAGs), and document dependencies.
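
A minimal sketch of making dependencies explicit and checkable, using the standard-library `graphlib` module (Python 3.9+). The task names are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter

# task -> set of tasks it depends on
dependencies = {
    "extract_orders": set(),
    "extract_customers": set(),
    "validate": {"extract_orders", "extract_customers"},
    "transform": {"validate"},
    "load": {"transform"},
}

def has_cycle(deps):
    """Return True if the dependency graph cannot be ordered."""
    try:
        list(TopologicalSorter(deps).static_order())
        return False
    except CycleError:
        return True

# One valid execution order: every task appears after all of its dependencies
order = list(TopologicalSorter(dependencies).static_order())
print(order)
print(has_cycle(dependencies))
print(has_cycle({"a": {"b"}, "b": {"a"}}))
```

Keeping the dependency map as plain data makes it easy to review in a pull request and to test for cycles before deployment.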

6. Ignoring Pipeline Cost

Processing 10TB daily costs real money. Monitor Spark shuffle writes, BigQuery bytes scanned, and S3 API costs. Optimize expensive patterns.

Practice Questions

1. What is the medallion architecture in data pipelines?

Bronze (raw data), Silver (cleaned/deduplicated), Gold (aggregated/curated). Each layer raises data quality and typically reduces volume. It is a widely used pattern for lakehouse pipelines.

2. Why is a dead letter queue important?

When invalid records arrive (corrupted JSON, missing required fields), they can’t be processed. A dead letter queue stores these records for later inspection without blocking the rest of the pipeline.

3. What are the key metrics to monitor in a data pipeline?

Run duration, row counts (expected vs actual), error rate, data freshness (time since last successful run), source availability, and bytes processed.

4. How does schema evolution affect production pipelines?

Source schemas change over time (columns added, renamed, removed). Pipelines must handle these changes gracefully without breaking. Strategies include schema registries, column mapping, and backward-compatible schema design.

5. Challenge: Design a monitoring system for 20 pipelines processing 5TB daily. Requirements: alert within 2 minutes of failure, track row count trends, and page the on-call engineer if 3+ pipelines fail within 1 hour.

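Use Prometheus to expose pipeline metrics (rows, duration, status) and Grafana dashboards to track row count trends. Route alerts through Alertmanager: send an alert for any single pipeline failure, and page the on-call engineer if 3+ pipelines fail within an hour. Run health checks every 60 seconds or more often so that a failure is detected inside the 2-minute window.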
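
A minimal sketch of the "3+ pipelines fail within 1 hour" rule as a sliding window in plain Python. In the Prometheus setup described above this logic would live in an alerting rule; the class `FailureWindow` is hypothetical and assumes failures are reported in time order.

```python
from collections import deque
from datetime import datetime, timedelta

class FailureWindow:
    """Track recent failures and decide whether to alert or page."""

    def __init__(self, threshold=3, window=timedelta(hours=1)):
        self.threshold = threshold
        self.window = window
        self.failures = deque()  # (timestamp, pipeline_name), oldest first

    def record_failure(self, pipeline, at):
        self.failures.append((at, pipeline))
        # Drop failures that have aged out of the window
        while self.failures and at - self.failures[0][0] > self.window:
            self.failures.popleft()
        # Count distinct pipelines so one flapping pipeline does not trigger a page
        distinct = {name for _, name in self.failures}
        return "page" if len(distinct) >= self.threshold else "alert"

t0 = datetime(2026, 6, 15, 9, 0)
window = FailureWindow()
actions = [
    window.record_failure("sales_etl", t0),
    window.record_failure("events_etl", t0 + timedelta(minutes=20)),
    window.record_failure("users_etl", t0 + timedelta(minutes=50)),
    window.record_failure("billing_etl", t0 + timedelta(hours=3)),
]
print(actions)
```

The third failure lands within an hour of the first two and escalates to a page; the fourth arrives after the earlier ones have expired and is a plain alert again.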

Mini Project: End-to-End Pipeline Simulation

# e2e_pipeline.py
# Simulate an end-to-end data pipeline with all stages
import time
import random
from datetime import datetime

class Stage:
    def __init__(self, name, simulate_func):
        self.name = name
        self.simulate = simulate_func
        self.metrics = {"runs": 0, "failures": 0, "total_duration": 0}

    def execute(self, input_data):
        self.metrics["runs"] += 1
        start = time.time()
        try:
            result = self.simulate(input_data)
            duration = time.time() - start
            self.metrics["total_duration"] += duration
            return result
        except Exception as e:
            self.metrics["failures"] += 1
            raise

class Pipeline:
    def __init__(self, name):
        self.name = name
        self.stages = []
        self.monitor = {"runs": 0, "failures": 0}

    def add_stage(self, stage):
        self.stages.append(stage)

    def run(self, input_data):
        self.monitor["runs"] += 1
        print(f"\n{'='*50}")
        print(f"Pipeline: {self.name}")
        print(f"Run #{self.monitor['runs']} at {datetime.now().strftime('%H:%M:%S')}")
        print(f"{'='*50}")

        current = input_data
        for stage in self.stages:
            print(f"\n[{stage.name}]")
            try:
                current = stage.execute(current)
                print(f"  ✓ Complete")
            except Exception as e:
                print(f"  ✗ Failed: {e}")
                self.monitor["failures"] += 1
                # Do not return partial output as if the run had succeeded
                return None
        return current

# Define stages
def extract(source):
    print(f"  Extracting from {source}...")
    time.sleep(0.2)
    return [{"id": i, "value": random.randint(1, 100)} for i in range(10)]

def validate(data):
    print(f"  Validating {len(data)} records...")
    for r in data:
        if r["value"] < 0:
            raise ValueError(f"Negative value: {r}")
    return [r for r in data if r["value"] > 10]

def transform(data):
    print(f"  Transforming {len(data)} records...")
    result = []
    for r in data:
        r["value_squared"] = r["value"] ** 2
        r["processed_at"] = datetime.now().isoformat()
        result.append(r)
    time.sleep(0.1)
    return result

def load(data):
    print(f"  Loading {len(data)} records to warehouse...")
    time.sleep(0.1)
    return {"loaded": len(data), "destination": "analytics_db.sales"}

# Build and run pipeline
pipeline = Pipeline("Customer Analytics Pipeline")
pipeline.add_stage(Stage("Extract", extract))
pipeline.add_stage(Stage("Validate", validate))
pipeline.add_stage(Stage("Transform", transform))
pipeline.add_stage(Stage("Load", load))

for i in range(3):
    result = pipeline.run("api.dodatech.com/events")
    print(f"\n  Result: {result}")

print(f"\n{'='*50}")
print("Pipeline Summary:")
for stage in pipeline.stages:
    avg_dur = stage.metrics["total_duration"] / max(stage.metrics["runs"], 1)
    print(f"  {stage.name}: {stage.metrics['runs']} runs, {stage.metrics['failures']} failures, avg {avg_dur:.2f}s")
print(f"Total: {pipeline.monitor['runs']} runs, {pipeline.monitor['failures']} failures")

Expected output (timestamps and record counts vary between runs):

==================================================
Pipeline: Customer Analytics Pipeline
Run #1 at 10:00:00
==================================================

[Extract]
  Extracting from api.dodatech.com/events...
  ✓ Complete

[Validate]
  Validating 10 records...
  ✓ Complete
...

Pipeline Summary:
  Extract: 3 runs, 0 failures, avg 0.20s
  Validate: 3 runs, 0 failures, avg 0.00s
  Transform: 3 runs, 0 failures, avg 0.10s
  Load: 3 runs, 0 failures, avg 0.10s
Total: 3 runs, 0 failures

What’s Next

You now understand how to build production data pipelines! Next, learn data modeling to design star schemas and slowly changing dimensions, and explore dbt for SQL transformations in production.

  • Practice daily — Add quality checks and monitoring to your existing pipelines
  • Build a project — Create a complete pipeline with Bronze → Silver → Gold layers
  • Explore related topics — Check out Dagster for asset-based orchestration

Remember: every expert was once a beginner. Keep coding!
