Skip to content
Data Lake Architecture — Medallion, Delta Lake, Iceberg & Hudi

Data Lake Architecture — Medallion, Delta Lake, Iceberg & Hudi

DodaTech Updated Jun 20, 2026 10 min read

A data lake is a centralized repository that stores structured, semi-structured, and unstructured data at any scale, but modern data lake architecture adds transactional guarantees, schema enforcement, and performance optimizations through open table formats like Delta Lake, Iceberg, and Hudi.

What You’ll Learn

In this tutorial, you’ll learn how to design a modern data lake using the medallion architecture (bronze/silver/gold), implement ACID transactions with Delta Lake, Apache Iceberg, and Apache Hudi, and integrate with a catalog for discovery and governance.

Why It Matters

Traditional data lakes became “data swamps” — unstructured, unreliable, and hard to query. Modern lake architecture solves this by layering structure on raw storage, enabling reliable BI, ML, and streaming on the same data. DodaTech uses this architecture to serve analytics across Doda Browser telemetry and Durga Antivirus Pro threat data.

Real-World Use

Uber’s data lake processes 100+ PB across Hudi tables for real-time dispatch analytics. Netflix uses Iceberg on AWS S3 for its 100+ PB analytics lake. Databricks customers run Delta Lake for lakehouse architectures combining BI and ML.

    flowchart TD
    subgraph Ingestion
        A[Raw Data] --> B[Streaming<br/>Kafka/Kinesis]
        A --> C[Batch<br/>S3/GCS/ADLS]
    end
    subgraph Medallion
        B --> D[Bronze<br/>Raw ingest]
        C --> D
        D --> E[Silver<br/>Cleansed]
        E --> F[Gold<br/>Aggregated]
    end
    subgraph Consumption
        F --> G[BI Dashboards]
        F --> H[ML Models]
        F --> I[Ad-hoc Queries]
    end
  

Medallion Architecture

The medallion architecture organizes data into layers of increasing quality and structure.

Bronze Layer — Raw Ingestion

The bronze layer stores data exactly as received. Think of it as the original source of truth — immutable, append-only, with full auditability.

# Simulating bronze layer ingestion
from datetime import datetime
import json

def ingest_to_bronze(source, raw_data, ingestion_time=None):
    """Store raw data with metadata envelope."""
    if ingestion_time is None:
        ingestion_time = datetime.utcnow()

    bronze_record = {
        "_source": source,
        "_ingested_at": ingestion_time.isoformat(),
        "_year": ingestion_time.year,
        "_month": ingestion_time.month,
        "_day": ingestion_time.day,
        "_data": raw_data,
    }
    # In reality: write to S3/ADLS partitioned by year/month/day
    print(f"[Bronze] Ingested {len(str(raw_data))} bytes from {source}")
    return bronze_record

# Example: raw clickstream event
click_event = {
    "user_id": "u123",
    "page": "/tutorials/python",
    "timestamp": "2026-06-20T10:30:00Z",
    "raw_headers": {"user_agent": "Mozilla/5.0 ..."},
}

bronze = ingest_to_bronze("dodatech_clickstream", click_event)
print(json.dumps(bronze, indent=2))

Expected output:

[Bronze] Ingested 132 bytes from dodatech_clickstream
{
  "_source": "dodatech_clickstream",
  "_ingested_at": "2026-06-20T...",
  "_year": 2026,
  "_month": 6,
  "_day": 20,
  "_data": {"user_id": "u123", "page": "/tutorials/python", ...}
}

Silver Layer — Cleansed & Validated

The silver layer applies cleaning, deduplication, type casting, and validation. Data here is queryable and reliable.

-- Example: Bronze to Silver transformation
CREATE OR REPLACE VIEW silver.clickstream_clean AS
SELECT
    _data:user_id::STRING AS user_id,
    _data:page::STRING AS page_path,
    _data:timestamp::TIMESTAMP AS event_time,
    SPLIT_PART(_data:page::STRING, '/', 2) AS category,
    _year,
    _month,
    _day
