Skip to content
ETL Pipelines Explained — Extract, Transform, Load with Python Examples

ETL Pipelines Explained — Extract, Transform, Load with Python Examples

DodaTech Updated Jun 15, 2026 7 min read

An ETL pipeline is a data integration process that extracts data from source systems, transforms it into a usable format, and loads it into a destination such as a data warehouse or data lake.

What You’ll Learn

By the end of this tutorial, you’ll understand the ETL process, how it differs from ELT, when to use batch vs streaming, and how to build a complete ETL pipeline in Python.

Why ETL Matters

Raw data is rarely usable. Databases store data for transactions, not analytics. APIs return nested JSON. Log files have inconsistent formats. ETL pipelines clean, standardize, and structure this data so analysts and data scientists can work with it. DodaTech’s Durga Antivirus Pro uses ETL pipelines to process threat intelligence feeds from dozens of sources into a unified detection format.

ETL Pipeline Architecture


flowchart LR
  subgraph "Extract"
    A1[(PostgreSQL)]
    A2[CSV Files]
    A3[REST API]
  end
  subgraph "Transform"
    B1[Clean]
    B2[Validate]
    B3[Aggregate]
    B4[Join]
  end
  subgraph "Load"
    C1[(Snowflake)]
    C2[(BigQuery)]
    C3[(S3 Parquet)]
  end
  A1 --> B1
  A2 --> B1
  A3 --> B1
  B1 --> B2 --> B3 --> B4
  B4 --> C1
  B4 --> C2
  B4 --> C3

Prerequisites: Python basics, SQL fundamentals. Understanding of Data Engineering overview concepts helps.

ETL vs ELT

ETLELT
OrderExtract → Transform → LoadExtract → Load → Transform
WhereTransform happens in a staging area or middlewareTransform happens inside the warehouse
Best forLegacy systems, complex transformationsCloud warehouses (Snowflake, BigQuery)
ComputeSeparate transformation serverWarehouse compute power
FlexibilityLess flexible (schema defined before load)More flexible (raw data available for reprocessing)

What Is ETL? (The “Why” First)

Think of ETL like preparing ingredients for a restaurant kitchen. Extraction is buying vegetables from the market (multiple sources). Transformation is washing, chopping, and marinating (processing). Loading is placing the prepared ingredients in the walk-in cooler (storage), ready for chefs to use.

Without ETL, every chef (data consumer) would have to go to the market themselves — an impossible waste of time.

Building an ETL Pipeline in Python

Let’s build a complete ETL pipeline that reads sales data from CSV, transforms it, and loads it into a database.

# etl_pipeline.py
# Complete ETL: CSV → Transform → Database (simulated)
import csv
import sqlite3
from datetime import datetime

# ── Extract ──
def extract_csv(filepath):
    print(f"[EXTRACT] Reading {filepath}...")
    with open(filepath, 'r') as f:
        reader = csv.DictReader(f)
        rows = list(reader)
    print(f"[EXTRACT] Found {len(rows)} rows")
    return rows

# ── Transform ──
def transform_sales(rows):
    print("[TRANSFORM] Cleaning and enriching...")
    transformed = []
    errors = 0
    for row in rows:
        try:
            # Parse and standardize date
            row['order_date'] = datetime.strptime(row['order_date'], '%Y-%m-%d').date().isoformat()
            # Clean price (remove $, convert to float)
            row['amount'] = float(row['amount'].replace('$', '').replace(',', ''))
            # Normalize status
            row['status'] = row['status'].strip().lower()
            # Add derived columns
            row['processed_at'] = datetime.now().isoformat()
            if row['amount'] > 100:
                row['category'] = 'high_value'
            else:
                row['category'] = 'standard'
            transformed.append(row)
        except (ValueError, KeyError) as e:
            errors += 1
            print(f"  [WARN] Skipping row: {e}")
    print(f"[TRANSFORM] {len(transformed)} valid, {errors} errors")
    return transformed

