ETL Pipelines Explained — Extract, Transform, Load with Python Examples
An ETL pipeline is a data integration process that extracts data from source systems, transforms it into a usable format, and loads it into a destination such as a data warehouse or data lake.
What You’ll Learn
By the end of this tutorial, you’ll understand the ETL process, how it differs from ELT, when to use batch vs streaming, and how to build a complete ETL pipeline in Python.
Why ETL Matters
Raw data is rarely usable. Databases store data for transactions, not analytics. APIs return nested JSON. Log files have inconsistent formats. ETL pipelines clean, standardize, and structure this data so analysts and data scientists can work with it. DodaTech’s Durga Antivirus Pro uses ETL pipelines to process threat intelligence feeds from dozens of sources into a unified detection format.
ETL Pipeline Architecture
```mermaid
flowchart LR
    subgraph "Extract"
        A1[(PostgreSQL)]
        A2[CSV Files]
        A3[REST API]
    end
    subgraph "Transform"
        B1[Clean]
        B2[Validate]
        B3[Aggregate]
        B4[Join]
    end
    subgraph "Load"
        C1[(Snowflake)]
        C2[(BigQuery)]
        C3[(S3 Parquet)]
    end
    A1 --> B1
    A2 --> B1
    A3 --> B1
    B1 --> B2 --> B3 --> B4
    B4 --> C1
    B4 --> C2
    B4 --> C3
```
ETL vs ELT
| Aspect | ETL | ELT |
|---|---|---|
| Order | Extract → Transform → Load | Extract → Load → Transform |
| Where | Transform happens in a staging area or middleware | Transform happens inside the warehouse |
| Best for | Legacy systems, complex transformations | Cloud warehouses (Snowflake, BigQuery) |
| Compute | Separate transformation server | Warehouse compute power |
| Flexibility | Less flexible (schema defined before load) | More flexible (raw data available for reprocessing) |
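To make the ordering difference concrete, here is a minimal ELT sketch. It uses an in-memory SQLite database as a stand-in for a cloud warehouse, which is an assumption for illustration only. Raw rows are loaded untouched into a hypothetical raw_sales table, and the cleanup happens afterwards as SQL inside the database. Because raw_sales is kept, the transform can be rerun later with different rules, which is the "Flexibility" row above.

```python
import sqlite3

def elt_demo(raw_rows):
    """Load raw strings first, then transform with SQL inside the database."""
    conn = sqlite3.connect(":memory:")  # stand-in for a warehouse
    # Load: keep the data exactly as extracted ('$' signs and padding included)
    conn.execute("CREATE TABLE raw_sales (order_id TEXT, amount TEXT, status TEXT)")
    conn.executemany("INSERT INTO raw_sales VALUES (?, ?, ?)", raw_rows)
    # Transform: build a cleaned table from the raw one, using the database's SQL engine
    conn.execute("""
        CREATE TABLE clean_sales AS
        SELECT order_id,
               CAST(REPLACE(REPLACE(amount, '$', ''), ',', '') AS REAL) AS amount,
               LOWER(TRIM(status)) AS status
        FROM raw_sales
    """)
    rows = conn.execute("SELECT * FROM clean_sales ORDER BY order_id").fetchall()
    conn.close()
    return rows

if __name__ == "__main__":
    raw = [("1001", "$250.00", " Completed"), ("1003", "$1,200.00", "completed ")]
    for row in elt_demo(raw):
        print(row)
```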
What Is ETL? (The “Why” First)
Think of ETL like preparing ingredients for a restaurant kitchen. Extraction is buying vegetables from the market (multiple sources). Transformation is washing, chopping, and marinating (processing). Loading is placing the prepared ingredients in the walk-in cooler (storage), ready for chefs to use.
Without ETL, every chef (data consumer) would have to go to the market themselves, which would be an enormous waste of time.
Building an ETL Pipeline in Python
Let’s build a complete ETL pipeline that reads sales data from CSV, transforms it, and loads it into a database.
```python
# etl_pipeline.py
# Complete ETL: CSV → Transform → SQLite database
import csv
import sqlite3
from datetime import datetime

# ── Extract ──
def extract_csv(filepath):
    print(f"[EXTRACT] Reading {filepath}...")
    # The csv module expects files to be opened with newline=''
    with open(filepath, 'r', newline='') as f:
        reader = csv.DictReader(f)
        rows = list(reader)
    print(f"[EXTRACT] Found {len(rows)} rows")
    return rows

# ── Transform ──
def transform_sales(rows):
    print("[TRANSFORM] Cleaning and enriching...")
    transformed = []
    errors = 0
    for row in rows:
        try:
            # Parse and standardize date (raises ValueError on malformed dates)
            row['order_date'] = datetime.strptime(row['order_date'], '%Y-%m-%d').date().isoformat()
            # Clean price (remove $ and thousands separators, convert to float)
            row['amount'] = float(row['amount'].replace('$', '').replace(',', ''))
            # Normalize status
            row['status'] = row['status'].strip().lower()
            # Add derived columns
            row['processed_at'] = datetime.now().isoformat(timespec='seconds')
            if row['amount'] > 100:
                row['category'] = 'high_value'
            else:
                row['category'] = 'standard'
            transformed.append(row)
        except (ValueError, KeyError) as e:
            errors += 1
            print(f"  [WARN] Skipping row: {e}")
    print(f"[TRANSFORM] {len(transformed)} valid, {errors} errors")
    return transformed

# ── Load ──
def load_to_db(rows, db_path):
    print(f"[LOAD] Writing to {db_path}...")
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS sales (
            order_id TEXT PRIMARY KEY,
            customer_name TEXT,
            order_date TEXT,
            amount REAL,
            status TEXT,
            category TEXT,
            processed_at TEXT
        )
    ''')
    inserted = 0
    for row in rows:
        try:
            # INSERT OR REPLACE keeps reruns idempotent: one row per order_id
            cursor.execute('''
                INSERT OR REPLACE INTO sales
                (order_id, customer_name, order_date, amount, status, category, processed_at)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            ''', (
                row['order_id'], row['customer_name'], row['order_date'],
                row['amount'], row['status'], row['category'], row['processed_at']
            ))
            inserted += 1
        except (sqlite3.Error, KeyError) as e:
            print(f"  [ERROR] Failed to insert {row.get('order_id', 'unknown')}: {e}")
    conn.commit()
    conn.close()
    print(f"[LOAD] Inserted {inserted} rows")

# ── Orchestrate ──
def run_etl():
    print("=" * 50)
    print("ETL Pipeline: Sales Data")
    print(f"Started: {datetime.now().isoformat(timespec='seconds')}")
    print("=" * 50)
    raw_data = extract_csv('sales_data.csv')
    clean_data = transform_sales(raw_data)
    load_to_db(clean_data, 'sales_warehouse.db')
    print("=" * 50)
    print("ETL Pipeline Complete!")
    print("=" * 50)

# ── Test with sample data ──
def create_sample_csv():
    with open('sales_data.csv', 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['order_id', 'customer_name', 'order_date', 'amount', 'status'])
        writer.writerow(['1001', 'Alice', '2026-06-01', '$250.00', 'completed'])
        writer.writerow(['1002', 'Bob', '2026-06-02', '$45.50', 'pending'])
        writer.writerow(['1003', 'Charlie', '2026-06-03', '$1,200.00', 'completed'])
        writer.writerow(['1004', 'Diana', '2026-06-04', '$89.99', 'cancelled'])
        writer.writerow(['1005', 'Invalid', 'not-a-date', '$50.00', 'completed'])

if __name__ == '__main__':
    create_sample_csv()
    run_etl()
    # Verify
    print("\n[VERIFY] Querying loaded data:")
    conn = sqlite3.connect('sales_warehouse.db')
    cursor = conn.cursor()
    for row in cursor.execute('SELECT * FROM sales ORDER BY order_id'):
        print(f"  {row}")
    conn.close()
```

Expected output (timestamps will differ):
```text
==================================================
ETL Pipeline: Sales Data
Started: 2026-06-15T10:00:00
==================================================
[EXTRACT] Reading sales_data.csv...
[EXTRACT] Found 5 rows
[TRANSFORM] Cleaning and enriching...
  [WARN] Skipping row: time data 'not-a-date' does not match format '%Y-%m-%d'
[TRANSFORM] 4 valid, 1 errors
[LOAD] Writing to sales_warehouse.db...
[LOAD] Inserted 4 rows
==================================================
ETL Pipeline Complete!
==================================================

[VERIFY] Querying loaded data:
  ('1001', 'Alice', '2026-06-01', 250.0, 'completed', 'high_value', '2026-06-15T10:00:01')
  ('1002', 'Bob', '2026-06-02', 45.5, 'pending', 'standard', '2026-06-15T10:00:01')
  ('1003', 'Charlie', '2026-06-03', 1200.0, 'completed', 'high_value', '2026-06-15T10:00:01')
  ('1004', 'Diana', '2026-06-04', 89.99, 'cancelled', 'standard', '2026-06-15T10:00:01')
```

Batch vs Streaming ETL
| Factor | Batch ETL | Streaming ETL |
|---|---|---|
| Schedule | Hourly, daily, weekly | Continuous |
| Latency | Minutes to hours | Seconds |
| Storage | Stage files, then process | In-memory state, checkpointed |
| Error handling | Retry entire batch | Per-event error handling |
| Example tools | Airflow + dbt | Kafka + Flink (or Kafka Streams) |
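The "per-event error handling" row is the main operational difference. The sketch below uses a plain Python iterable as a stand-in for a stream consumer (it does not model Kafka, offsets, or checkpointing) and processes each event on its own, routing failures to a dead-letter list instead of failing a whole batch. The function name and the id and amount fields are hypothetical.

```python
import json

def process_stream(events):
    """Handle each event independently; bad events go to a dead-letter list."""
    processed, dead_letter = [], []
    for raw in events:  # in a real system this would be the consumer's poll loop
        try:
            event = json.loads(raw)
            event["amount"] = float(event["amount"])
            processed.append(event)
        except (KeyError, TypeError, ValueError) as exc:
            # json.JSONDecodeError is a subclass of ValueError, so it is caught here.
            # One malformed event does not stop the stream.
            dead_letter.append({"raw": raw, "error": str(exc)})
    return processed, dead_letter

if __name__ == "__main__":
    stream = iter(['{"id": 1, "amount": "9.5"}', 'not json', '{"id": 2}'])
    ok, dlq = process_stream(stream)
    print(len(ok), "processed,", len(dlq), "dead-lettered")
```

In a batch job the same malformed record would typically fail the run and trigger a retry of the entire batch; here only the bad event is set aside for later inspection.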
Tools in the ETL Ecosystem
| Tool | Role | Type |
|---|---|---|
| Airbyte / Fivetran | Managed connectors for extraction | Open source / SaaS |
| Apache Airflow | Pipeline orchestration and scheduling | Scheduler |
| dbt | SQL-based transformations (ELT) | Transformation |
| Apache Spark | Large-scale batch/streaming transforms | Compute engine |
| Great Expectations | Data quality validation | Testing |
Common ETL Mistakes
1. Not Handling Schema Drift
Source systems add columns, rename them, or change data types, and a pipeline that hard-codes the old schema breaks. Use schema inference, column mapping tables, or schema-on-read approaches.
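A column mapping table, one of the options above, can be sketched in a few lines. The mapping (COLUMN_MAP) and target columns below are hypothetical. Renamed source columns map to stable target names, unknown new columns are reported instead of crashing the load, and missing columns default to None.

```python
# Hypothetical mapping from source column names (old and new) to target names
COLUMN_MAP = {
    "order_id": "order_id",
    "id": "order_id",            # upstream renamed order_id to id
    "amount": "amount",
    "total_amount": "amount",    # upstream renamed amount to total_amount
    "status": "status",
}
TARGET_COLUMNS = ["order_id", "amount", "status"]

def conform_row(source_row):
    """Map a source row onto the target schema; return unknown column names too."""
    target = {col: None for col in TARGET_COLUMNS}
    unknown = []
    for name, value in source_row.items():
        mapped = COLUMN_MAP.get(name.strip().lower())
        if mapped is None:
            unknown.append(name)  # new upstream column: log it, don't fail the load
        else:
            target[mapped] = value
    return target, unknown

if __name__ == "__main__":
    row, extra = conform_row({"ID": "1001", "total_amount": "250.00", "coupon": "X1"})
    print(row)    # order_id and amount mapped, status defaulted to None
    print(extra)  # ['coupon']
```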
2. Processing Everything from Scratch
Full refreshes are expensive. Implement incremental extraction using watermark columns (updated_at timestamps) or change data capture (CDC).
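A watermark-based incremental extract might look like the sketch below. It assumes the source table (a hypothetical orders table, here in an in-memory SQLite database) has an updated_at column stored as ISO 8601 text, so string comparison matches time order. The new watermark is taken from the newest row actually read, not from the wall clock, so a slow run cannot move the watermark past rows it never saw.

```python
import sqlite3

def extract_since(conn, watermark):
    """Return rows changed after `watermark`, plus the new watermark value."""
    if watermark is None:
        # First run: no watermark yet, so take everything
        rows = conn.execute(
            "SELECT id, updated_at FROM orders ORDER BY updated_at").fetchall()
    else:
        rows = conn.execute(
            "SELECT id, updated_at FROM orders WHERE updated_at > ? ORDER BY updated_at",
            (watermark,)).fetchall()
    # Advance the watermark to the newest row we actually read
    new_watermark = rows[-1][1] if rows else watermark
    return rows, new_watermark

if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (id INTEGER, updated_at TEXT)")
    conn.executemany("INSERT INTO orders VALUES (?, ?)", [
        (1, "2026-06-15T08:00:00"), (2, "2026-06-15T09:00:00")])
    rows, wm = extract_since(conn, None)   # first run: full load
    conn.execute("INSERT INTO orders VALUES (3, '2026-06-15T10:00:00')")
    rows, wm = extract_since(conn, wm)     # second run: only the new row
    print(rows, wm)
```

One design caveat: with a strict > comparison, a row that shares the exact watermark timestamp but is committed after the extract would be missed. Pipelines often use >= with de-duplication, or CDC, to close that gap.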
3. Ignoring Data Quality
If a transformation silently drops rows, you’ll never know. Log row counts at every stage and compare against expectations.
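A lightweight version of "log row counts at every stage" is a reconciliation check that fails loudly when too many rows disappear between stages. The function name and the 1% default threshold are assumptions for illustration; the right tolerance depends on the data.

```python
import logging

logging.basicConfig(level=logging.INFO, format="%(message)s")
log = logging.getLogger("etl")

def check_row_counts(stage_counts, max_drop_ratio=0.01):
    """Log each stage transition and raise if any stage drops too many rows.

    stage_counts: ordered list of (stage_name, row_count) tuples.
    """
    for (prev_name, prev_n), (name, n) in zip(stage_counts, stage_counts[1:]):
        dropped = prev_n - n
        ratio = dropped / prev_n if prev_n else 0.0
        log.info("%s -> %s: %d -> %d rows (%.1f%% dropped)",
                 prev_name, name, prev_n, n, ratio * 100)
        if ratio > max_drop_ratio:
            raise ValueError(
                f"{name} dropped {dropped} of {prev_n} rows ({ratio:.1%}), "
                f"above the {max_drop_ratio:.1%} limit")

if __name__ == "__main__":
    check_row_counts([("extract", 1000), ("transform", 995), ("load", 995)])
```

With this threshold, the earlier sales example (5 rows extracted, 4 valid) would raise, which is the intent: a 20% drop deserves a human look.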
4. Tightly Coupling Transformation and Extraction Logic
Embedding business logic in extraction code makes changes risky. Separate concerns: extract → raw storage → transform → curated storage.
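A minimal sketch of that separation, assuming local raw/ and curated/ directories stand in for raw and curated storage (in practice these might be separate buckets or schemas): the extract step only lands records verbatim as JSON Lines, and the transform step reads from the raw copy. Business rules can then change and be rerun without re-extracting. The function names and batch_id parameter are hypothetical.

```python
import json
import tempfile
from pathlib import Path

def land_raw(records, raw_dir, batch_id):
    """Extract step: write records exactly as received, with no business logic."""
    raw_dir.mkdir(parents=True, exist_ok=True)
    path = raw_dir / f"{batch_id}.jsonl"
    with path.open("w", encoding="utf-8") as f:
        for rec in records:
            f.write(json.dumps(rec) + "\n")
    return path

def build_curated(raw_path, curated_dir):
    """Transform step: read a raw batch, apply business rules, write curated output."""
    curated_dir.mkdir(parents=True, exist_ok=True)
    out = curated_dir / raw_path.name
    with raw_path.open(encoding="utf-8") as src, out.open("w", encoding="utf-8") as dst:
        for line in src:
            rec = json.loads(line)
            rec["status"] = (rec.get("status") or "").strip().lower()
            dst.write(json.dumps(rec) + "\n")
    return out

if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        raw = land_raw([{"id": 1, "status": " Completed "}], Path(tmp) / "raw", "batch_001")
        curated = build_curated(raw, Path(tmp) / "curated")
        print(curated.read_text(encoding="utf-8"))
```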
5. Not Testing Edge Cases
Empty files, null values, duplicate keys, and malformed records will arrive. Write defensive transformations that handle all these cases.
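Here is a minimal sketch of defensive parsing for the amount field from the sales example, plus de-duplication on order_id. The helper names are hypothetical, and keeping the first occurrence of a duplicate key is an assumption (some pipelines keep the latest instead). Both functions return an empty result for an empty input, so an empty file flows through without errors.

```python
def parse_amount(value):
    """Return a float, or None for null, empty, or malformed input."""
    if value is None:
        return None
    text = str(value).strip().replace("$", "").replace(",", "")
    if not text:
        return None
    try:
        return float(text)
    except ValueError:
        return None

def dedupe(rows, key="order_id"):
    """Keep the first row seen for each key; skip rows missing the key."""
    seen, unique = set(), []
    for row in rows:
        k = row.get(key)
        if k is None or k in seen:
            continue
        seen.add(k)
        unique.append(row)
    return unique

if __name__ == "__main__":
    print([parse_amount(v) for v in ["$1,200.00", None, "  ", "abc"]])
    print(dedupe([{"order_id": "1"}, {"order_id": "1"}, {"order_id": "2"}]))
```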
6. Running ETL During Business Hours
Schedule heavy ETL jobs during off-peak hours. When ETL jobs and production queries run against the same warehouse, they compete for the same compute resources.
Practice Questions
1. What does ETL stand for and what does each stage do?
Extract (pull data from sources), Transform (clean, validate, aggregate), Load (write to destination). ETL transforms data before loading.
2. How does ELT differ from ETL?
ELT loads raw data first, then transforms it inside the warehouse using the warehouse's own compute. ETL transforms data in a staging area before loading it. ELT has become the common pattern for cloud warehouses such as Snowflake and BigQuery.
3. What is incremental extraction?
Only pulling records that changed since the last run, using a watermark column (e.g., updated_at) or CDC logs. It’s faster and cheaper than full refreshes.
4. How would you handle a failed ETL run?
Implement retries with exponential backoff, send alerts on failure, and maintain a log of processed records so you can resume from the last checkpoint rather than restarting.
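Retries with exponential backoff can be sketched with the standard library alone. The decorator name, attempt count, base delay, and the exception types treated as retryable are assumptions. Random jitter is added so many failing workers do not retry in lockstep. Orchestrators often provide this as well (Airflow operators accept retries and retry_exponential_backoff arguments), so treat this as a sketch for plain scripts.

```python
import functools
import random
import time

def retry(max_attempts=4, base_delay=1.0, retry_on=(ConnectionError, TimeoutError)):
    """Retry the wrapped function with exponential backoff plus jitter."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except retry_on as exc:
                    if attempt == max_attempts:
                        raise  # out of attempts: let the caller alert on failure
                    # Delays grow as base, 2*base, 4*base, ... plus up to base of jitter
                    delay = base_delay * 2 ** (attempt - 1) + random.uniform(0, base_delay)
                    print(f"[RETRY] attempt {attempt} failed ({exc}); sleeping {delay:.2f}s")
                    time.sleep(delay)
        return wrapper
    return decorator

@retry(max_attempts=3, base_delay=0.5)
def fetch_batch():
    # Hypothetical extract call; replace with a real API or database read
    return ["row1", "row2"]

if __name__ == "__main__":
    print(fetch_batch())
```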
5. Challenge: Design an ETL pipeline that ingests customer data from a REST API, transforms it, and loads into BigQuery, running every 6 hours.
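Use Airflow to schedule the DAG every 6 hours (cron expression 0 */6 * * *). Extract: a PythonOperator calls the API, follows pagination, and lands the raw JSON in Cloud Storage. Load: copy the raw files into BigQuery staging tables, for example with the Google provider's GCSToBigQueryOperator. Transform: run dbt models that clean and normalize the staging tables into final tables. Because the transformation runs inside BigQuery, this design is effectively ELT.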
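The extract step of that design is essentially a pagination loop. The sketch below assumes a hypothetical API that takes a page number and returns a dictionary with "results" and "next_page" keys (next_page is None on the last page). The fetch_page function is passed in, so the loop can be tested without a network; in the Airflow design it would be called from the PythonOperator's callable. The max_pages guard is an added assumption that protects against an API that never stops paginating.

```python
def extract_all_pages(fetch_page, start_page=1, max_pages=1000):
    """Follow next_page links until the API reports no more pages.

    fetch_page(page) is assumed to return {"results": [...], "next_page": int or None}.
    """
    records, page, pages_read = [], start_page, 0
    while page is not None:
        if pages_read >= max_pages:
            raise RuntimeError("pagination did not terminate; check the API")
        payload = fetch_page(page)
        records.extend(payload["results"])
        page = payload.get("next_page")
        pages_read += 1
    return records

if __name__ == "__main__":
    # Fake three-page API standing in for the real customer endpoint
    fake_pages = {
        1: {"results": [{"id": 1}, {"id": 2}], "next_page": 2},
        2: {"results": [{"id": 3}], "next_page": 3},
        3: {"results": [], "next_page": None},
    }
    customers = extract_all_pages(lambda p: fake_pages[p])
    print(len(customers), "customers extracted")
```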
Mini Project: Incremental ETL with State Tracking
```python
# incremental_etl.py
# Track the last processed timestamp (watermark) for incremental loads
import json
import os

STATE_FILE = 'etl_state.json'

def load_state():
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE) as f:
            return json.load(f)
    return {"last_run": None, "rows_processed": 0}

def save_state(state):
    with open(STATE_FILE, 'w') as f:
        json.dump(state, f, indent=2)

def extract_incremental(last_run):
    print(f"[EXTRACT] Fetching records since {last_run}")
    # Simulate an API call with a "since" parameter
    data = [
        {"id": 1, "updated_at": "2026-06-15T08:00:00"},
        {"id": 2, "updated_at": "2026-06-15T09:00:00"},
        {"id": 3, "updated_at": "2026-06-15T10:00:00"},
    ]
    if last_run:
        # ISO 8601 strings in the same format compare correctly as plain strings
        data = [r for r in data if r["updated_at"] > last_run]
    return data

def transform(rows):
    return [{**r, "processed": True} for r in rows]

def load(rows, destination):
    # Stand-in for a real sink: append to an in-memory list
    destination.extend(rows)
    return len(rows)

if __name__ == '__main__':
    state = load_state()
    print(f"Previous state: {state}")
    new_data = extract_incremental(state["last_run"])
    transformed = transform(new_data)
    destination = []
    loaded = load(transformed, destination)
    # Advance the watermark to the newest record actually extracted
    if new_data:
        state["last_run"] = max(r["updated_at"] for r in new_data)
    state["rows_processed"] += loaded
    save_state(state)
    print(f"Loaded {loaded} new records")
    print(f"Current state: {state}")
    print(f"Destination has {len(destination)} records")
```

Expected output (first run, before etl_state.json exists):
```text
Previous state: {'last_run': None, 'rows_processed': 0}
[EXTRACT] Fetching records since None
Loaded 3 new records
Current state: {'last_run': '2026-06-15T10:00:00', 'rows_processed': 3}
Destination has 3 records
```
What’s Next
You’ve built your first ETL pipeline! Next, learn about data warehousing to understand where your transformed data lives and how to model it for analytics. Then explore Apache Airflow for orchestrating pipelines at scale.
- Practice daily — Modify the ETL script to handle JSON or API sources
- Build a project — Create an ETL pipeline for your personal data (GitHub commits, Spotify history)
- Explore related topics — Check out dbt for SQL-first transformations
Remember: every expert was once a beginner. Keep coding!
Built by the developers of DodaTech
Doda Browser, DodaZIP & Durga Antivirus Pro