FROM bronze.clickstream_raw
WHERE _data:user_id IS NOT NULL
  AND _data:page IS NOT NULL;

Gold Layer — Business Aggregations

The gold layer contains aggregated, business-ready datasets for dashboards and ML.

# Simulating gold layer aggregation
from collections import defaultdict
from datetime import datetime, timedelta
import random

def aggregate_gold(clickstream_events):
    """Aggregate page views by category per hour."""
    hourly = defaultdict(lambda: defaultdict(int))
    for event in clickstream_events:
        hour_key = event["event_time"].strftime("%Y-%m-%d %H:00")
        hourly[hour_key][event["category"]] += 1
    return hourly

# Simulate cleaned silver events
now = datetime.utcnow()
silver_events = [
    {"user_id": f"u{random.randint(1,100)}",
     "page": f"/{random.choice(['python','java','docker','kafka'])}/tutorial",
     "event_time": now - timedelta(hours=random.randint(0, 24)),
     "category": random.choice(['python','java','docker','kafka'])}
    for _ in range(100)
]

gold = aggregate_gold(silver_events)
for hour, cats in sorted(gold.items())[:3]:
    print(f"{hour}: {dict(cats)}")

Expected output:

2026-06-19 11:00: {'python': 4, 'java': 3, 'docker': 5, 'kafka': 2}
2026-06-19 12:00: {'java': 6, 'python': 3, 'docker': 4, 'kafka': 1}
2026-06-19 13:00: {'python': 5, 'kafka': 3, 'docker': 1, 'java': 3}

Delta Lake

Apache Spark’s Delta Lake is an open-source storage layer that brings ACID transactions, schema enforcement, and time travel to data lakes.

Key Features

  • ACID Transactions: Concurrent reads and writes with serializable isolation
  • Schema Enforcement: Prevents corrupt data from being written
  • Time Travel: Query previous versions of data
  • Upserts/Merge: Efficient MERGE operations
  • Caching: Accelerates repeated queries
# Simulating Delta Lake operations
class DeltaTable:
    def __init__(self, path, schema):
        self.path = path
        self.schema = schema
        self._version = 0
        self._data = {}
        self._transaction_log = []

    def write(self, records, mode="append"):
        """Simulate Delta write with ACID transaction log."""
        txn_id = f"txn_{self._version + 1}"
        self._transaction_log.append({
            "txn_id": txn_id,
            "version": self._version + 1,
            "mode": mode,
            "timestamp": "2026-06-20T12:00:00Z",
            "num_records": len(records),
        })

        if mode == "overwrite":
            self._data = {}
        elif mode == "merge":
            existing_keys = {r.get("id") for r in self._data.values()}
            for r in records:
                key = r.get("id")
                if key in existing_keys:
                    self._data[key] = r  # update
                else:
                    self._data[len(self._data)] = r  # insert
        else:
            for r in records:
                self._data[len(self._data)] = r

        self._version += 1
        print(f"[Delta] Version {self._version}: {mode} {len(records)} records")
        return txn_id

    def time_travel(self, version):
        """Query a specific version (time travel)."""
        print(f"[Delta] Time travel to version {version}")
        return list(self._data.values())[:version] if self._data else []

# Simulate
delta = DeltaTable("/data/clickstream", {"user_id": "string", "page": "string"})
delta.write([{"user_id": "u1", "page": "/python"}, {"user_id": "u2", "page": "/java"}])
delta.write([{"user_id": "u3", "page": "/docker"}], mode="append")
delta.write([{"user_id": "u1", "page": "/python/advanced"}], mode="merge")

print(f"\nTransaction log: {len(delta._transaction_log)} entries")
print(f"Version {1} data: {delta.time_travel(1)}")

Expected output:

[Delta] Version 1: append 2 records
[Delta] Version 2: append 1 records
[Delta] Version 3: merge 1 records

