Building Data Pipelines — End-to-End Design, Monitoring, and Orchestration
A data pipeline is an end-to-end system that ingests data from sources, processes it through transformations, and delivers it to consumers — with monitoring, alerting, quality checks, and orchestration ensuring reliability at scale.
What You’ll Learn
By the end of this tutorial, you’ll understand how to design end-to-end data pipelines, implement monitoring and alerting, run data quality checks, handle schema evolution, and orchestrate pipelines with Airflow and Dagster.
Why Data Pipelines Matter
A pipeline that works in development often fails in production — source schemas change, data volumes spike, network partitions happen, and bad data sneaks in. Building pipelines that are reliable, observable, and maintainable is the core skill of a senior data engineer. DodaTech’s analytics platform runs 50+ production pipelines processing over 10TB daily for Doda Browser and Durga Antivirus Pro.
Data Pipelines Learning Path
```mermaid
flowchart LR
    A[ETL Pipelines] --> B[Apache Airflow]
    C[Stream Processing] --> D[Data Pipelines]
    B --> D
    D --> E{You Are Here}
    E --> F[Design]
    E --> G[Monitoring]
    E --> H[Quality]
    E --> I[Orchestration]
```
What Is a Production Data Pipeline?
Think of a data pipeline like a city’s public transit system. Trains (data) arrive from many lines (sources). The system has schedules (orchestration), monitors for delays (alerting), inspects tickets (quality checks), handles track changes (schema evolution), and communicates with passengers (notifications).
A production pipeline is more than just code — it’s a system designed to run reliably, alert when things go wrong, and recover automatically.
End-to-End Pipeline Architecture
```mermaid
flowchart TB
    subgraph ingestion ["Stage 1: Ingestion"]
        S1[Source APIs] --> Q[Kafka / Event Hub]
        DB[(Databases)] --> CDC[Debezium CDC]
        CDC --> Q
    end
    subgraph processing ["Stage 2: Processing"]
        Q --> ETL[Spark / Flink]
        ETL --> DL[(Data Lake Raw)]
        DL --> T[dbt Transform]
        T --> DW[(Warehouse)]
    end
    subgraph serving ["Stage 3: Serving"]
        DW --> BI[BI Tools]
        DW --> ML[ML Pipelines]
        DW --> API[Serving API]
    end
    subgraph observability ["Observability"]
        M[Metrics] -.-> Alert[Alerting]
        M -.-> Dash[Dashboards]
        M -.-> Log[Logs]
    end
```
Pipeline Design Patterns
1. Medallion Architecture (Bronze → Silver → Gold)
| Layer | Purpose | Data Quality |
|---|---|---|
| Bronze | Raw ingested data, append-only | None — accept everything |
| Silver | Cleaned, deduplicated, validated | Required fields, type checks |
| Gold | Aggregated, business-ready | Business rules, referential integrity |
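Here is a minimal in-memory sketch of the three layers. It assumes that Python lists stand in for tables, that `order_id` is the deduplication key, and that revenue per region is the Gold metric. These are all illustrative choices, not a prescribed schema:

```python
from collections import defaultdict

# Bronze: raw records appended as-is (duplicates and bad rows included)
bronze = [
    {"order_id": 1, "amount": "250.00", "region": "US"},
    {"order_id": 1, "amount": "250.00", "region": "US"},    # duplicate delivery
    {"order_id": 2, "amount": "abc", "region": "EU"},       # fails type check
    {"order_id": 3, "amount": "40.00", "region": "EU"},
    {"order_id": None, "amount": "10.00", "region": "US"},  # missing required field
]


def to_silver(rows):
    """Clean, type-check, and deduplicate Bronze rows by order_id."""
    seen, silver = set(), []
    for row in rows:
        if row.get("order_id") is None or row["order_id"] in seen:
            continue  # required field missing, or already seen
        try:
            amount = float(row["amount"])
        except (KeyError, TypeError, ValueError):
            continue  # failed type check; a real pipeline might route this to a DLQ
        seen.add(row["order_id"])
        silver.append({**row, "amount": amount})
    return silver


def to_gold(rows):
    """Aggregate Silver rows into a business-ready revenue-per-region table."""
    totals = defaultdict(float)
    for row in rows:
        totals[row["region"]] += row["amount"]
    return dict(totals)


silver = to_silver(bronze)
gold = to_gold(silver)
print(len(bronze), len(silver), gold)  # 5 2 {'US': 250.0, 'EU': 40.0}
```

Bronze keeps all five raw rows. Silver keeps two, dropping the duplicate, the unparseable `amount`, and the missing `order_id`. Gold reduces those to one total per region.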
2. Latency Matching (Batch vs Streaming)
If downstream consumers (dashboards, ML models) only need data refreshed once every 24 hours, a daily batch job is usually simpler and cheaper than streaming. Always match processing latency to business requirements.
3. Dead Letter Queue
Invalid records that can’t be processed should go to a dead letter queue for later inspection, not block the entire pipeline.
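A minimal sketch of the pattern follows. It assumes messages arrive as JSON strings and that a plain Python list stands in for the dead letter queue; in production the DLQ is typically a separate topic, table, or bucket. `process_batch` and `handle_order` are hypothetical names:

```python
import json


def process_batch(raw_messages, handle):
    """Apply `handle` to each message; park failures in a dead letter list."""
    processed, dead_letters = [], []
    for msg in raw_messages:
        try:
            processed.append(handle(json.loads(msg)))
        except (ValueError, KeyError, TypeError) as exc:  # JSONDecodeError subclasses ValueError
            # Keep the original payload plus the reason so it can be inspected and replayed
            dead_letters.append({"payload": msg, "error": f"{type(exc).__name__}: {exc}"})
    return processed, dead_letters


def handle_order(record):
    # Hypothetical rule: every order needs an id and a numeric amount
    return {"order_id": record["order_id"], "amount": float(record["amount"])}


messages = [
    '{"order_id": 1, "amount": "19.99"}',
    '{"order_id": 2, "amount": 5}',
    '{"order_id": 3',           # corrupted JSON
    '{"amount": "7.50"}',       # missing order_id
]
good, dlq = process_batch(messages, handle_order)
print(len(good), "processed;", len(dlq), "sent to DLQ")
for entry in dlq:
    print("  ", entry["error"])
```

The two valid records flow on. The corrupted JSON and the record missing `order_id` land in the DLQ with their error reasons, so they can be fixed and replayed later.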
Data Quality Checks
```python
# data_quality.py
# Run data quality checks on pipeline output
from datetime import datetime


class DataQualityCheck:
    def __init__(self, name):
        self.name = name
        self.checks = []

    def add_check(self, description, check_func, severity="error"):
        """Register a check. severity="error" fails the run; other severities only report."""
        self.checks.append({
            "description": description,
            "func": check_func,
            "severity": severity,
        })

    def run(self, data):
        results = {"check_name": self.name, "timestamp": datetime.now().isoformat(),
                   "pass": True, "results": []}
        for check in self.checks:
            try:
                passed = check["func"](data)
            except Exception as e:
                # The check itself crashed (e.g. unexpected data type): treat as an error
                results["pass"] = False
                results["results"].append(f" ✗ [ERROR] {check['description']}: {e}")
                continue
            if passed:
                results["results"].append(f" ✓ {check['description']}")
            elif check["severity"] == "error":
                # Record the failure and keep going so the report lists every failed check
                results["pass"] = False
                results["results"].append(f" ✗ [ERROR] {check['description']}")
            else:
                results["results"].append(f" ⚠ [{check['severity'].upper()}] {check['description']}")
        return results


# Simulate pipeline output
pipeline_output = [
    {"order_id": 1001, "amount": 250.00, "status": "completed", "date": "2026-06-01"},
    {"order_id": 1002, "amount": 45.50, "status": "pending", "date": "2026-06-02"},
    {"order_id": 1003, "amount": -10.00, "status": "completed", "date": "2026-06-03"},
    {"order_id": None, "amount": 100.00, "status": "completed", "date": "2026-06-04"},
    {"order_id": 1005, "amount": 200.00, "status": "unknown_status", "date": "2026-06-05"},
]

qc = DataQualityCheck("sales_pipeline_qc")
qc.add_check("No null order IDs", lambda d: all(r.get("order_id") is not None for r in d))
qc.add_check("No negative amounts", lambda d: all(r.get("amount", 0) >= 0 for r in d))
qc.add_check("Valid status values", lambda d: all(r.get("status") in ("completed", "pending", "cancelled") for r in d))
qc.add_check("Row count >= 3", lambda d: len(d) >= 3)
qc.add_check("No empty dates", lambda d: all(r.get("date") for r in d))

print("=== Pipeline Data Quality Report ===\n")
results = qc.run(pipeline_output)
for line in results["results"]:
    print(line)
print(f"\nOverall: {'✓ PASS' if results['pass'] else '✗ FAIL'}")
```
Expected output:
```text
=== Pipeline Data Quality Report ===

 ✗ [ERROR] No null order IDs
 ✗ [ERROR] No negative amounts
 ✗ [ERROR] Valid status values
 ✓ Row count >= 3
 ✓ No empty dates

Overall: ✗ FAIL
```
Schema Evolution
Sources change schemas over time. A production pipeline must handle:
| Change | Strategy |
|---|---|
| Adding a column | Backward compatible — pipeline should pass through new columns |
| Removing a column | Fail gracefully — log and use default/null |
| Renaming a column | Create a mapping layer or use schema registry |
| Changing data type | Cast to common type, log warnings |
| Adding a NOT NULL constraint | Clean data upstream or provide defaults |
```python
# schema_evolution.py
# Handle schema evolution in pipelines
from typing import Any


class SchemaHandler:
    # Column -> type for each known schema version (dicts keep insertion order)
    SCHEMAS: dict[int, dict[str, type]] = {
        1: {"order_id": int, "amount": float, "status": str},
        2: {"order_id": int, "amount": float, "status": str, "region": str},
        3: {"order_id": int, "total": float, "status": str, "region": str, "currency": str},
    }
    # v3 renamed "amount" to "total": new name -> old name to fall back on
    RENAMES = {"total": "amount"}
    # Columns added in later versions get a default instead of null
    DEFAULTS = {"region": "unknown", "currency": "unknown"}

    def __init__(self, version: int = 1):
        self.version = version

    def resolve(self, record: dict[str, Any]) -> dict[str, Any]:
        """Map a record written under any known schema version onto the target version."""
        resolved = {}
        for col, col_type in self.SCHEMAS[self.version].items():
            value = record.get(col)
            if value is None and col in self.RENAMES:
                value = record.get(self.RENAMES[col])  # older record: use the old column name
            if value is None:
                resolved[col] = self.DEFAULTS.get(col)  # default, or None if none defined
                continue
            try:
                resolved[col] = col_type(value)
            except (ValueError, TypeError):
                print(f"  [WARN] Column {col}: type conversion failed for {value!r}")
                resolved[col] = None
        return resolved


handler = SchemaHandler(version=3)
records = [
    {"order_id": "1001", "amount": "250.00", "status": "completed"},
    {"order_id": "1002", "amount": "45.50", "status": "pending", "region": "US"},
    {"order_id": "1003", "total": "120.00", "status": "shipped", "currency": "USD"},
]
print("=== Schema Evolution Resolution (v3 target) ===")
for r in records:
    resolved = handler.resolve(r)
    print(f"  Input:  {r}")
    print(f"  Output: {resolved}\n")
```
Expected output:
```text
=== Schema Evolution Resolution (v3 target) ===
  Input:  {'order_id': '1001', 'amount': '250.00', 'status': 'completed'}
  Output: {'order_id': 1001, 'total': 250.0, 'status': 'completed', 'region': 'unknown', 'currency': 'unknown'}

  Input:  {'order_id': '1002', 'amount': '45.50', 'status': 'pending', 'region': 'US'}
  Output: {'order_id': 1002, 'total': 45.5, 'status': 'pending', 'region': 'US', 'currency': 'unknown'}

  Input:  {'order_id': '1003', 'total': '120.00', 'status': 'shipped', 'currency': 'USD'}
  Output: {'order_id': 1003, 'total': 120.0, 'status': 'shipped', 'region': 'unknown', 'currency': 'USD'}
```
Monitoring and Alerting
```python
# pipeline_monitor.py
# Pipeline run monitoring with simple threshold alerts
import random
import time
from collections import deque
from datetime import datetime


class PipelineMonitor:
    def __init__(self, pipeline_name):
        self.name = pipeline_name
        self.runs = deque(maxlen=100)  # keep only the 100 most recent runs
        self.current_run = None

    def start_run(self):
        self.current_run = {"start": datetime.now(), "status": "running"}
        print(f"[{self.name}] Run started at {self.current_run['start']}")

    def end_run(self, status, rows_processed=0, errors=0):
        if self.current_run is None:
            return  # end_run called without a matching start_run
        run = self.current_run
        run["end"] = datetime.now()
        run["duration_seconds"] = (run["end"] - run["start"]).total_seconds()
        run["status"] = status
        run["rows_processed"] = rows_processed
        run["errors"] = errors
        self.runs.append(run)
        self.current_run = None
        duration = run["duration_seconds"]
        print(f"[{self.name}] Run completed: {status}, {rows_processed} rows, {errors} errors, {duration:.1f}s")
        # Alert on long runs
        if duration > 300:
            print(f"[ALERT] Pipeline {self.name}: run exceeded 5 minutes ({duration:.1f}s)")
        # Alert on high error rate
        if rows_processed > 0 and errors / rows_processed > 0.05:
            print(f"[ALERT] Pipeline {self.name}: error rate {(errors / rows_processed) * 100:.1f}% > 5%")

    def health(self):
        """Healthy unless more than 20% of the last 5 runs failed."""
        if len(self.runs) < 5:
            return "insufficient data"
        recent = list(self.runs)[-5:]
        failure_rate = sum(1 for r in recent if r["status"] == "failed") / len(recent)
        return "unhealthy" if failure_rate > 0.2 else "healthy"


monitor = PipelineMonitor("daily_sales_etl")
print("=== Pipeline Monitoring Dashboard ===\n")
for _ in range(10):
    monitor.start_run()
    rows = random.randint(1000, 10000)
    # About 20% of runs have errors; a run counts as failed if errors reach 10% of rows
    errors = 0 if random.random() > 0.2 else random.randint(1, 500)
    status = "success" if errors < rows * 0.1 else "failed"
    time.sleep(0.1)  # stand-in for real work
    monitor.end_run(status, rows, errors)
print(f"\n=== Pipeline Health: {monitor.health().upper()} ===")
```
Expected output (row and error counts are random, so your numbers, any [ALERT] lines, and the final health status will vary between runs):
```text
=== Pipeline Monitoring Dashboard ===

[daily_sales_etl] Run started at ...
[daily_sales_etl] Run completed: success, 5432 rows, 0 errors, 0.1s
...
=== Pipeline Health: HEALTHY ===
```
Orchestration with Airflow vs Dagster
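| Feature | Airflow | Dagster |
|---|---|---|
| DAG definition | Python `DAG` objects or TaskFlow `@dag` / `@task` decorators | Python `@asset` or `@op` / `@job` decorators |
| Local testing | `dag.test()` runs a DAG in a single process | In-process execution (`materialize()`, `execute_in_process()`) |
| Data awareness | Task-centric: XCom for small values, Datasets for data-aware scheduling | Native (typed inputs/outputs, asset lineage) |
| UI | Grid and Graph views | Dagster UI (formerly Dagit) with an asset graph |
| Best for | Production ETL at scale | Data platform teams, ML pipelines |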
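Under the hood, both tools resolve task dependencies into an execution order and start a task only after its upstream tasks have succeeded. The standard-library sketch below illustrates that core idea with `graphlib.TopologicalSorter` (Python 3.9+). It is not the Airflow or Dagster API, and the task names are hypothetical. The `upstream_failed` status borrows Airflow's name for the same concept:

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on (hypothetical names)
dag = {
    "extract_orders": set(),
    "extract_customers": set(),
    "clean_orders": {"extract_orders"},
    "join_orders_customers": {"clean_orders", "extract_customers"},
    "build_gold_revenue": {"join_orders_customers"},
}


def run_dag(dag, run_task):
    """Run tasks in dependency order; mark everything downstream of a failure as skipped."""
    sorter = TopologicalSorter(dag)
    sorter.prepare()
    status = {}
    while sorter.is_active():
        for task in sorter.get_ready():
            if any(status.get(dep) != "success" for dep in dag[task]):
                status[task] = "upstream_failed"
            else:
                try:
                    run_task(task)
                    status[task] = "success"
                except Exception:
                    status[task] = "failed"
            sorter.done(task)  # mark finished so dependents can become ready
    return status


order = []


def fake_task(name):
    order.append(name)
    if name == "clean_orders":
        raise RuntimeError("simulated failure")


result = run_dag(dag, fake_task)
print(result)
```

Because `clean_orders` fails, both downstream tasks end as `upstream_failed`, while the independent `extract_customers` still runs. Real orchestrators add scheduling, retries, parallel workers, and persisted run state on top of this.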
Common Data Pipeline Mistakes
1. No Dead Letter Queue
When a bad record breaks the pipeline, the entire batch fails. Implement a dead letter queue so you can inspect bad records later without blocking good data.
2. Not Monitoring Data Freshness
The pipeline run succeeds, but the data is stale because the source was down. Monitor row counts and freshness timestamps, and compare them against expected values.
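A freshness and volume check can be as simple as comparing the last load time and the row count against expectations. This sketch assumes you can query a table's last-loaded timestamp and its row count for the period. The function name and the ±50% tolerance are hypothetical choices:

```python
from datetime import datetime, timedelta, timezone


def check_freshness(last_loaded_at, row_count, max_age, expected_rows, tolerance=0.5, now=None):
    """Return a list of problems; an empty list means the data looks fresh and complete.

    tolerance=0.5 flags row counts outside 50%..150% of expected (an illustrative threshold).
    """
    if now is None:
        now = datetime.now(timezone.utc)
    problems = []
    age = now - last_loaded_at
    if age > max_age:
        problems.append(f"stale: last load {age} ago (limit {max_age})")
    low, high = expected_rows * (1 - tolerance), expected_rows * (1 + tolerance)
    if not low <= row_count <= high:
        problems.append(f"row count {row_count} outside expected range {low:.0f}-{high:.0f}")
    return problems


now = datetime(2026, 6, 5, 9, 0, tzinfo=timezone.utc)
# The job "succeeded" at 03:00 yesterday, but the source was down and only 120 rows landed
problems = check_freshness(
    last_loaded_at=datetime(2026, 6, 4, 3, 0, tzinfo=timezone.utc),
    row_count=120,
    max_age=timedelta(hours=26),
    expected_rows=10_000,
    now=now,
)
print(problems)
```

Both problems are reported even though the job itself finished successfully. Run-status monitoring alone misses exactly this kind of failure.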
3. Manual Recovery
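If every failure requires a manual restart, your pager will never stop ringing. Retry transient failures (timeouts, throttling, brief source outages) automatically with exponential backoff, and make tasks idempotent so a retried run cannot load the same data twice.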
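Below is a minimal retry helper with capped exponential backoff and jitter. It assumes transient failures are signaled by a hypothetical `TransientError` exception; permanent errors, such as bad credentials, should fail fast instead. The `sleep` parameter is injectable so the example runs instantly:

```python
import random
import time


class TransientError(Exception):
    """Hypothetical marker for failures worth retrying (timeouts, throttling)."""


def retry_with_backoff(func, max_attempts=5, base_delay=1.0, max_delay=60.0, sleep=time.sleep):
    """Call func(); on TransientError wait base_delay * 2**attempt (capped, jittered) and retry."""
    for attempt in range(max_attempts):
        try:
            return func()
        except TransientError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the failure so alerting fires
            delay = min(max_delay, base_delay * 2 ** attempt)
            sleep(delay * random.uniform(0.5, 1.0))  # jitter spreads out simultaneous retries


calls = {"n": 0}


def flaky_extract():
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientError("source API timed out")
    return ["row1", "row2"]


delays = []  # record requested sleeps instead of actually sleeping
rows = retry_with_backoff(flaky_extract, base_delay=0.01, sleep=delays.append)
print(rows, "after", calls["n"], "attempts")
```

Wait times grow as `base_delay`, 2×, 4×, and so on, up to `max_delay`. The random factor keeps many tasks that failed at the same moment from retrying in lockstep.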
4. No Schema Evolution Plan
Source teams add columns without telling you. Your pipeline breaks. Use schema-on-read or a schema registry to handle evolution gracefully.
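Even without a schema registry, a cheap drift check on each batch catches silent upstream changes before they break downstream models. This is a minimal sketch, assuming records arrive as dicts and the expected column list is maintained by hand. All names are hypothetical:

```python
def detect_schema_drift(expected_columns, record):
    """Compare an incoming record's fields with the columns the pipeline expects."""
    expected, actual = set(expected_columns), set(record)
    return {
        "added": sorted(actual - expected),    # new upstream columns: pass through and log
        "missing": sorted(expected - actual),  # dropped columns: default/null and alert
    }


expected = ["order_id", "amount", "status"]
incoming = {"order_id": 7, "amount": 12.5, "state": "completed", "coupon_code": "X1"}
drift = detect_schema_drift(expected, incoming)
print(drift)  # {'added': ['coupon_code', 'state'], 'missing': ['status']}
```

A rename (`status` to `state` here) shows up as one added column plus one missing column. That pattern is a useful signal to add a column mapping rather than just fill in a default.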
5. Over-Complicated Dependencies
A pipeline DAG with many branches, joins, and 100+ tasks is hard to reason about. Simplify: group related tasks (with TaskGroups in Airflow, since SubDAGs are deprecated), split independent work into separate DAGs, and document dependencies.
6. Ignoring Pipeline Cost
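Processing 10TB daily costs real money. Monitor Spark shuffle writes, BigQuery bytes scanned, and S3 API request costs, then optimize the most expensive jobs first (for example, with partition pruning and by selecting only the columns you need).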
Practice Questions
1. What is the medallion architecture in data pipelines?
Bronze (raw data), Silver (cleaned/deduplicated), Gold (aggregated/curated). Each layer raises data quality, and volume usually shrinks toward Gold as data is aggregated. It’s a widely used pattern for lakehouse pipelines.
2. Why is a dead letter queue important?
When invalid records arrive (corrupted JSON, missing required fields), they can’t be processed. A dead letter queue stores these records for later inspection without blocking the rest of the pipeline.
3. What are the key metrics to monitor in a data pipeline?
Run duration, row counts (expected vs actual), error rate, data freshness (time since last successful run), source availability, and bytes processed.
4. How does schema evolution affect production pipelines?
Source schemas change over time (columns added, renamed, removed). Pipelines must handle these changes gracefully without breaking. Strategies include schema registries, column mapping, and backward-compatible schema design.
5. Challenge: Design a monitoring system for 20 pipelines processing 5TB daily. Requirements: alert within 2 minutes of failure, track row count trends, and page the on-call engineer if 3+ pipelines fail within 1 hour.
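Have each pipeline expose metrics to Prometheus: rows processed, run duration, and last-run status and timestamp. Short-lived batch jobs can push these through the Pushgateway instead. Use Grafana dashboards for row-count trends. In Alertmanager, route a single pipeline failure to a notification channel (chat or ticket), and page the on-call engineer only when 3 or more pipelines fail within an hour. To stay inside the 2-minute alerting budget, keep scrape and rule-evaluation intervals well below it, for example every 30 to 60 seconds.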
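The challenge's escalation requirement (alert on any failure, page when 3+ distinct pipelines fail within an hour) is normally expressed as Prometheus alerting rules and Alertmanager routes. The Python sketch below only illustrates the sliding-window logic. The class and pipeline names are hypothetical, and failures are assumed to arrive in time order:

```python
from collections import deque
from datetime import datetime, timedelta


class FailureEscalator:
    """Notify on every failure; page when enough distinct pipelines fail inside the window."""

    def __init__(self, window=timedelta(hours=1), page_threshold=3):
        self.window = window
        self.page_threshold = page_threshold
        self.failures = deque()  # (timestamp, pipeline_name), oldest first

    def record_failure(self, pipeline, at):
        self.failures.append((at, pipeline))
        # Drop failures that have slid out of the window
        while self.failures and at - self.failures[0][0] > self.window:
            self.failures.popleft()
        failing = {name for _, name in self.failures}  # distinct pipelines only
        return "page" if len(failing) >= self.page_threshold else "notify"


t0 = datetime(2026, 6, 5, 9, 0)
events = [
    ("orders_etl", t0),
    ("clicks_etl", t0 + timedelta(minutes=10)),
    ("orders_etl", t0 + timedelta(minutes=20)),   # same pipeline again: still 2 distinct
    ("billing_etl", t0 + timedelta(minutes=50)),  # third distinct pipeline within the hour
    ("crm_sync", t0 + timedelta(hours=2)),        # earlier failures have aged out
]
escalator = FailureEscalator()
decisions = [escalator.record_failure(name, at) for name, at in events]
print(decisions)  # ['notify', 'notify', 'notify', 'page', 'notify']
```

As written, every further failure inside the window would page again. In a real setup, Alertmanager's grouping and repeat intervals handle that deduplication.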
Mini Project: End-to-End Pipeline Simulation
```python
# e2e_pipeline.py
# Simulate an end-to-end data pipeline with all stages
import random
import time
from datetime import datetime


class Stage:
    def __init__(self, name, simulate_func):
        self.name = name
        self.simulate = simulate_func
        self.metrics = {"runs": 0, "failures": 0, "total_duration": 0.0}

    def execute(self, input_data):
        self.metrics["runs"] += 1
        start = time.time()
        try:
            result = self.simulate(input_data)
        except Exception:
            self.metrics["failures"] += 1
            raise  # let the pipeline decide how to handle the failure
        self.metrics["total_duration"] += time.time() - start
        return result


class Pipeline:
    def __init__(self, name):
        self.name = name
        self.stages = []
        self.monitor = {"runs": 0, "failures": 0}

    def add_stage(self, stage):
        self.stages.append(stage)

    def run(self, input_data):
        """Run stages in order; return the last stage's output, or None if a stage fails."""
        self.monitor["runs"] += 1
        print(f"\n{'=' * 50}")
        print(f"Pipeline: {self.name}")
        print(f"Run #{self.monitor['runs']} at {datetime.now().strftime('%H:%M:%S')}")
        print(f"{'=' * 50}")
        current = input_data
        for stage in self.stages:
            print(f"\n[{stage.name}]")
            try:
                current = stage.execute(current)
                print("  ✓ Complete")
            except Exception as e:
                print(f"  ✗ Failed: {e}")
                self.monitor["failures"] += 1
                return None  # don't hand partial output back as if the run succeeded
        return current


# Define stages
def extract(source):
    print(f"  Extracting from {source}...")
    time.sleep(0.2)  # simulate source latency
    return [{"id": i, "value": random.randint(1, 100)} for i in range(10)]


def validate(data):
    print(f"  Validating {len(data)} records...")
    for r in data:
        if r["value"] < 0:
            raise ValueError(f"Negative value: {r}")
    return [r for r in data if r["value"] > 10]  # drop low-value records


def transform(data):
    print(f"  Transforming {len(data)} records...")
    result = []
    for r in data:
        # Build new dicts instead of mutating the previous stage's records
        result.append({**r, "value_squared": r["value"] ** 2,
                       "processed_at": datetime.now().isoformat()})
    time.sleep(0.1)  # simulate processing time (once per batch)
    return result


def load(data):
    print(f"  Loading {len(data)} records to warehouse...")
    time.sleep(0.1)
    return {"loaded": len(data), "destination": "analytics_db.sales"}


# Build and run pipeline
pipeline = Pipeline("Customer Analytics Pipeline")
pipeline.add_stage(Stage("Extract", extract))
pipeline.add_stage(Stage("Validate", validate))
pipeline.add_stage(Stage("Transform", transform))
pipeline.add_stage(Stage("Load", load))

for _ in range(3):
    result = pipeline.run("api.dodatech.com/events")
    print(f"\n  Result: {result}")

print(f"\n{'=' * 50}")
print("Pipeline Summary:")
for stage in pipeline.stages:
    avg_dur = stage.metrics["total_duration"] / max(stage.metrics["runs"], 1)
    print(f"  {stage.name}: {stage.metrics['runs']} runs, {stage.metrics['failures']} failures, avg {avg_dur:.2f}s")
print(f"Total: {pipeline.monitor['runs']} runs, {pipeline.monitor['failures']} failures")
```
Expected output (timestamps, record counts after validation, and exact durations vary between runs):
```text
==================================================
Pipeline: Customer Analytics Pipeline
Run #1 at 10:00:00
==================================================

[Extract]
  Extracting from api.dodatech.com/events...
  ✓ Complete

[Validate]
  Validating 10 records...
  ✓ Complete
...
Pipeline Summary:
  Extract: 3 runs, 0 failures, avg 0.20s
  Validate: 3 runs, 0 failures, avg 0.00s
  Transform: 3 runs, 0 failures, avg 0.10s
  Load: 3 runs, 0 failures, avg 0.10s
Total: 3 runs, 0 failures
```
What’s Next
You now understand how to build production data pipelines! Next, learn data modeling to design star schemas and slowly changing dimensions, and explore dbt for SQL transformations in production.
- Practice daily — Add quality checks and monitoring to your existing pipelines
- Build a project — Create a complete pipeline with Bronze → Silver → Gold layers
- Explore related topics — Check out Dagster for asset-based orchestration
Remember: every expert was once a beginner. Keep coding!
Built by the developers of DodaTech
Doda Browser, DodaZIP & Durga Antivirus Pro