# ── Load ──
def load_to_db(rows, db_path):
    print(f"[LOAD] Writing to {db_path}...")
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    cursor.execute('''
        CREATE TABLE IF NOT EXISTS sales (
            order_id TEXT PRIMARY KEY,
            customer_name TEXT,
            order_date TEXT,
            amount REAL,
            status TEXT,
            category TEXT,
            processed_at TEXT
        )
    ''')

    inserted = 0
    for row in rows:
        try:
            cursor.execute('''
                INSERT OR REPLACE INTO sales
                (order_id, customer_name, order_date, amount, status, category, processed_at)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            ''', (
                row['order_id'], row['customer_name'], row['order_date'],
                row['amount'], row['status'], row['category'], row['processed_at']
            ))
            inserted += 1
        except Exception as e:
            print(f"  [ERROR] Failed to insert {row.get('order_id', 'unknown')}: {e}")

    conn.commit()
    conn.close()
    print(f"[LOAD] Inserted {inserted} rows")

# ── Orchestrate ──
def run_etl():
    print("=" * 50)
    print("ETL Pipeline: Sales Data")
    print(f"Started: {datetime.now().isoformat()}")
    print("=" * 50)

    raw_data = extract_csv('sales_data.csv')
    clean_data = transform_sales(raw_data)
    load_to_db(clean_data, 'sales_warehouse.db')

    print("=" * 50)
    print("ETL Pipeline Complete!")
    print("=" * 50)

# ── Test with sample data ──
def create_sample_csv():
    with open('sales_data.csv', 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['order_id', 'customer_name', 'order_date', 'amount', 'status'])
        writer.writerow(['1001', 'Alice', '2026-06-01', '$250.00', 'completed'])
        writer.writerow(['1002', 'Bob', '2026-06-02', '$45.50', 'pending'])
        writer.writerow(['1003', 'Charlie', '2026-06-03', '$1,200.00', 'completed'])
        writer.writerow(['1004', 'Diana', '2026-06-04', '$89.99', 'cancelled'])
        writer.writerow(['1005', 'Invalid', 'not-a-date', '$50.00', 'completed'])

if __name__ == '__main__':
    create_sample_csv()
    run_etl()

    # Verify
    print("\n[VERIFY] Querying loaded data:")
    conn = sqlite3.connect('sales_warehouse.db')
    cursor = conn.cursor()
    for row in cursor.execute('SELECT * FROM sales'):
        print(f"  {row}")
    conn.close()

Expected output:

==================================================
ETL Pipeline: Sales Data
Started: 2026-06-15T10:00:00
==================================================
[EXTRACT] Reading sales_data.csv...
[EXTRACT] Found 5 rows
[TRANSFORM] Cleaning and enriching...
  [WARN] Skipping row: time data 'not-a-date' does not match format '%Y-%m-%d'
[TRANSFORM] 4 valid, 1 errors
[LOAD] Writing to sales_warehouse.db...
[LOAD] Inserted 4 rows
==================================================
ETL Pipeline Complete!
==================================================

[VERIFY] Querying loaded data:
  ('1001', 'Alice', '2026-06-01', 250.0, 'completed', 'high_value', '2026-06-15T10:00:01')
  ('1002', 'Bob', '2026-06-02', 45.5, 'pending', 'standard', '2026-06-15T10:00:01')
  ('1003', 'Charlie', '2026-06-03', 1200.0, 'completed', 'high_value', '2026-06-15T10:00:01')
  ('1004', 'Diana', '2026-06-04', 89.99, 'cancelled', 'standard', '2026-06-15T10:00:01')

Batch vs Streaming ETL

FactorBatch ETLStreaming ETL
ScheduleHourly, daily, weeklyContinuous
LatencyMinutes to hoursSeconds
StorageStage files, then processIn-memory state, checkpointed
Error handlingRetry entire batchPer-event error handling
Example toolAirflow + dbtKafka Streams + Flink

Tools in the ETL Ecosystem

ToolRoleType
Airbyte / FivetranManaged connectors for extractionSaaS / Open source
Apache AirflowPipeline orchestration and schedulingScheduler
dbtSQL-based transformations (ELT)Transformation
Apache SparkLarge-scale batch/streaming transformsCompute engine
Great ExpectationsData quality validationTesting

Common ETL Mistakes

1. Not Handling Schema Drift