Transaction log: 3 entries
Version 1 data: [{'user_id': 'u1', 'page': '/python'}, {'user_id': 'u2', 'page': '/java'}]

Apache Iceberg

Apache Iceberg is an open table format designed for huge analytic datasets, offering snapshot isolation, partition evolution, and hidden partitioning.

Key Differences from Delta Lake

FeatureDelta LakeIceberg
FormatParquet + transaction logOpen spec (Parquet, Avro, ORC)
Partition EvolutionManualAutomatic, hidden
Catalog IntegrationHive MetastoreREST, JDBC, Hive, Glue, Nessie
Engine SupportSpark, Flink, Hive, TrinoSpark, Flink, Trino, Presto, Hive, Dremio
File LayoutTransaction log in _delta_logManifest files + metadata
-- Iceberg DDL: partition evolution without rewriting
CREATE TABLE iceberg.analytics.page_views (
    user_id STRING,
    page STRING,
    view_time TIMESTAMP
)
USING iceberg
PARTITIONED BY (days(view_time));

-- Later: evolve partitioning without rewriting data
ALTER TABLE iceberg.analytics.page_views
SET PARTITION TRANSFORM (bucket(16, user_id));
# Simulating Iceberg's manifest-based architecture
class IcebergTable:
    def __init__(self, name):
        self.name = name
        self._metadata = {
            "format-version": 2,
            "table-uuid": "abc-123",
            "location": f"/data/iceberg/{name}",
            "current-snapshot-id": 0,
            "snapshots": [],
        }

    def append(self, data_files):
        """Simulate an Iceberg append creating a new snapshot."""
        snapshot_id = self._metadata["current-snapshot-id"] + 1
        manifest = {
            "snapshot-id": snapshot_id,
            "timestamp-ms": 1750444800000,
            "manifests": [f"{f}.avro" for f in data_files],
            "summary": {"total-records": len(data_files) * 100},
        }
        self._metadata["snapshots"].append(manifest)
        self._metadata["current-snapshot-id"] = snapshot_id
        print(f"[Iceberg] Snapshot {snapshot_id}: {len(data_files)} data files")
        return snapshot_id

    def query_snapshot(self, snapshot_id=None):
        """Query data from a snapshot."""
        sid = snapshot_id or self._metadata["current-snapshot-id"]
        manifests = [s["manifests"] for s in self._metadata["snapshots"]
                     if s["snapshot-id"] == sid]
        print(f"[Iceberg] Reading snapshot {sid}: {manifests}")
        return manifests

iceberg = IcebergTable("clickstream")
iceberg.append(["file1.parquet", "file2.parquet"])
iceberg.append(["file3.parquet"])
iceberg.query_snapshot(1)

Expected output:

[Iceberg] Snapshot 1: 2 data files
[Iceberg] Snapshot 2: 1 data files
[Iceberg] Reading snapshot 1: [['file1.parquet', 'file2.parquet']]

Apache Hudi

Apache Hudi (Hadoop Upserts Deletes and Incrementals) specializes in low-latency upserts and incremental data processing on the data lake.

-- Hudi DDL with record-level indexing
CREATE TABLE hudi.rides (
    ride_id STRING,
    driver_id STRING,
    fare DOUBLE,
    ride_time TIMESTAMP
)
USING hudi
OPTIONS (
    primaryKey = 'ride_id',
    preCombineField = 'ride_time',
    type = 'cow'  -- Copy-on-Write or Merge-on-Read
);
# Simulating Hudi upsert behavior
class HudiTable:
    def __init__(self, primary_key, table_type="cow"):
        self.primary_key = primary_key
        self.table_type = table_type
        self._records = {}
        self._commits = []

    def upsert(self, records):
        """Upsert: insert new or update existing records."""
        for record in records:
            key = record[self.primary_key]
            self._records[key] = record
        commit_time = f"commit_{len(self._commits) + 1}"
        self._commits.append({
            "commit": commit_time,
            "action": "upsert",
            "records": len(records),
        })
        print(f"[Hudi] {commit_time}: upsert {len(records)} records (table: {self.table_type})")

    def incremental_query(self, since_commit):
        """Read only records changed after a given commit."""
        changed = []
        for key, record in self._records.items():
            if len(self._commits) > since_commit:
                changed.append(record)
        print(f"[Hudi] Incremental since commit {since_commit}: {len(changed)} records")
        return changed

    def compaction(self):
        """Merge-on-Read tables need compaction."""
        if self.table_type == "mor":
            print(f"[Hudi] Compaction completed: merged log files")
        else:
            print(f"[Hudi] Copy-on-Write: no compaction needed")

