Data Lake Architecture — Medallion, Delta Lake, Iceberg & Hudi
A data lake is a centralized repository that stores structured, semi-structured, and unstructured data at any scale. Modern data lake architecture adds transactional guarantees, schema enforcement, and performance optimizations on top of that storage through open table formats like Delta Lake, Iceberg, and Hudi.
What You’ll Learn
In this tutorial, you’ll learn how to design a modern data lake using the medallion architecture (bronze/silver/gold), implement ACID transactions with Delta Lake, Apache Iceberg, and Apache Hudi, and integrate with a catalog for discovery and governance.
Why It Matters
Traditional data lakes became “data swamps” — unstructured, unreliable, and hard to query. Modern lake architecture solves this by layering structure on raw storage, enabling reliable BI, ML, and streaming on the same data. DodaTech uses this architecture to serve analytics across Doda Browser telemetry and Durga Antivirus Pro threat data.
Real-World Use
Uber’s data lake processes 100+ PB across Hudi tables for real-time dispatch analytics. Netflix uses Iceberg on AWS S3 for its 100+ PB analytics lake. Databricks customers run Delta Lake for lakehouse architectures combining BI and ML.
```mermaid
flowchart TD
    subgraph Ingestion
        A[Raw Data] --> B[Streaming<br/>Kafka/Kinesis]
        A --> C[Batch<br/>S3/GCS/ADLS]
    end
    subgraph Medallion
        B --> D[Bronze<br/>Raw ingest]
        C --> D
        D --> E[Silver<br/>Cleansed]
        E --> F[Gold<br/>Aggregated]
    end
    subgraph Consumption
        F --> G[BI Dashboards]
        F --> H[ML Models]
        F --> I[Ad-hoc Queries]
    end
```
Medallion Architecture
The medallion architecture organizes data into layers of increasing quality and structure.
Bronze Layer — Raw Ingestion
The bronze layer stores data exactly as received. Think of it as the original source of truth — immutable, append-only, with full auditability.
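The append-only, date-partitioned layout described above can be sketched with plain local files. This is a minimal illustration under stated assumptions: a local temporary directory stands in for S3/ADLS, JSON Lines and the file name part-0000.jsonl are hypothetical choices, and append_bronze is an illustrative helper, not a library API.

```python
import json
import os
import tempfile
from datetime import datetime, timezone

def append_bronze(base_dir, source, raw_line, ingested_at=None):
    """Append one raw record, untouched, to a date-partitioned JSON Lines file.

    The file is only ever opened in append mode, so earlier bronze records are
    never rewritten. Directory names follow the Hive-style key=value convention.
    """
    if ingested_at is None:
        ingested_at = datetime.now(timezone.utc)
    partition_dir = os.path.join(
        base_dir, source,
        f"year={ingested_at.year}",
        f"month={ingested_at.month:02d}",
        f"day={ingested_at.day:02d}",
    )
    os.makedirs(partition_dir, exist_ok=True)
    path = os.path.join(partition_dir, "part-0000.jsonl")  # hypothetical file name
    envelope = {"_source": source, "_ingested_at": ingested_at.isoformat(), "_raw": raw_line}
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(envelope) + "\n")
    return path

with tempfile.TemporaryDirectory() as lake:
    t = datetime(2026, 6, 20, 10, 30, tzinfo=timezone.utc)
    path = append_bronze(lake, "dodatech_clickstream", '{"user_id": "u123"}', t)
    append_bronze(lake, "dodatech_clickstream", '{"user_id": "u456"}', t)
    print(os.path.relpath(path, lake))
    with open(path, encoding="utf-8") as f:
        print(sum(1 for _ in f), "records")
```

On a POSIX system this prints dodatech_clickstream/year=2026/month=06/day=20/part-0000.jsonl followed by 2 records. Keeping the raw payload as the original string, rather than a parsed object, is what lets bronze be replayed exactly as received.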
```python
# Simulating bronze layer ingestion
import json
from datetime import datetime, timezone

def ingest_to_bronze(source, raw_data, ingestion_time=None):
    """Wrap a raw record in a metadata envelope without modifying it."""
    if ingestion_time is None:
        ingestion_time = datetime.now(timezone.utc)  # datetime.utcnow() is deprecated
    bronze_record = {
        "_source": source,
        "_ingested_at": ingestion_time.isoformat(),
        "_year": ingestion_time.year,
        "_month": ingestion_time.month,
        "_day": ingestion_time.day,
        "_data": raw_data,
    }
    # In reality: write to S3/ADLS partitioned by year/month/day
    size = len(json.dumps(raw_data).encode("utf-8"))  # serialized size in bytes
    print(f"[Bronze] Ingested {size} bytes from {source}")
    return bronze_record

# Example: raw clickstream event
click_event = {
    "user_id": "u123",
    "page": "/tutorials/python",
    "timestamp": "2026-06-20T10:30:00Z",
    "raw_headers": {"user_agent": "Mozilla/5.0 ..."},
}
# A fixed ingestion time keeps the output below reproducible
bronze = ingest_to_bronze("dodatech_clickstream", click_event,
                          datetime(2026, 6, 20, 10, 31, tzinfo=timezone.utc))
print(json.dumps(bronze, indent=2))
```

Expected output:

```
[Bronze] Ingested 135 bytes from dodatech_clickstream
{
  "_source": "dodatech_clickstream",
  "_ingested_at": "2026-06-20T10:31:00+00:00",
  "_year": 2026,
  "_month": 6,
  "_day": 20,
  "_data": {
    "user_id": "u123",
    "page": "/tutorials/python",
    "timestamp": "2026-06-20T10:30:00Z",
    "raw_headers": {
      "user_agent": "Mozilla/5.0 ..."
    }
  }
}
```

Silver Layer — Cleansed & Validated
The silver layer applies cleaning, deduplication, type casting, and validation. Data here is queryable and reliable.
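The cleaning steps listed above (validation, type casting, deduplication) can be sketched in plain Python before looking at a SQL version. This assumes bronze rows shaped like the envelope from the bronze example and treats (user_id, page, event_time) as the deduplication key; to_silver and that key are hypothetical choices for illustration.

```python
from datetime import datetime

def to_silver(bronze_rows):
    """Validate, cast, and deduplicate bronze envelopes into silver records."""
    seen, clean, rejected = set(), [], []
    for row in bronze_rows:
        data = row.get("_data") or {}
        # Validation: required fields must be present and non-empty
        if not data.get("user_id") or not data.get("page"):
            rejected.append(row)
            continue
        # Type casting: ISO-8601 string -> timezone-aware datetime
        try:
            event_time = datetime.fromisoformat(data["timestamp"].replace("Z", "+00:00"))
        except (KeyError, AttributeError, ValueError):
            rejected.append(row)
            continue
        # Deduplication on an assumed business key
        key = (data["user_id"], data["page"], event_time)
        if key in seen:
            continue
        seen.add(key)
        parts = data["page"].split("/")
        clean.append({
            "user_id": str(data["user_id"]),
            "page_path": data["page"],
            "event_time": event_time,
            "category": parts[1] if len(parts) > 1 else None,  # like SPLIT_PART(page, '/', 2)
        })
    return clean, rejected

event = {"user_id": "u1", "page": "/tutorials/python", "timestamp": "2026-06-20T10:30:00Z"}
rows = [
    {"_data": event},
    {"_data": dict(event)},  # exact duplicate
    {"_data": {"page": "/tutorials/java", "timestamp": "2026-06-20T10:31:00Z"}},  # no user_id
]
clean, rejected = to_silver(rows)
print(len(clean), len(rejected))  # 1 1
print(clean[0]["category"])       # tutorials
```

Returning rejected rows instead of silently dropping them makes it easy to route them to a quarantine table for inspection.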
```sql
-- Example: Bronze to Silver transformation
-- Uses Databricks-style JSON path syntax (col:field) with :: casts on a JSON string column
CREATE OR REPLACE VIEW silver.clickstream_clean AS
SELECT
    _data:user_id::STRING                   AS user_id,
    _data:page::STRING                      AS page_path,
    _data:timestamp::TIMESTAMP              AS event_time,
    SPLIT_PART(_data:page::STRING, '/', 2)  AS category,
    _year,
    _month,
    _day
FROM bronze.clickstream_raw
WHERE _data:user_id IS NOT NULL
  AND _data:page IS NOT NULL;
```

Gold Layer — Business Aggregations
The gold layer contains aggregated, business-ready datasets for dashboards and ML.
```python
# Simulating gold layer aggregation
import random
from collections import defaultdict
from datetime import datetime, timedelta, timezone

def aggregate_gold(clickstream_events):
    """Aggregate page views by category per hour."""
    hourly = defaultdict(lambda: defaultdict(int))
    for event in clickstream_events:
        hour_key = event["event_time"].strftime("%Y-%m-%d %H:00")
        hourly[hour_key][event["category"]] += 1
    return hourly

# Simulate cleaned silver events; the category is derived from the page path
# so the two fields stay consistent
now = datetime.now(timezone.utc)
categories = ["python", "java", "docker", "kafka"]
silver_events = []
for _ in range(100):
    category = random.choice(categories)
    silver_events.append({
        "user_id": f"u{random.randint(1, 100)}",
        "page": f"/{category}/tutorial",
        "event_time": now - timedelta(hours=random.randint(0, 24)),
        "category": category,
    })

gold = aggregate_gold(silver_events)
for hour, cats in sorted(gold.items())[:3]:
    print(f"{hour}: {dict(cats)}")
```

Example output (hours and counts vary between runs):

```
2026-06-19 11:00: {'python': 4, 'java': 3, 'docker': 5, 'kafka': 2}
2026-06-19 12:00: {'java': 6, 'python': 3, 'docker': 4, 'kafka': 1}
2026-06-19 13:00: {'python': 5, 'kafka': 3, 'docker': 1, 'java': 3}
```

Delta Lake
Delta Lake, originally developed by Databricks and now a Linux Foundation project, is an open-source storage layer that brings ACID transactions, schema enforcement, and time travel to data lakes.
Key Features
- ACID Transactions: Concurrent reads and writes with serializable isolation
- Schema Enforcement: Prevents corrupt data from being written
- Time Travel: Query previous versions of data
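- Upserts/Merge: Efficient MERGE operations
- Caching: Accelerates repeated queries (on Databricks this comes from the platform's disk cache rather than the open-source format itself)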
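Delta's ACID guarantees and time travel rest on an ordered log of JSON commit files in the table's _delta_log directory: each commit adds or removes data files, and readers rebuild the table state by replaying commits in order. The sketch below is a heavily simplified model. Real add actions carry more fields (size, partition values, statistics), real logs also contain checkpoints and protocol/metadata actions, and commit and active_files are hypothetical helpers. Opening the commit file in exclusive-create mode loosely stands in for the atomic, mutually exclusive log-file creation Delta relies on to stop two writers from claiming the same version.

```python
import json
import os
import tempfile

def commit(table_dir, version, actions):
    """Write one commit as _delta_log/<20-digit version>.json, one action per line."""
    log_dir = os.path.join(table_dir, "_delta_log")
    os.makedirs(log_dir, exist_ok=True)
    path = os.path.join(log_dir, f"{version:020d}.json")
    # Mode "x" fails if the file exists, so two writers cannot both claim a version
    with open(path, "x", encoding="utf-8") as f:
        for action in actions:
            f.write(json.dumps(action) + "\n")

def active_files(table_dir, as_of=None):
    """Replay add/remove actions up to version as_of (None means latest)."""
    log_dir = os.path.join(table_dir, "_delta_log")
    files = set()
    for name in sorted(os.listdir(log_dir)):  # zero padding makes this version order
        if not name.endswith(".json"):
            continue
        version = int(name[:-len(".json")])
        if as_of is not None and version > as_of:
            break
        with open(os.path.join(log_dir, name), encoding="utf-8") as f:
            for line in f:
                action = json.loads(line)
                if "add" in action:
                    files.add(action["add"]["path"])
                elif "remove" in action:
                    files.discard(action["remove"]["path"])
    return sorted(files)

with tempfile.TemporaryDirectory() as table:
    commit(table, 0, [{"add": {"path": "part-000.parquet"}}])
    commit(table, 1, [{"add": {"path": "part-001.parquet"}}])
    # Version 2 rewrites both files into one (as an overwrite or OPTIMIZE might)
    commit(table, 2, [{"remove": {"path": "part-000.parquet"}},
                      {"remove": {"path": "part-001.parquet"}},
                      {"add": {"path": "part-002.parquet"}}])
    print(active_files(table))           # ['part-002.parquet']
    print(active_files(table, as_of=1))  # ['part-000.parquet', 'part-001.parquet']
```

Reading as of version 1 still sees both original files even though version 2 removed them from the current state. That is why VACUUM, which physically deletes files no longer referenced by retained versions, limits how far back time travel can reach.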
```python
# Simulating Delta Lake operations (a toy model, not the delta-spark API)
import copy

class DeltaTable:
    def __init__(self, path, schema, key="user_id"):
        self.path = path
        self.schema = schema
        self.key = key              # column used to match rows in a merge
        # Versions count from 1 here; real Delta numbers its first commit version 0
        self._version = 0
        self._rows = []             # current table state
        self._snapshots = {0: []}   # version -> rows, standing in for log replay
        self._transaction_log = []

    def write(self, records, mode="append"):
        """Simulate a Delta write that commits exactly one new version."""
        for r in records:           # schema enforcement: reject unknown columns
            unknown = set(r) - set(self.schema)
            if unknown:
                raise ValueError(f"Schema mismatch, unexpected columns: {sorted(unknown)}")
        if mode == "overwrite":
            self._rows = list(records)
        elif mode == "merge":
            index = {row[self.key]: i for i, row in enumerate(self._rows)}
            for r in records:
                if r[self.key] in index:
                    self._rows[index[r[self.key]]] = r  # update matched row
                else:
                    index[r[self.key]] = len(self._rows)
                    self._rows.append(r)                # insert new row
        elif mode == "append":
            self._rows.extend(records)
        else:
            raise ValueError(f"Unknown mode: {mode}")
        self._version += 1
        self._snapshots[self._version] = copy.deepcopy(self._rows)
        self._transaction_log.append({
            "txn_id": f"txn_{self._version}",
            "version": self._version,
            "mode": mode,
            "timestamp": "2026-06-20T12:00:00Z",
            "num_records": len(records),
        })
        print(f"[Delta] Version {self._version}: {mode} {len(records)} records")
        return self._version

    def time_travel(self, version):
        """Return the table contents as of a given version."""
        print(f"[Delta] Time travel to version {version}")
        return self._snapshots[version]

# Simulate
delta = DeltaTable("/data/clickstream", {"user_id": "string", "page": "string"})
delta.write([{"user_id": "u1", "page": "/python"}, {"user_id": "u2", "page": "/java"}])
delta.write([{"user_id": "u3", "page": "/docker"}], mode="append")
delta.write([{"user_id": "u1", "page": "/python/advanced"}], mode="merge")
print(f"\nTransaction log: {len(delta._transaction_log)} entries")
print(f"Version 1 data: {delta.time_travel(1)}")
```

Expected output:

```
[Delta] Version 1: append 2 records
[Delta] Version 2: append 1 records
[Delta] Version 3: merge 1 records

Transaction log: 3 entries
[Delta] Time travel to version 1
Version 1 data: [{'user_id': 'u1', 'page': '/python'}, {'user_id': 'u2', 'page': '/java'}]
```

Apache Iceberg
Apache Iceberg is an open table format designed for huge analytic datasets, offering snapshot isolation, partition evolution, and hidden partitioning.
Key Differences from Delta Lake
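| Feature | Delta Lake | Iceberg |
|---|---|---|
| Format | Parquet data files + JSON transaction log (with Parquet checkpoints) | Open spec; data files in Parquet, Avro, or ORC |
| Partition Evolution | Changing partition columns requires rewriting the table | In place via ALTER TABLE, without rewriting existing data; hidden partitioning |
| Catalog Integration | Hive Metastore, Unity Catalog, AWS Glue | REST, JDBC, Hive, Glue, Nessie |
| Engine Support | Spark, Flink, Hive, Trino | Spark, Flink, Trino, Presto, Hive, Dremio |
| File Layout | Transaction log in _delta_log | Table metadata files, manifest lists, and manifest files |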
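Hidden partitioning means the table records a transform of a column (for example days(view_time)) and the planner maps filters on the source column onto that transform, so queries never reference a partition column directly. The sketch below models only the days transform and file pruning; the file list and the view_time_day field are hypothetical, and real Iceberg keeps partition values and column statistics in its manifest files.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def days_transform(ts):
    """days() partition transform: whole days since 1970-01-01 UTC."""
    return (ts - EPOCH).days

def utc(*args):
    return datetime(*args, tzinfo=timezone.utc)

# Hypothetical data files tagged with the partition value computed at write time
data_files = [
    {"path": "f1.parquet", "view_time_day": days_transform(utc(2026, 6, 19, 8))},
    {"path": "f2.parquet", "view_time_day": days_transform(utc(2026, 6, 20, 9))},
    {"path": "f3.parquet", "view_time_day": days_transform(utc(2026, 6, 20, 23))},
]

def plan_scan(files, start, end):
    """Prune files for the filter start <= view_time < end.

    The filter names only view_time; the planner applies the same transform to
    the bounds. Pruning is conservative: it may keep an extra file, but never
    drops a file that could contain matching rows.
    """
    lo, hi = days_transform(start), days_transform(end)
    return [f["path"] for f in files if lo <= f["view_time_day"] <= hi]

print(plan_scan(data_files, utc(2026, 6, 20), utc(2026, 6, 21)))  # ['f2.parquet', 'f3.parquet']
```

Because readers only ever filter on view_time, the table can later switch to a different transform without any query changes, which is what makes partition evolution practical.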
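```sql
-- Iceberg DDL (Spark SQL): hidden partitioning on a transform of view_time
CREATE TABLE iceberg.analytics.page_views (
    user_id   STRING,
    page      STRING,
    view_time TIMESTAMP
)
USING iceberg
PARTITIONED BY (days(view_time));

-- Later: evolve the partition spec without rewriting data.
-- Existing files keep the old spec; new writes use the new one.
-- Requires Iceberg's Spark SQL extensions (IcebergSparkSessionExtensions).
ALTER TABLE iceberg.analytics.page_views
ADD PARTITION FIELD bucket(16, user_id);
```

```python
# Simulating Iceberg's snapshot -> manifest -> data file hierarchy (toy model)
import time

class IcebergTable:
    def __init__(self, name):
        self.name = name
        self._metadata = {
            "format-version": 2,
            "table-uuid": "abc-123",
            "location": f"/data/iceberg/{name}",
            "current-snapshot-id": None,
            "snapshots": [],
        }
        self._manifests = {}  # manifest path -> data files it tracks

    def append(self, data_files):
        """Simulate an append: write one new manifest, then commit a new snapshot."""
        snapshots = self._metadata["snapshots"]
        snapshot_id = len(snapshots) + 1  # real snapshot IDs are random 64-bit longs
        manifest = f"{self._metadata['location']}/metadata/manifest-{snapshot_id}.avro"
        self._manifests[manifest] = list(data_files)
        # The new snapshot's manifest list = previous manifests + the new one
        previous = snapshots[-1]["manifest-list"] if snapshots else []
        snapshots.append({
            "snapshot-id": snapshot_id,
            "timestamp-ms": int(time.time() * 1000),
            "manifest-list": previous + [manifest],
            "summary": {"added-data-files": len(data_files)},
        })
        self._metadata["current-snapshot-id"] = snapshot_id
        print(f"[Iceberg] Snapshot {snapshot_id}: {len(data_files)} data files")
        return snapshot_id

    def query_snapshot(self, snapshot_id=None):
        """Plan a scan: snapshot -> manifests -> data files."""
        sid = self._metadata["current-snapshot-id"] if snapshot_id is None else snapshot_id
        snapshot = next((s for s in self._metadata["snapshots"]
                         if s["snapshot-id"] == sid), None)
        if snapshot is None:
            raise ValueError(f"Unknown snapshot: {sid}")
        files = [f for m in snapshot["manifest-list"] for f in self._manifests[m]]
        print(f"[Iceberg] Reading snapshot {sid}: {files}")
        return files

iceberg = IcebergTable("clickstream")
iceberg.append(["file1.parquet", "file2.parquet"])
iceberg.append(["file3.parquet"])
iceberg.query_snapshot(1)
iceberg.query_snapshot()
```

Expected output:

```
[Iceberg] Snapshot 1: 2 data files
[Iceberg] Snapshot 2: 1 data files
[Iceberg] Reading snapshot 1: ['file1.parquet', 'file2.parquet']
[Iceberg] Reading snapshot 2: ['file1.parquet', 'file2.parquet', 'file3.parquet']
```

Apache Hudi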
Apache Hudi (Hadoop Upserts Deletes and Incrementals) specializes in low-latency upserts and incremental data processing on the data lake.
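When one incoming batch carries several versions of the same record key, Hudi uses the precombine field to decide which version survives (with the default payload, the one with the larger value). The sketch below shows that idea on the rides example; precombine is a hypothetical helper, and actual merge semantics depend on the payload or record merger configured for the table.

```python
def precombine(records, record_key, precombine_field):
    """Keep one record per key: the one with the largest precombine value.

    Mirrors the role of Hudi's preCombineField when a batch contains several
    versions of the same key. Ties go to the record that arrives later.
    """
    latest = {}
    for rec in records:
        key = rec[record_key]
        if key not in latest or rec[precombine_field] >= latest[key][precombine_field]:
            latest[key] = rec
    return list(latest.values())

batch = [
    {"ride_id": "r1", "fare": 25.0, "ride_time": "2026-06-20T10:00:00"},
    {"ride_id": "r1", "fare": 30.0, "ride_time": "2026-06-20T10:05:00"},  # later correction
    {"ride_id": "r2", "fare": 18.0, "ride_time": "2026-06-20T10:02:00"},
]
for rec in precombine(batch, "ride_id", "ride_time"):
    print(rec["ride_id"], rec["fare"])  # r1 30.0, then r2 18.0
```

Comparing ride_time as strings works here only because every value uses the same ISO-8601 layout; with mixed formats, parse them into datetime objects first.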
```sql
-- Hudi DDL (Spark SQL): record key and precombine field drive upserts
CREATE TABLE hudi.rides (
    ride_id   STRING,
    driver_id STRING,
    fare      DOUBLE,
    ride_time TIMESTAMP
)
USING hudi
TBLPROPERTIES (
    primaryKey = 'ride_id',
    preCombineField = 'ride_time',
    type = 'cow'  -- 'cow' (Copy-on-Write) or 'mor' (Merge-on-Read)
);
```

```python
# Simulating Hudi upsert behavior (toy model, not the Hudi API)
class HudiTable:
    def __init__(self, primary_key, table_type="cow"):
        self.primary_key = primary_key
        self.table_type = table_type  # "cow" (Copy-on-Write) or "mor" (Merge-on-Read)
        self._base = {}     # key -> (commit number, record) held in base files
        self._log = []      # MoR only: (commit number, record) held in log files
        self._commits = []

    def upsert(self, records):
        """Upsert: insert new records or update existing ones by primary key."""
        commit = len(self._commits) + 1
        for record in records:
            if self.table_type == "mor":
                # Simplification: every MoR write goes to the log until compaction
                self._log.append((commit, record))
            else:
                # CoW rewrites the affected base file with the new record version
                self._base[record[self.primary_key]] = (commit, record)
        self._commits.append({"commit": f"commit_{commit}", "action": "upsert",
                              "records": len(records)})
        print(f"[Hudi] commit_{commit}: upsert {len(records)} records (table: {self.table_type})")

    def _merged_view(self):
        """Merge base files with pending log records; the later commit wins."""
        merged = dict(self._base)
        for commit, record in self._log:
            merged[record[self.primary_key]] = (commit, record)
        return merged

    def incremental_query(self, since_commit):
        """Read only records whose latest change came after since_commit."""
        changed = [rec for commit, rec in self._merged_view().values()
                   if commit > since_commit]
        print(f"[Hudi] Incremental since commit {since_commit}: {len(changed)} records")
        return changed

    def compaction(self):
        """Merge-on-Read tables need compaction to fold log files into base files."""
        if self.table_type == "mor":
            merged_count = len(self._log)
            self._base = self._merged_view()
            self._log = []
            print(f"[Hudi] Compaction completed: merged {merged_count} log records")
        else:
            print("[Hudi] Copy-on-Write: no compaction needed")

hudi = HudiTable("ride_id", table_type="mor")
hudi.upsert([{"ride_id": "r1", "driver_id": "d1", "fare": 25.0}])
hudi.upsert([{"ride_id": "r2", "driver_id": "d2", "fare": 18.0}])
hudi.upsert([{"ride_id": "r1", "driver_id": "d1", "fare": 30.0}])  # update
hudi.incremental_query(0)
hudi.incremental_query(2)
hudi.compaction()
```

Expected output:

```
[Hudi] commit_1: upsert 1 records (table: mor)
[Hudi] commit_2: upsert 1 records (table: mor)
[Hudi] commit_3: upsert 1 records (table: mor)
[Hudi] Incremental since commit 0: 2 records
[Hudi] Incremental since commit 2: 1 records
[Hudi] Compaction completed: merged 3 log records
```

Catalog Integration
A data catalog provides a unified view of all tables across the lake, enabling discovery, governance, and query engine integration.
```python
# Simulating basic catalog operations: namespaces, table registration, lookup
class DataCatalog:
    def __init__(self, name):
        self.name = name
        self._namespaces = {}
        self._tables = {}

    def create_namespace(self, namespace):
        if namespace not in self._namespaces:
            self._namespaces[namespace] = {"tables": []}
            print(f"[Catalog] Created namespace: {namespace}")

    def register_table(self, namespace, table_name, format_type, location):
        if namespace not in self._namespaces:
            raise KeyError(f"Namespace does not exist: {namespace}")
        key = f"{namespace}.{table_name}"
        self._tables[key] = {
            "name": table_name,
            "type": format_type,
            "location": location,
            "columns": [],
        }
        if table_name not in self._namespaces[namespace]["tables"]:
            self._namespaces[namespace]["tables"].append(table_name)
        print(f"[Catalog] Registered {format_type} table: {key} -> {location}")

    def list_tables(self, namespace):
        return self._namespaces.get(namespace, {}).get("tables", [])

    def get_table(self, namespace, table_name):
        return self._tables.get(f"{namespace}.{table_name}")

# Simulate catalog setup
catalog = DataCatalog("dodatech_glue")
catalog.create_namespace("bronze")
catalog.create_namespace("silver")
catalog.create_namespace("gold")
catalog.register_table("bronze", "clickstream_raw", "delta",
                       "s3://data/bronze/clickstream")
catalog.register_table("silver", "clickstream_clean", "iceberg",
                       "s3://data/silver/clickstream")
catalog.register_table("gold", "daily_page_views", "delta",
                       "s3://data/gold/daily_page_views")
print(f"\nSilver tables: {catalog.list_tables('silver')}")
print(f"Table info: {catalog.get_table('silver', 'clickstream_clean')}")
```

Expected output:

```
[Catalog] Created namespace: bronze
[Catalog] Created namespace: silver
[Catalog] Created namespace: gold
[Catalog] Registered delta table: bronze.clickstream_raw -> s3://data/bronze/clickstream
[Catalog] Registered iceberg table: silver.clickstream_clean -> s3://data/silver/clickstream
[Catalog] Registered delta table: gold.daily_page_views -> s3://data/gold/daily_page_views

Silver tables: ['clickstream_clean']
Table info: {'name': 'clickstream_clean', 'type': 'iceberg', 'location': 's3://data/silver/clickstream', 'columns': []}
```

Common Mistakes
1. No Bronze Layer
Storing only clean data means you lose the original record. Always keep bronze for reprocessing and audit.
2. Schema-on-Write Without Governance
Enforcing schema in silver is good, but make bronze flexible enough to handle schema changes from upstream sources.
3. Choosing the Wrong Table Format
Delta Lake integrates best with Spark. Iceberg is more engine-agnostic. Hudi excels in near-real-time upsert scenarios. Match the format to your primary query engine.
4. Ignoring Partitioning Strategy
Too many small partitions create metadata overhead; too few create large scans. Choose partition keys that match your common query filters.
5. Not Using a Catalog
Hard-coded table paths in notebooks create chaos. Use a catalog (Glue, Hive, Nessie, Unity) for discovery and governance.
6. Skipping Vacuum/Compaction
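Delta Lake’s VACUUM removes data files no longer referenced by retained table versions, and Hudi’s compaction merges Merge-on-Read log files into base files. Schedule these maintenance operations regularly to prevent storage bloat, keeping in mind that vacuuming also limits how far back time travel can reach.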
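Mistakes 4 and 6 both come down to small files: many tiny files mean more metadata to track and more files to open per query. The sketch below plans a rewrite of small files into fewer, larger ones using a greedy next-fit pass over files sorted largest first. plan_compaction, its thresholds, and the sample sizes are hypothetical; real engines (Delta OPTIMIZE, Iceberg rewrite_data_files, Hudi clustering) use their own planners and defaults.

```python
def plan_compaction(file_sizes_mb, target_mb=128, small_mb=32):
    """Group small files into rewrite batches of roughly target_mb each.

    Files at or above small_mb are left alone; both thresholds are
    illustrative, not any engine's real defaults.
    """
    small = sorted((s for s in file_sizes_mb if s < small_mb), reverse=True)
    batches, current, current_size = [], [], 0
    for size in small:
        if current and current_size + size > target_mb:
            batches.append(current)  # close the batch before it overflows
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        batches.append(current)
    # Rewriting a lone file into itself gains nothing, so drop single-file batches
    return [b for b in batches if len(b) > 1]

sizes = [4, 8, 200, 16, 30, 2, 25, 31, 130, 12]  # hypothetical file sizes in MB
print(plan_compaction(sizes, target_mb=64))     # [[31, 30], [25, 16, 12, 8], [4, 2]]
```

Each inner list would become one rewritten file; the 200 MB and 130 MB files are already large enough and are skipped.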
Practice Questions
What are the three layers of the medallion architecture?
Answer: Bronze (raw/immutable), Silver (cleansed/validated), Gold (aggregated/business-ready).
What is the key advantage of Delta Lake over a plain Parquet data lake?
Answer: ACID transactions, schema enforcement, time travel, and upsert/merge support.
How does Iceberg’s partition evolution differ from legacy partitioning?
Answer: Iceberg uses hidden partitioning, and its partition spec can evolve without rewriting existing data files.
What is the difference between Hudi’s Copy-on-Write and Merge-on-Read table types?
Answer: CoW rewrites base files on each commit (faster reads, slower writes). MoR appends changes to delta log files (faster writes, but reads must merge the logs until compaction runs).
Challenge: Design a data lake architecture for a ride-sharing company that ingests real-time GPS data, driver profiles, and trip transactions. Which table format would you choose for each?
Mini Project: Data Lake Health Monitor
```python
# lake_health_monitor.py
# Simulate monitoring data lake table health
from datetime import datetime, timedelta

class LakeHealthMonitor:
    def __init__(self):
        self.tables = {}

    def add_table(self, name, format_type, num_files, total_size_gb,
                  last_optimized, num_versions):
        days_since_opt = (datetime.now() - last_optimized).days
        health_score = 100
        health_score -= min(20, num_files // 100)           # too many small files
        health_score -= min(15, max(0, num_versions - 10))  # long version history
        health_score -= min(25, days_since_opt * 2)         # stale optimize/compaction
        self.tables[name] = {
            "format": format_type,
            "files": num_files,
            "size_gb": total_size_gb,
            "days_since_optimize": days_since_opt,
            "versions": num_versions,
            "health": max(0, health_score),
        }
        return self.tables[name]

    def report(self):
        print(f"{'Table':<25} {'Format':<10} {'Files':<8} {'Size(GB)':<10} "
              f"{'Versions':<10} {'Health':<8}")
        print("-" * 75)
        for name, info in sorted(self.tables.items()):
            print(f"{name:<25} {info['format']:<10} {info['files']:<8} "
                  f"{info['size_gb']:<10} {info['versions']:<10} {info['health']:<8}")

monitor = LakeHealthMonitor()
monitor.add_table("bronze.clickstream", "delta", 1500, 45.0,
                  datetime.now() - timedelta(days=14), 25)
monitor.add_table("silver.clickstream", "iceberg", 200, 12.0,
                  datetime.now() - timedelta(days=3), 8)
monitor.add_table("gold.daily_views", "delta", 50, 2.5,
                  datetime.now() - timedelta(days=1), 15)
monitor.report()
```

Expected output:

```
Table                     Format     Files    Size(GB)   Versions   Health
---------------------------------------------------------------------------
bronze.clickstream        delta      1500     45.0       25         45
gold.daily_views          delta      50       2.5        15         93
silver.clickstream        iceberg    200      12.0       8          92
```

What’s Next
Built by the developers of Doda Browser, DodaZIP, and Durga Antivirus Pro.