Source systems add columns or change data types. Your pipeline breaks. Use schema inference, column mapping tables, or schema-on-read approaches.

2. Processing Everything from Scratch

Full refreshes are expensive. Implement incremental extraction using watermark columns (updated_at timestamps) or change data capture (CDC).

3. Ignoring Data Quality

If a transformation silently drops rows, you’ll never know. Log row counts at every stage and compare against expectations.

4. Tight Coupling Transformation Logic

Embedding business logic in extraction code makes changes risky. Separate concerns: extract → raw storage → transform → curated storage.

5. Not Testing Edge Cases

Empty files, null values, duplicate keys, and malformed records will arrive. Write defensive transformations that handle all these cases.

6. Running ETL During Business Hours

Schedule heavy ETL jobs during off-peak hours. Production queries and ETL processes compete for the same warehouse compute resources.

Practice Questions

1. What does ETL stand for and what does each stage do?

Extract (pull data from sources), Transform (clean, validate, aggregate), Load (write to destination). ETL transforms data before loading.

2. How does ELT differ from ETL?

ELT loads raw data first, then transforms inside the warehouse using its compute power. ETL transforms in a staging area before loading. ELT is modern for cloud warehouses.

3. What is incremental extraction?

Only pulling records that changed since the last run, using a watermark column (e.g., updated_at) or CDC logs. It’s faster and cheaper than full refreshes.

4. How would you handle a failed ETL run?

Implement retries with exponential backoff, send alerts on failure, and maintain a log of processed records so you can resume from the last checkpoint rather than restarting.

5. Challenge: Design an ETL pipeline that ingests customer data from a REST API, transforms it, and loads into BigQuery, running every 6 hours.

Use Airflow to schedule. Extract: PythonOperator calling the API with pagination. Transform: dbt models for cleaning and normalization. Load: Airflow’s BigQueryOperator to write staging tables, then dbt runs materializations.

Mini Project: Incremental ETL with State Tracking

# incremental_etl.py
# Track last processed timestamp for incremental loads

import json
import os

STATE_FILE = 'etl_state.json'

def load_state():
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE) as f:
            return json.load(f)
    return {"last_run": None, "rows_processed": 0}

def save_state(state):
    with open(STATE_FILE, 'w') as f:
        json.dump(state, f, indent=2)

def extract_incremental(last_run):
    print(f"[EXTRACT] Fetching records since {last_run}")
    # Simulate API call with since parameter
    data = [
        {"id": 1, "updated_at": "2026-06-15T08:00:00"},
        {"id": 2, "updated_at": "2026-06-15T09:00:00"},
        {"id": 3, "updated_at": "2026-06-15T10:00:00"},
    ]
    if last_run:
        data = [r for r in data if r["updated_at"] > last_run]
    return data

def transform(rows):
    return [{**r, "processed": True} for r in rows]

def load(rows, cursor):
    for row in rows:
        cursor.append(row)
    return len(rows)

state = load_state()
print(f"Previous state: {state}")

new_data = extract_incremental(state["last_run"])
transformed = transform(new_data)
destination = []
loaded = load(transformed, destination)

state["last_run"] = "2026-06-15T10:00:00"
state["rows_processed"] += loaded
save_state(state)

print(f"Loaded {loaded} new records")
print(f"Current state: {state}")
print(f"Destination has {len(destination)} records")

Expected output:

Previous state: {'last_run': None, 'rows_processed': 0}
[EXTRACT] Fetching records since None
Loaded 3 new records
Current state: {'last_run': '2026-06-15T10:00:00', 'rows_processed': 3}
Destination has 3 records

Related Concepts

What’s Next

You’ve built your first ETL pipeline! Next, learn about data warehousing to understand where your transformed data lives and how to model it for analytics. Then explore Apache Airflow for orchestrating pipelines at scale.

  • Practice daily — Modify the ETL script to handle JSON or API sources
  • Build a project — Create an ETL pipeline for your personal data (GitHub commits, Spotify history)
  • Explore related topics — Check out dbt for SQL-first transformations

Remember: every expert was once a beginner. Keep coding!

Built by the developers of DodaTech

Doda Browser, DodaZIP & Durga Antivirus Pro