hudi = HudiTable("ride_id", table_type="mor")
hudi.upsert([{"ride_id": "r1", "driver_id": "d1", "fare": 25.0}])
hudi.upsert([{"ride_id": "r2", "driver_id": "d2", "fare": 18.0}])
hudi.upsert([{"ride_id": "r1", "driver_id": "d1", "fare": 30.0}])  # update
hudi.incremental_query(0)
hudi.compaction()

Expected output:

[Hudi] commit_1: upsert 1 records (table: mor)
[Hudi] commit_2: upsert 1 records (table: mor)
[Hudi] commit_3: upsert 1 records (table: mor)
[Hudi] Incremental since commit 0: 2 records
[Hudi] Compaction completed: merged log files

Catalog Integration

A data catalog provides a unified view of all tables across the lake, enabling discovery, governance, and query engine integration.

# Simulating catalog operations with the REST catalog interface
class DataCatalog:
    def __init__(self, name):
        self.name = name
        self._namespaces = {}
        self._tables = {}

    def create_namespace(self, namespace):
        if namespace not in self._namespaces:
            self._namespaces[namespace] = {"tables": []}
            print(f"[Catalog] Created namespace: {namespace}")

    def register_table(self, namespace, table_name, format_type, location):
        key = f"{namespace}.{table_name}"
        table_info = {
            "name": table_name,
            "type": format_type,
            "location": location,
            "columns": [],
        }
        self._tables[key] = table_info
        self._namespaces[namespace]["tables"].append(table_name)
        print(f"[Catalog] Registered {format_type} table: {key} -> {location}")

    def list_tables(self, namespace):
        return self._namespaces.get(namespace, {}).get("tables", [])

    def get_table(self, namespace, table_name):
        return self._tables.get(f"{namespace}.{table_name}")

# Simulate catalog setup
catalog = DataCatalog("dodatech_glue")
catalog.create_namespace("bronze")
catalog.create_namespace("silver")
catalog.create_namespace("gold")

catalog.register_table("bronze", "clickstream_raw", "delta",
                       "s3://data/bronze/clickstream")
catalog.register_table("silver", "clickstream_clean", "iceberg",
                       "s3://data/silver/clickstream")
catalog.register_table("gold", "daily_page_views", "delta",
                       "s3://data/gold/daily_page_views")

print(f"\nSilver tables: {catalog.list_tables('silver')}")
print(f"Table info: {catalog.get_table('silver', 'clickstream_clean')}")

Expected output:

[Catalog] Created namespace: bronze
[Catalog] Created namespace: silver
[Catalog] Created namespace: gold
[Catalog] Registered delta table: bronze.clickstream_raw -> s3://data/bronze/clickstream
[Catalog] Registered iceberg table: silver.clickstream_clean -> s3://data/silver/clickstream
[Catalog] Registered delta table: gold.daily_page_views -> s3://data/gold/daily_page_views

Silver tables: ['clickstream_clean']
Table info: {'name': 'clickstream_clean', 'type': 'iceberg', 'location': 's3://data/silver/clickstream', 'columns': []}

Common Mistakes

1. No Bronze Layer

