Real-Time Data Pipelines — Kafka Streams, KSQL, Flink SQL & CDC
A real-time data pipeline ingests, processes, and delivers data with latencies measured in seconds or milliseconds — using technologies like Kafka Streams, KSQL, Flink SQL, and CDC (Change Data Capture) with Debezium to process events as they happen.
What You’ll Learn
This tutorial covers the complete real-time pipeline stack: building stream processors with Kafka Streams, writing streaming SQL with KSQL and Flink SQL, capturing database changes with Debezium CDC, comparing Lambda vs Kappa architectures, and production deployment patterns.
Why It Matters
Batch pipelines typically deliver results hours after the data arrives. Real-time pipelines enable immediate action: fraud detection, personalization, monitoring, and operational analytics. DodaTech’s Durga Antivirus Pro processes threat intelligence streams in real time to block attacks within seconds of detection.
Real-World Use
Netflix uses Kafka Streams for real-time personalization of 200M+ subscriber recommendations. Uber processes 100B+ events daily through Flink for dispatch optimization. Shopify streams 1B+ events/minute through Kafka for inventory and checkout processing.
```mermaid
flowchart LR
    subgraph Sources
        A[Application<br/>Events] --> D[Kafka]
        B[Database CDC<br/>Debezium] --> D
        C[Log Files] --> D
    end
    subgraph Processing
        D --> E[Kafka Streams<br/>KStream/KTable]
        D --> F[KSQL / Flink SQL]
        E --> G[State Store<br/>RocksDB]
        F --> H[Streaming<br/>Aggregations]
    end
    subgraph Sinks
        G --> I[Real-time DB<br/>Cassandra/Redis]
        H --> J[Dashboard]
        H --> K[Alert System]
        H --> L[Data Lake]
    end
```
Kafka Streams — Lightweight Stream Processing
Kafka Streams is a client library for building stream processing applications directly on top of Kafka, without requiring a separate processing cluster.
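Because Kafka Streams is only a library, scaling out means starting more instances of the same application (same `application.id`); the work is split into tasks, roughly one per input partition. The sketch below is a simplification that assumes a plain round-robin spread of tasks; the real assignor is sticky and state-aware, and `assign_tasks` is a hypothetical helper, not a Kafka Streams API.

```python
def assign_tasks(num_partitions, instances):
    """Hypothetical round-robin spread of partition-level tasks across instances.

    Simplification: real Kafka Streams uses a sticky, state-aware assignor.
    """
    assignment = {instance: [] for instance in instances}
    for partition in range(num_partitions):
        owner = instances[partition % len(instances)]
        assignment[owner].append(partition)
    return assignment


print(assign_tasks(6, ["instance-a"]))
print(assign_tasks(6, ["instance-a", "instance-b", "instance-c"]))
# With more instances than partitions, the extra instances receive no tasks
print(assign_tasks(2, ["instance-a", "instance-b", "instance-c"]))
```

The last call illustrates why the partition count of the input topics caps how far a Kafka Streams application can scale.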
Core Abstractions
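- KStream: An unbounded stream of independent records; every record is a new event (insert semantics)
- KTable: A changelog stream viewed as a table; each record upserts the latest value for its key, and a null value (tombstone) deletes it
- GlobalKTable: A KTable fully replicated to every application instance, so it can be joined without co-partitioning the input topics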
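The difference between KStream and KTable shows up when the same records are read both ways. A minimal sketch using plain Python lists and dicts rather than the Kafka Streams API; `as_kstream` and `as_ktable` are hypothetical helpers:

```python
def as_kstream(records):
    """KStream view: every record is an independent event, so all are kept."""
    return list(records)


def as_ktable(records):
    """KTable view: each record upserts its key; a None value (tombstone) deletes it."""
    table = {}
    for key, value in records:
        if value is None:
            table.pop(key, None)
        else:
            table[key] = value
    return table


records = [
    ("C001", "gold"),
    ("C002", "silver"),
    ("C001", "platinum"),  # update for C001
    ("C002", None),        # tombstone for C002
]
print(len(as_kstream(records)))  # 4
print(as_ktable(records))        # {'C001': 'platinum'}
```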
```python
# Simulating Kafka Streams concepts in Python
class KafkaStreamsProcessor:
    """Simulate a Kafka Streams topology."""

    def __init__(self, application_id):
        self.app_id = application_id
        self._state = {}  # topic -> list of registered operations
        self._stores = {}

    def kstream(self, topic):
        """Create a KStream from a topic (simulated)."""
        return KStream(topic, self)

    def ktable(self, topic):
        """Create a KTable (changelog) from a compacted topic."""
        return KTable(topic, self)


class KStream:
    def __init__(self, topic, processor):
        self.topic = topic
        self.processor = processor

    def filter(self, predicate):
        """Filter records that match a condition."""
        ops = self.processor._state.setdefault(self.topic, [])
        ops.append(("filter", predicate))
        return self

    def map_values(self, mapper):
        """Transform values."""
        ops = self.processor._state.setdefault(self.topic, [])
        ops.append(("map_values", mapper))
        return self

    def group_by_key(self):
        """Group records by their key."""
        return GroupedStream(self.topic, self.processor)

    def join(self, other_stream, joiner, window_ms=60000):
        """Stream-stream join within a window (registered only; not executed here)."""
        ops = self.processor._state.setdefault(f"{self.topic}_join", [])
        ops.append(("join", other_stream.topic, joiner, window_ms))
        return self

    def process(self, records):
        """Run records through the registered stream operations."""
        ops = self.processor._state.get(self.topic, [])
        result = []
        for record in records:
            r = dict(record)  # shallow copy of the record
            for op_type, *args in ops:
                if op_type == "filter":
                    if not args[0](r):
                        r = None  # dropped by the predicate
                        break
                elif op_type == "map_values":
                    r["value"] = args[0](r["value"])
                elif op_type == "map":
                    r = args[0](r)
            if r:
                result.append(r)
        return result


class KTable:
    def __init__(self, topic, processor):
        self.topic = topic
        self.processor = processor
        self._table = {}

    def load(self, records):
        """Load (key, value) pairs into the table; later values overwrite earlier ones."""
        for r in records:
            self._table[r["key"]] = r["value"]

    def get(self, key):
        return self._table.get(key)


class GroupedStream:
    def __init__(self, topic, processor):
        self.topic = topic
        self.processor = processor

    def aggregate(self, initializer, adder):
        """Aggregate by key."""
        state = {}
        return AggregatedStream(self.topic, self.processor, state, initializer, adder)


class AggregatedStream:
    def __init__(self, topic, processor, state, initializer, adder):
        self.topic = topic
        self.processor = processor
        self.state = state
        self.initializer = initializer
        self.adder = adder

    def process(self, records):
        for r in records:
            key = r["key"]
            self.state.setdefault(key, self.initializer())
            self.state[key] = self.adder(self.state[key], r["value"])
        return [{"key": k, "value": v} for k, v in self.state.items()]


# Build a Kafka Streams topology: filter -> map -> count by key
def build_clickstream_pipeline():
    proc = KafkaStreamsProcessor("clickstream_analytics")
    # Define topology: drop clicks without a page, then derive a category
    stream = (proc.kstream("raw_clicks")
              .filter(lambda r: r.get("value", {}).get("page") is not None)
              .map_values(lambda v: {"page": v["page"], "category": v["page"].split("/")[1]}
                          if "/" in v.get("page", "") else v))
    return proc, stream


# Run pipeline
proc, stream = build_clickstream_pipeline()
clicks = [
    {"key": "u1", "value": {"page": "/python/tutorials"}},
    {"key": "u2", "value": {"page": "/java/guide"}},
    {"key": "u3", "value": {}},  # No page - will be filtered
    {"key": "u1", "value": {"page": "/python/basics"}},
]
result = stream.process(clicks)
print("Filtered & transformed stream:")
for r in result:
    print(f"  {r['key']}: {r['value']}")

# Count the filtered clicks per user key (u3 was already dropped by the filter)
print("\nCount by user key:")
grouped = proc.kstream("raw_clicks").group_by_key()
aggregator = grouped.aggregate(lambda: {"total": 0},
                               lambda agg, v: {"total": agg["total"] + 1})
counts = aggregator.process(result)
for c in counts:
    print(f"  {c['key']}: {c['value']}")
```

Expected output:

```text
Filtered & transformed stream:
  u1: {'page': '/python/tutorials', 'category': 'python'}
  u2: {'page': '/java/guide', 'category': 'java'}
  u1: {'page': '/python/basics', 'category': 'python'}

Count by user key:
  u1: {'total': 2}
  u2: {'total': 1}
```

KTable — Stream-Table Joins
Join a stream of orders with a customer table for enrichment:
```python
def stream_table_join():
    proc = KafkaStreamsProcessor("order_enrichment")
    # Customer table (compacted topic): latest value per customer ID
    customer_table = proc.ktable("customers")
    customer_table.load([
        {"key": "C001", "value": {"name": "Alice", "tier": "gold"}},
        {"key": "C002", "value": {"name": "Bob", "tier": "silver"}},
    ])
    # Order stream
    orders = [
        {"key": "ORD-001", "value": {"customer_id": "C001", "amount": 250}},
        {"key": "ORD-002", "value": {"customer_id": "C002", "amount": 50}},
    ]
    print("Enriched orders:")
    for order in orders:
        customer = customer_table.get(order["value"]["customer_id"])
        if customer is None:
            continue  # inner join: orders without a matching customer are dropped
        enriched = {**order["value"], "customer_name": customer["name"],
                    "tier": customer["tier"]}
        print(f"  {order['key']}: {enriched}")


stream_table_join()
```

Expected output:

```text
Enriched orders:
  ORD-001: {'customer_id': 'C001', 'amount': 250, 'customer_name': 'Alice', 'tier': 'gold'}
  ORD-002: {'customer_id': 'C002', 'amount': 50, 'customer_name': 'Bob', 'tier': 'silver'}
```

KSQL — Streaming SQL for Kafka
KSQL (since renamed ksqlDB, and developed by Confluent rather than as part of Apache Kafka) provides a SQL interface for stream processing on Kafka, letting you express streaming operations as SQL statements without writing Java or Python.
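The `EMIT CHANGES` clause used in these queries asks for a continuously updating result: the query keeps running and emits a new row whenever a result changes. A rough sketch of that behavior for a running count, ignoring ksqlDB's record cache (which can collapse intermediate updates); `emit_changes_count` is a hypothetical helper:

```python
def emit_changes_count(events, key_field):
    """Yield (key, new_count) after every event, like a continuously updated COUNT(*)."""
    counts = {}
    for event in events:
        key = event[key_field]
        counts[key] = counts.get(key, 0) + 1
        yield key, counts[key]  # each update is emitted as a new output row


clicks = [{"page": "/home"}, {"page": "/about"}, {"page": "/home"}]
for page, views in emit_changes_count(clicks, "page"):
    print(page, views)
# /home 1
# /about 1
# /home 2
```

The second `/home` row supersedes the first; a table keyed by `page` keeps only the latest value.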
```sql
-- Create a stream from a Kafka topic
CREATE STREAM clickstream (
    user_id VARCHAR,
    page VARCHAR,
    event_time BIGINT
) WITH (
    KAFKA_TOPIC = 'raw_clicks',
    VALUE_FORMAT = 'JSON'
);

-- Continuous filtering query
CREATE STREAM mobile_clicks AS
    SELECT user_id, page, event_time
    FROM clickstream
    WHERE page LIKE '/mobile/%'
    EMIT CHANGES;

-- Windowed aggregation
CREATE TABLE hourly_page_views AS
    SELECT page,
           COUNT(*) AS views,
           WINDOWSTART AS window_start
    FROM clickstream
    WINDOW TUMBLING (SIZE 1 HOUR)
    GROUP BY page
    EMIT CHANGES;
```

```python
# Simulating KSQL query execution
class KSQLQueryEngine:
    """Simulate KSQL query execution."""

    def __init__(self):
        self.streams = {}
        self.tables = {}

    def create_stream(self, name, topic, schema):
        self.streams[name] = {
            "topic": topic,
            "schema": schema,
            "events": [],
            "derived": [],
        }
        print(f"[KSQL] Created stream {name} on topic {topic}")

    def ingest(self, stream_name, records):
        """Insert records into a stream."""
        if stream_name in self.streams:
            self.streams[stream_name]["events"].extend(records)

    def create_derived_stream(self, name, source, select_fields, where_clause=None):
        """Create a derived stream (like CREATE STREAM AS SELECT)."""
        source_stream = self.streams[source]
        filtered = []
        for event in source_stream["events"]:
            if where_clause and not where_clause(event):
                continue
            filtered.append({f: event.get(f) for f in select_fields})
        self.streams[name] = {
            "topic": name.lower(),
            "schema": {f: "unknown" for f in select_fields},
            "events": filtered,
            "derived": [source],
        }
        print(f"[KSQL] Created derived stream {name}: {len(filtered)} rows")
        return filtered

    def create_tumbling_table(self, name, source, group_by, agg_fn, window_seconds=3600):
        """Simulate CREATE TABLE AS SELECT with a tumbling window (event_time in ms)."""
        source_stream = self.streams[source]
        window_ms = window_seconds * 1000
        windows = {}
        for event in source_stream["events"]:
            key = event.get(group_by)
            if key is None:
                continue
            # Tumbling windows never overlap: each event falls into exactly one
            window_start = event["event_time"] // window_ms * window_ms
            windows.setdefault((key, window_start), []).append(event)
        results = []
        for (key, window_start), events in windows.items():
            result = {"key": key, "window_start": window_start}
            result.update(agg_fn(events))
            results.append(result)
        self.tables[name] = {"source": source, "group_by": group_by, "results": results}
        print(f"[KSQL] Created table {name}: {len(results)} groups")
        return results


# Run KSQL simulation
ksql = KSQLQueryEngine()
ksql.create_stream("clickstream", "raw_clicks",
                   {"user_id": "VARCHAR", "page": "VARCHAR", "event_time": "BIGINT"})
ksql.ingest("clickstream", [
    {"user_id": "u1", "page": "/home", "event_time": 1000},
    {"user_id": "u2", "page": "/mobile/offer", "event_time": 2000},
    {"user_id": "u3", "page": "/mobile/checkout", "event_time": 3000},
    {"user_id": "u1", "page": "/about", "event_time": 4000},
])
# Equivalent of WHERE page LIKE '/mobile/%'
ksql.create_derived_stream("mobile_clicks", "clickstream",
                           ["user_id", "page"],
                           where_clause=lambda e: e.get("page", "").startswith("/mobile/"))
ksql.create_tumbling_table("hourly_views", "clickstream", "page",
                           lambda ev: {"count": len(ev)})
```

Expected output:

```text
[KSQL] Created stream clickstream on topic raw_clicks
[KSQL] Created derived stream mobile_clicks: 2 rows
[KSQL] Created table hourly_views: 4 groups
```

Flink SQL — Unified Stream & Batch SQL
Apache Flink SQL builds on the SQL standard (through Apache Calcite) and adds streaming semantics: event-time processing with watermarks, exactly-once state consistency, and continuous queries over dynamic tables.
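Watermarks are what let a streaming engine decide when an event-time window is complete. The sketch below simplifies bounded-out-of-orderness watermarks: it assumes the watermark is recomputed after every event (Flink emits watermarks periodically) and that an event whose window has already fired is dropped as late (the behavior without allowed lateness). `tumbling_with_watermark` is a hypothetical helper.

```python
def tumbling_with_watermark(events, window_ms, delay_ms):
    """Assign (timestamp_ms, key) events to tumbling windows, dropping late ones."""
    watermark = float("-inf")
    windows, late = {}, []
    for ts, key in events:
        start = ts // window_ms * window_ms
        if start + window_ms <= watermark:
            late.append((ts, key))  # its window already fired, so the event is late
            continue
        windows.setdefault(start, []).append(key)
        # Bounded out-of-orderness: the watermark trails the max timestamp by delay_ms
        watermark = max(watermark, ts - delay_ms)
    return windows, late, watermark


events = [(1000, "a"), (3000, "b"), (2000, "c"), (12000, "d"), (4000, "e")]
windows, late, wm = tumbling_with_watermark(events, window_ms=5000, delay_ms=5000)
print(windows)  # {0: ['a', 'b', 'c'], 10000: ['d']}
print(late)     # [(4000, 'e')]
print(wm)       # 7000
```

Event `c` arrives out of order but is still accepted, because the watermark has not yet passed the end of its window; `e` arrives after the watermark reached 7000 and is dropped.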
```sql
-- Flink SQL: streaming analytics
-- 'properties.bootstrap.servers' is required by the Kafka connector; localhost:9092 is a placeholder
CREATE TABLE page_views (
    user_id STRING,
    page STRING,
    view_time TIMESTAMP(3),
    WATERMARK FOR view_time AS view_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'page_views',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- Windowed aggregation with event time (group-window syntax)
SELECT
    page,
    COUNT(*) AS views,
    TUMBLE_END(view_time, INTERVAL '10' MINUTE) AS window_end
FROM page_views
GROUP BY page, TUMBLE(view_time, INTERVAL '10' MINUTE);

-- Non-windowed aggregation: an updating (retract) stream that revises each user's count
SELECT user_id, COUNT(*) AS page_views
FROM page_views
GROUP BY user_id;
```

```python
class FlinkSQLExecutor:
    """Simulate Flink SQL execution with streaming semantics."""

    def __init__(self):
        self.tables = {}

    def create_table(self, name, schema, watermark_column=None, watermark_delay=0):
        self.tables[name] = {
            "schema": schema,
            "watermark": watermark_column,
            "delay": watermark_delay,
            "data": [],
            "watermark_value": float("-inf"),
        }

    def insert_into(self, table_name, records):
        table = self.tables[table_name]
        table["data"].extend(records)
        # Advance the watermark: highest event time seen so far minus the allowed delay
        if table["watermark"] and records:
            max_ts = max(r[table["watermark"]] for r in records)
            table["watermark_value"] = max(table["watermark_value"], max_ts - table["delay"])

    def select_tumble(self, source, group_by, agg, window_seconds=600, end_of_input=False):
        """Simulate TUMBLE window aggregation; event times are epoch milliseconds."""
        table = self.tables[source]
        ts_col = table["watermark"]
        window_ms = window_seconds * 1000
        # When a bounded input ends, Flink advances the watermark to +infinity
        watermark = float("inf") if end_of_input else table["watermark_value"]
        groups = {}
        for record in table["data"]:
            window_start = record[ts_col] // window_ms * window_ms
            window_end = window_start + window_ms
            # A window fires only once the watermark has passed its end
            if window_end <= watermark:
                groups.setdefault((record.get(group_by), window_end), []).append(record)
        final = []
        for (key, window_end), events in groups.items():
            if agg == "count":
                final.append({group_by: key, "count": len(events), "window_end": window_end})
        return final


flink = FlinkSQLExecutor()
flink.create_table("page_views",
                   {"user_id": "STRING", "page": "STRING", "view_time": "BIGINT"},
                   watermark_column="view_time", watermark_delay=5000)
flink.insert_into("page_views", [
    {"user_id": "u1", "page": "/python", "view_time": 100000},
    {"user_id": "u2", "page": "/java", "view_time": 150000},
    {"user_id": "u1", "page": "/python", "view_time": 200000},
    {"user_id": "u3", "page": "/docker", "view_time": 300000},
])

# 10-minute windows, matching TUMBLE(view_time, INTERVAL '10' MINUTE) above
wm = flink.tables["page_views"]["watermark_value"]
pending = flink.select_tumble("page_views", "page", "count", window_seconds=600)
print(f"Windows closed at watermark {wm}: {len(pending)}")

windows = flink.select_tumble("page_views", "page", "count",
                              window_seconds=600, end_of_input=True)
print("Flink SQL tumbling window results (end of input):")
for w in sorted(windows, key=lambda x: x["count"], reverse=True):
    print(f"  {w['page']:<10} count={w['count']}")
```

Expected output:

```text
Windows closed at watermark 295000: 0
Flink SQL tumbling window results (end of input):
  /python    count=2
  /java      count=1
  /docker    count=1
```

CDC with Debezium — Change Data Capture
Debezium is an open-source platform for CDC, capturing row-level changes from databases (MySQL, PostgreSQL, MongoDB, etc.) and streaming them to Kafka.
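On the consuming side, change events are usually applied to a downstream copy of the table. A minimal sketch, assuming every row has an `id` primary key and using only the `op`, `before`, and `after` fields of the envelope (real Debezium events carry more metadata); `apply_change` is a hypothetical helper:

```python
def apply_change(replica, event):
    """Apply one Debezium-style change event to a dict keyed by primary key."""
    op = event["op"]
    if op in ("r", "c", "u"):  # snapshot read, create, update: upsert the new row image
        row = event["after"]
        replica[row["id"]] = row
    elif op == "d":  # delete: only the "before" image is present
        replica.pop(event["before"]["id"], None)
    return replica


changes = [
    {"op": "r", "before": None, "after": {"id": 1, "email": "alice@old.com"}},
    {"op": "c", "before": None, "after": {"id": 2, "email": "bob@example.com"}},
    {"op": "u", "before": {"id": 1, "email": "alice@old.com"},
     "after": {"id": 1, "email": "alice@new.com"}},
    {"op": "d", "before": {"id": 2, "email": "bob@example.com"}, "after": None},
]
replica = {}
for change in changes:
    apply_change(replica, change)
print(replica)  # {1: {'id': 1, 'email': 'alice@new.com'}}
```

Because each event carries the full row image, applying an event a second time leaves the replica unchanged, which keeps this consumer tolerant of redelivery.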
```python
import time


class DebeziumCDC:
    """Simulate Debezium CDC capturing database changes."""

    def __init__(self, connector_name, db_type="postgres"):
        self.name = connector_name
        self.db_type = db_type
        self.snapshot_complete = False
        self.change_log = []

    def _event(self, op, table, before, after):
        """Build a simplified Debezium-style change event envelope."""
        return {
            "op": op,  # r = snapshot read, c = create, u = update, d = delete
            "source": {"connector": self.name, "table": table},
            "before": before,
            "after": after,
            "ts_ms": self._ts(),
        }

    def snapshot(self, tables):
        """Initial snapshot of existing data."""
        print(f"[Debezium] Starting snapshot of {len(tables)} tables")
        for table, rows in tables.items():
            for row in rows:
                self.change_log.append(self._event("r", table, None, row))
        self.snapshot_complete = True
        print(f"[Debezium] Snapshot complete: {len(self.change_log)} events")

    def capture_insert(self, table, row):
        """Capture an INSERT operation."""
        self.change_log.append(self._event("c", table, None, row))
        print(f"[Debezium] INSERT {table}: {row}")

    def capture_update(self, table, before, after):
        """Capture an UPDATE operation."""
        self.change_log.append(self._event("u", table, before, after))
        print(f"[Debezium] UPDATE {table}: {before} -> {after}")

    def capture_delete(self, table, before):
        """Capture a DELETE operation."""
        self.change_log.append(self._event("d", table, before, None))
        print(f"[Debezium] DELETE {table}: {before}")

    def _ts(self):
        return int(time.time() * 1000)

    def stream_to_topic(self, topic_prefix="dbserver1"):
        """Convert the change log to Kafka messages keyed by primary key."""
        messages = []
        for event in self.change_log:
            # Simplified topic name; for PostgreSQL, Debezium uses <prefix>.<schema>.<table>
            topic = f"{topic_prefix}.{event['source']['table']}"
            # Deletes have no "after" image, so fall back to "before" for the key
            row = event["after"] or event["before"]
            messages.append({"topic": topic, "key": {"id": row["id"]}, "value": event})
        return messages


# Simulate CDC
debezium = DebeziumCDC("orders-connector", "postgres")
debezium.snapshot({"customers": [
    {"id": 1, "name": "Alice", "email": "alice@example.com"},
    {"id": 2, "name": "Bob", "email": "bob@example.com"},
]})
debezium.capture_insert("orders", {"id": 101, "customer_id": 1, "total": 250.0})
debezium.capture_update("customers",
                        {"id": 1, "name": "Alice", "email": "alice@old.com"},
                        {"id": 1, "name": "Alice", "email": "alice@new.com"})
msgs = debezium.stream_to_topic()
print(f"\nKafka messages generated: {len(msgs)}")
for msg in msgs[:3]:
    print(f"  Topic: {msg['topic']}, Op: {msg['value']['op']}")
```

Expected output:

```text
[Debezium] Starting snapshot of 1 tables
[Debezium] Snapshot complete: 2 events
[Debezium] INSERT orders: {'id': 101, 'customer_id': 1, 'total': 250.0}
[Debezium] UPDATE customers: {'id': 1, 'name': 'Alice', 'email': 'alice@old.com'} -> {'id': 1, 'name': 'Alice', 'email': 'alice@new.com'}

Kafka messages generated: 4
  Topic: dbserver1.customers, Op: r
  Topic: dbserver1.orders, Op: c
  Topic: dbserver1.customers, Op: u
```

Lambda vs Kappa Architecture
| Aspect | Lambda | Kappa |
|---|---|---|
| Path | Batch + Streaming | Single streaming path |
| Code | Two codebases | One codebase |
| Storage | Lake (batch) + Speed layer | Append-only log (Kafka) |
| Reconciliation | Batch corrects stream | Reprocess from topic |
| Complexity | High — maintain both paths | Low — single pipeline |
| Latency | Batch: hours, Stream: seconds | Seconds to minutes |
| Example | Batch nightly + streaming dashboard | Kafka Streams-only pipeline |
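The "Reprocess from topic" row is the core of Kappa: when the logic changes, a new version of the job replays the retained log from the beginning into a fresh output, and consumers switch over once it catches up. A small sketch with two hypothetical job versions (`count_v1`, `count_v2`) and a `replay` helper, where a plain list stands in for a Kafka topic with long retention:

```python
log = [
    {"page": "/home", "user": "u1"},
    {"page": "/home", "user": "bot-7"},
    {"page": "/pricing", "user": "u2"},
]


def count_v1(events):
    """Version 1: count every view per page."""
    counts = {}
    for e in events:
        counts[e["page"]] = counts.get(e["page"], 0) + 1
    return counts


def count_v2(events):
    """Version 2: same count, but ignore bot traffic."""
    return count_v1(e for e in events if not e["user"].startswith("bot-"))


def replay(job, topic_log, from_offset=0):
    """Reprocess the retained log from from_offset with the given job version."""
    return job(topic_log[from_offset:])


serving = {"views_v1": replay(count_v1, log)}
serving["views_v2"] = replay(count_v2, log)  # built side by side, then swapped in
print(serving["views_v1"])  # {'/home': 2, '/pricing': 1}
print(serving["views_v2"])  # {'/home': 1, '/pricing': 1}
```

The design choice is that correction needs no second codebase, only enough log retention to replay from the start.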
```python
class ArchitectureComparison:
    """Simulate Lambda vs Kappa for a simple count per key."""

    @staticmethod
    def _count(events):
        counts = {}
        for e in events:
            counts[e["key"]] = counts.get(e["key"], 0) + 1
        return counts

    def lambda_pipeline(self, raw_events, last_batch_ts):
        """Lambda: batch view covers events up to the last batch run; speed view covers the rest."""
        # Batch layer: recomputed periodically from the full master dataset
        batch_view = self._count(e for e in raw_events if e["ts"] <= last_batch_ts)
        # Speed layer: incremental counts for events the batch has not seen yet
        speed_view = self._count(e for e in raw_events if e["ts"] > last_batch_ts)
        # Serving layer: merge both views to answer queries
        merged = dict(batch_view)
        for key, n in speed_view.items():
            merged[key] = merged.get(key, 0) + n
        return {"batch": batch_view, "speed": speed_view, "merged": merged}

    def kappa_pipeline(self, raw_events):
        """Kappa: one streaming job; corrections come from replaying the log."""
        realtime = self._count(raw_events)
        # Reprocess: replay the retained topic from offset 0 through the same code
        reprocessed = self._count(raw_events)
        return {"realtime": realtime, "reprocessed": reprocessed}


events = [
    {"key": "page_a", "ts": 1},
    {"key": "page_b", "ts": 2},
    {"key": "page_a", "ts": 3},
]
comp = ArchitectureComparison()
lam = comp.lambda_pipeline(events, last_batch_ts=2)
kap = comp.kappa_pipeline(events)
print("Lambda:")
print(f"  Batch layer (ts <= 2): {lam['batch']}")
print(f"  Speed layer (ts > 2):  {lam['speed']}")
print(f"  Serving (merged):      {lam['merged']}")
print("Kappa (single pipeline):")
print(f"  Real-time: {kap['realtime']}")
print(f"  Reprocess: {kap['reprocessed']} (identical)")
```

Common Mistakes
1. Not Handling Schema Evolution
Data formats change over time. Use Avro or Protobuf with Schema Registry to handle backward/forward compatibility.
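To make the compatibility idea concrete: under a backward-compatible change, consumers using the new schema can still read records written with the old one, so any field the new schema adds needs a default. A deliberately simplified checker, assuming schemas are plain dicts of field name to type and ignoring Avro type promotion, unions, and aliases; `is_backward_compatible` is hypothetical, not a Schema Registry API:

```python
def is_backward_compatible(old_fields, new_fields):
    """Can a reader using new_fields decode data written with old_fields?

    Each schema maps field name -> {"type": ..., optional "default": ...}.
    """
    for name, spec in new_fields.items():
        if name not in old_fields:
            if "default" not in spec:
                return False  # old records lack this field and there is no fallback
        elif old_fields[name]["type"] != spec["type"]:
            return False  # simplification: no type promotion (e.g. int -> long)
    return True  # fields dropped by the new schema are simply ignored when reading


v1 = {"id": {"type": "string"}, "amount": {"type": "double"}}
v2_ok = {**v1, "currency": {"type": "string", "default": "USD"}}
v2_bad = {**v1, "currency": {"type": "string"}}
print(is_backward_compatible(v1, v2_ok))   # True
print(is_backward_compatible(v1, v2_bad))  # False
```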
2. Ignoring Kafka Consumer Lag
Steadily growing consumer lag means the pipeline cannot keep up with its input. Monitor it with Burrow or with Kafka’s built-in `kafka-consumer-groups.sh --describe --group <group>` command.
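Lag is measured per partition as the gap between the log-end offset and the group's committed offset, and the warning sign is a lag that keeps rising between samples. A sketch, assuming the offsets have already been fetched into plain dicts; `partition_lag` and `growing_partitions` are hypothetical helpers:

```python
def partition_lag(end_offsets, committed_offsets):
    """Lag per partition = log-end offset minus committed offset.

    Assumption: a partition with no committed offset is treated as starting at 0.
    """
    return {p: end - committed_offsets.get(p, 0) for p, end in end_offsets.items()}


def growing_partitions(earlier_lag, later_lag):
    """Partitions whose lag increased between two samples."""
    return sorted(p for p, lag in later_lag.items() if lag > earlier_lag.get(p, 0))


t0 = partition_lag({0: 1500, 1: 1200}, {0: 1450, 1: 900})
t1 = partition_lag({0: 1800, 1: 1500}, {0: 1790, 1: 1000})
print(t0, sum(t0.values()))        # {0: 50, 1: 300} 350
print(growing_partitions(t0, t1))  # [1]
```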
3. No Idempotent Sinks
If your pipeline restarts, it may re-process events. Ensure sinks (DB, S3, API) are idempotent — writing the same record twice produces the same result.
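A minimal sketch of the difference, assuming each event carries a unique `event_id` (a hypothetical field) that the sink can use as its key; in a relational database this corresponds to an upsert such as PostgreSQL's `INSERT ... ON CONFLICT` on that key:

```python
class AppendSink:
    """Not idempotent: every write adds a row, so replays create duplicates."""

    def __init__(self):
        self.rows = []

    def write(self, record):
        self.rows.append(record)


class UpsertSink:
    """Idempotent: rows are keyed by event_id, so rewriting a record changes nothing."""

    def __init__(self):
        self.rows = {}

    def write(self, record):
        self.rows[record["event_id"]] = record


batch = [{"event_id": "e1", "amount": 10}, {"event_id": "e2", "amount": 25}]
append_sink, upsert_sink = AppendSink(), UpsertSink()
# Simulate a crash after writing but before committing offsets: the batch is replayed
for _attempt in range(2):
    for record in batch:
        append_sink.write(record)
        upsert_sink.write(record)
print(len(append_sink.rows))  # 4 (duplicates)
print(len(upsert_sink.rows))  # 2
```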
4. Using Kafka Streams for Complex State
Kafka Streams handles moderate state well with its embedded RocksDB stores, but for complex windowed joins or very large state, consider Flink.
5. CDC Without Schema History
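Connectors that read a binary log, such as Debezium’s MySQL connector, record every DDL change in a schema history topic so they can decode older change events correctly after a restart. If that topic is missing or truncated by retention, the connector cannot reliably interpret past changes, and downstream consumers receive broken or mismatched schemas.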
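Why the history matters: a binlog-style row event may carry column values without column names, so the connector must know which schema was in effect at that log position. A simplified sketch, assuming the DDL history is a list of (position, column list) entries; `schema_at` and `decode_row` are hypothetical helpers, not Debezium APIs:

```python
import bisect

# (log position, columns in effect from that position onward)
schema_history = [
    (100, ["id", "name"]),
    (250, ["id", "name", "email"]),  # after ALTER TABLE ... ADD COLUMN email
]


def schema_at(position, history):
    """Return the column list in effect at a given log position."""
    positions = [pos for pos, _ in history]
    idx = bisect.bisect_right(positions, position) - 1
    if idx < 0:
        raise ValueError(f"No schema recorded before position {position}")
    return history[idx][1]


def decode_row(values, position, history):
    """Pair positional column values with the column names valid at that position."""
    return dict(zip(schema_at(position, history), values))


print(decode_row([1, "Alice"], 200, schema_history))
# {'id': 1, 'name': 'Alice'}
print(decode_row([2, "Bob", "bob@example.com"], 300, schema_history))
# {'id': 2, 'name': 'Bob', 'email': 'bob@example.com'}
```

Without the first history entry, the event at position 200 would be decoded against the current three-column schema, which is exactly the mismatch described above.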
Practice Questions
What is the difference between KStream and KTable in Kafka Streams? KStream is an unbounded stream of records. KTable is a changelog keyed by record key, representing the latest state per key.
How does Debezium capture changes from a database? Debezium reads the database’s transaction log (WAL, binlog), converts row changes to structured events, and streams them to Kafka topics.
What is the key difference between Lambda and Kappa architectures? Lambda has separate batch and streaming paths (two codebases). Kappa uses a single streaming path with replay capability from the log.
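What problem does watermarking solve in Flink SQL? Events often arrive out of order. A watermark tracks event-time progress: a watermark at time T declares that no more events with timestamps at or before T are expected, so Flink can finalize windows that end before T, while events arriving behind the watermark are treated as late.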
Challenge: Design a real-time pipeline for an e-commerce platform that needs to capture inventory changes from PostgreSQL, enrich with product data, and emit restocking alerts within 10 seconds.
Mini Project: Real-Time Pipeline Simulator
```python
# realtime_pipeline.py
# Simulate a complete real-time pipeline with CDC, processing, and sink.
# Reuses the DebeziumCDC and KafkaStreamsProcessor classes defined earlier.
class RealTimePipeline:
    def __init__(self):
        self.cdc = DebeziumCDC("ecommerce-connector")
        self.processor = KafkaStreamsProcessor("enrichment")
        self.sinks = []

    def add_sink(self, name, handler):
        self.sinks.append({"name": name, "handler": handler})

    def run(self):
        # Step 1: Snapshot existing data
        self.cdc.snapshot({"products": [
            {"id": "P001", "name": "Laptop", "stock": 10},
            {"id": "P002", "name": "Mouse", "stock": 50},
        ]})
        # Step 2: Capture real-time changes
        self.cdc.capture_insert("orders", {"id": "ORD-101", "product_id": "P001", "qty": 1})
        self.cdc.capture_update("products",
                                {"id": "P001", "name": "Laptop", "stock": 10},
                                {"id": "P001", "name": "Laptop", "stock": 9})
        # Step 3: Deliver every change event to each registered sink
        for msg in self.cdc.stream_to_topic():
            for sink in self.sinks:
                sink["handler"](msg)


pipeline = RealTimePipeline()
pipeline.add_sink("console", lambda msg: print(f"[Sink] {msg['topic']}: {msg['value']['op']}"))
pipeline.run()
```

What’s Next
Built by the developers of Doda Browser, DodaZIP, and Durga Antivirus Pro.