Storing only clean data means you lose the original record. Always keep bronze for reprocessing and audit.

2. Schema-on-Write Without Governance

Enforcing schema in silver is good, but make bronze flexible enough to handle schema changes from upstream sources.

3. Choosing the Wrong Table Format

Delta Lake integrates best with Spark. Iceberg is more engine-agnostic. Hudi excels in near-real-time upsert scenarios. Match the format to your primary query engine.

4. Ignoring Partitioning Strategy

Too many small partitions creates metadata overhead. Too few creates large scans. Choose partition keys that match your common query filters.

5. Not Using a Catalog

Hard-coded table paths in notebooks creates chaos. Use a catalog (Glue, Hive, Nessie, Unity) for discovery and governance.

6. Skipping Vacuum/Compaction

Delta Lake’s VACUUM and Hudi’s compaction prevent storage bloat. Schedule these maintenance operations regularly.

Practice Questions

  1. What are the three layers of the medallion architecture? Bronze (raw/immutable), Silver (cleansed/validated), Gold (aggregated/business-ready).

  2. What is the key advantage of Delta Lake over a plain Parquet data lake? ACID transactions, schema enforcement, time travel, and upsert/merge support.

  3. How does Iceberg’s partition evolution differ from legacy partitioning? Iceberg supports hidden partitioning that can evolve without rewriting existing data files.

  4. What is the difference between Hudi’s Copy-on-Write and Merge-on-Read table types? CoW rewrites files on each commit (faster reads, slower writes). MoR writes delta logs (faster writes, requires compaction).

  5. Challenge: Design a data lake architecture for a ride-sharing company that ingests real-time GPS data, driver profiles, and trip transactions. Which table format would you choose for each?

Mini Project: Data Lake Health Monitor

# lake_health_monitor.py
# Simulate monitoring data lake table health
from datetime import datetime, timedelta

class LakeHealthMonitor:
    def __init__(self):
        self.tables = {}

    def add_table(self, name, format_type, num_files, total_size_gb,
                  last_optimized, num_versions):
        days_since_opt = (datetime.now() - last_optimized).days
        health_score = 100
        health_score -= min(20, num_files // 100)  # too many small files
        health_score -= min(15, num_versions - 10 if num_versions > 10 else 0)
        health_score -= min(25, days_since_opt * 2)

        self.tables[name] = {
            "format": format_type,
            "files": num_files,
            "size_gb": total_size_gb,
            "days_since_optimize": days_since_opt,
            "versions": num_versions,
            "health": max(0, health_score),
        }
        return self.tables[name]

    def report(self):
        print(f"{'Table':<25} {'Format':<10} {'Files':<8} {'Size(GB)':<10} "
              f"{'Versions':<10} {'Health':<8}")
        print("-" * 75)
        for name, info in sorted(self.tables.items()):
            print(f"{name:<25} {info['format']:<10} {info['files']:<8} "
                  f"{info['size_gb']:<10} {info['versions']:<10} {info['health']:<8}")

monitor = LakeHealthMonitor()
monitor.add_table("bronze.clickstream", "delta", 1500, 45.0,
                  datetime.now() - timedelta(days=14), 25)
monitor.add_table("silver.clickstream", "iceberg", 200, 12.0,
                  datetime.now() - timedelta(days=3), 8)
monitor.add_table("gold.daily_views", "delta", 50, 2.5,
                  datetime.now() - timedelta(days=1), 15)
monitor.report()

Expected output:

Table                     Format     Files    Size(GB)   Versions   Health
---------------------------------------------------------------------------
bronze.clickstream        delta      1500     45.0       25         58
gold.daily_views          delta      50       2.5        15         77
silver.clickstream        iceberg    200      12.0       8          94

What’s Next

Built by the developers of Doda Browser, DodaZIP, and Durga Antivirus Pro.

Built by the developers of DodaTech

Doda Browser, DodaZIP & Durga Antivirus Pro