Real-Time Data Pipelines — Kafka Streams, KSQL, Flink SQL & CDC

DodaTech Updated Jun 20, 2026 12 min read

A real-time data pipeline ingests, processes, and delivers data with latencies measured in seconds or milliseconds — using technologies like Kafka Streams, KSQL, Flink SQL, and CDC (Change Data Capture) with Debezium to process events as they happen.

What You’ll Learn

This tutorial covers the complete real-time pipeline stack: building stream processors with Kafka Streams, writing streaming SQL with KSQL and Flink SQL, capturing database changes with Debezium CDC, comparing Lambda vs Kappa architectures, and production deployment patterns.

Why It Matters

Batch pipelines typically run with hours of latency. Real-time pipelines enable immediate action: fraud detection, personalization, monitoring, and operational analytics. DodaTech’s Durga Antivirus Pro processes threat intelligence streams in real time to block attacks within seconds of detection.

Real-World Use

Netflix uses Kafka Streams for real-time personalization of 200M+ subscriber recommendations. Uber processes 100B+ events daily through Flink for dispatch optimization. Shopify streams 1B+ events/minute through Kafka for inventory and checkout processing.

    flowchart LR
    subgraph Sources
        A[Application<br/>Events] --> D[Kafka]
        B[Database CDC<br/>Debezium] --> D
        C[Log Files] --> D
    end
    subgraph Processing
        D --> E[Kafka Streams<br/>KStream/KTable]
        D --> F[KSQL / Flink SQL]
        E --> G[State Store<br/>RocksDB]
        F --> H[Streaming<br/>Aggregations]
    end
    subgraph Sinks
        G --> I[Real-time DB<br/>Cassandra/Redis]
        H --> J[Dashboard]
        H --> K[Alert System]
        H --> L[Data Lake]
    end
  

Kafka Streams — Lightweight Stream Processing

Kafka Streams is a client library for building stream processing applications directly on top of Kafka, without requiring a separate processing cluster.

Core Abstractions

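  • KStream: An unbounded record stream in which every record is an independent event (not a changelog)
  • KTable: A changelog keyed by record key, where a newer record replaces the previous value for that key (table-like state)
  • GlobalKTable: A changelog fully replicated to every application instance, useful for joining against small lookup tables without repartitioning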
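To make the stream/table distinction concrete, the sketch below reads the same four records two ways: as a record stream, where every record counts, and as a changelog, where a later record for a key replaces the earlier one. This is plain Python rather than the Kafka Streams API, and the function names `as_stream_sum` and `as_table_latest` are made up for illustration.

```python
# Illustrative only: plain Python, not the Kafka Streams API.
records = [("alice", 1), ("bob", 5), ("alice", 3), ("bob", 2)]

def as_stream_sum(records):
    """KStream view: each record is an independent event, so all values add up."""
    totals = {}
    for key, value in records:
        totals[key] = totals.get(key, 0) + value
    return totals

def as_table_latest(records):
    """KTable view: each record is an upsert, so only the latest value per key survives."""
    table = {}
    for key, value in records:
        table[key] = value
    return table

print(as_stream_sum(records))    # {'alice': 4, 'bob': 7}
print(as_table_latest(records))  # {'alice': 3, 'bob': 2}
```

The input is identical in both cases; only the interpretation changes, which is why the choice between KStream and KTable matters before any aggregation is written.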
# Simulating Kafka Streams concepts in Python
class KafkaStreamsProcessor:
    """Simulate a Kafka Streams topology."""

    def __init__(self, application_id):
        self.app_id = application_id
        self._state = {}
        self._stores = {}

    def kstream(self, topic):
        """Create a KStream from a topic (simulated)."""
        return KStream(topic, self)

    def ktable(self, topic):
        """Create a KTable (changelog) from a compacted topic."""
        return KTable(topic, self)

class KStream:
    def __init__(self, topic, processor):
        self.topic = topic
        self.processor = processor

    def filter(self, predicate):
        """Filter records that match a condition."""
        ops = self.processor._state.setdefault(self.topic, [])
        ops.append(("filter", predicate))
        return self

    def map_values(self, mapper):
        """Transform values."""
        ops = self.processor._state.setdefault(self.topic, [])
        ops.append(("map_values", mapper))
        return self

    def group_by_key(self):
        """Group records by their key."""
        return GroupedStream(self.topic, self.processor)

    def join(self, other_stream, joiner, window_ms=60000):
        """Stream-stream join within a window."""
        ops = self.processor._state.setdefault(f"{self.topic}_join", [])
        ops.append(("join", other_stream.topic, joiner, window_ms))
        return self

    def process(self, records):
        """Run records through the stream operations."""
        ops = self.processor._state.get(self.topic, [])
        result = []
        for record in records:
            r = dict(record)
            for op_type, *args in ops:
                if op_type == "filter":
                    if not args[0](r):
                        r = None
                        break
                elif op_type == "map_values":
                    r["value"] = args[0](r["value"])
                elif op_type == "map":
                    r = args[0](r)
            if r:
                result.append(r)
        return result

class KTable:
    def __init__(self, topic, processor):
        self.topic = topic
        self.processor = processor
        self._table = {}

    def load(self, records):
        """Load (key, value) pairs into the table."""
        for r in records:
            self._table[r["key"]] = r["value"]

    def get(self, key):
        return self._table.get(key)

class GroupedStream:
    def __init__(self, topic, processor):
        self.topic = topic
        self.processor = processor

    def aggregate(self, initializer, adder):
        """Aggregate by key."""
        state = {}
        return AggregatedStream(self.topic, self.processor, state, initializer, adder)

class AggregatedStream:
    def __init__(self, topic, processor, state, initializer, adder):
        self.topic = topic
        self.processor = processor
        self.state = state
        self.initializer = initializer
        self.adder = adder

    def process(self, records):
        for r in records:
            key = r["key"]
            self.state.setdefault(key, self.initializer())
            self.state[key] = self.adder(self.state[key], r["value"])
        return [{"key": k, "value": v} for k, v in self.state.items()]

# Build a Kafka Streams topology: filter -> map -> count by key
def build_clickstream_pipeline():
    proc = KafkaStreamsProcessor("clickstream_analytics")

    # Define topology
    stream = (proc.kstream("raw_clicks")
              .filter(lambda r: r.get("value", {}).get("page") is not None)
              .map_values(lambda v: {"page": v["page"], "category": v["page"].split("/")[1]}
                          if "/" in v.get("page", "") else v))

    return proc, stream

# Run pipeline
proc, stream = build_clickstream_pipeline()
clicks = [
    {"key": "u1", "value": {"page": "/python/tutorials"}},
    {"key": "u2", "value": {"page": "/java/guide"}},
    {"key": "u3", "value": {}},  # No page - will be filtered
    {"key": "u1", "value": {"page": "/python/basics"}},
]
result = stream.process(clicks)
print("Filtered & transformed stream:")
for r in result:
    print(f"  {r['key']}: {r['value']}")

print("\nCount by user (record key):")
grouped = proc.kstream("raw_clicks").group_by_key()
aggregator = grouped.aggregate(lambda: {"total": 0},
                                lambda agg, v: {"total": agg["total"] + 1})
# Aggregate the filtered stream, not the raw input, so u3 (no page) is excluded
counts = aggregator.process(result)
for c in counts:
    print(f"  {c['key']}: {c['value']}")

Expected output:

Filtered & transformed stream:
  u1: {'page': '/python/tutorials', 'category': 'python'}
  u2: {'page': '/java/guide', 'category': 'java'}
  u1: {'page': '/python/basics', 'category': 'python'}

Count by user (record key):
  u1: {'total': 2}
  u2: {'total': 1}

KTable — Stream-Table Joins

Join a stream of orders with a customer table for enrichment:

def stream_table_join():
    proc = KafkaStreamsProcessor("order_enrichment")

    # Customer table (compacted topic)
    customer_table = proc.ktable("customers")
    customer_table.load([
        {"key": "C001", "value": {"name": "Alice", "tier": "gold"}},
        {"key": "C002", "value": {"name": "Bob", "tier": "silver"}},
    ])

    # Order stream
    order_stream = proc.kstream("orders")
    orders = [
        {"key": "ORD-001", "value": {"customer_id": "C001", "amount": 250}},
        {"key": "ORD-002", "value": {"customer_id": "C002", "amount": 50}},
    ]

    print("Enriched orders:")
    for order in orders:
        customer = customer_table.get(order["value"]["customer_id"])
        enriched = {**order["value"], "customer_name": customer["name"],
                    "tier": customer["tier"]}
        print(f"  {order['key']}: {enriched}")

stream_table_join()

Expected output:

Enriched orders:
  ORD-001: {'customer_id': 'C001', 'amount': 250, 'customer_name': 'Alice', 'tier': 'gold'}
  ORD-002: {'customer_id': 'C002', 'amount': 50, 'customer_name': 'Bob', 'tier': 'silver'}
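The simulation above looks up the customer through a field inside the order value. In Kafka Streams itself, a KStream-KTable join matches on the record key, so a stream keyed by order ID would first need to be re-keyed by customer ID. The sketch below imitates that step and contrasts an inner join with a left join when a customer is missing. It is plain Python; `rekey` and `join_with_table` are hypothetical helpers, and customer `C999` is invented to show the no-match case.

```python
# Illustrative only: plain-Python stand-ins for re-keying and a stream-table join.
customers = {"C001": {"name": "Alice", "tier": "gold"}}

orders = [
    {"key": "ORD-001", "value": {"customer_id": "C001", "amount": 250}},
    {"key": "ORD-003", "value": {"customer_id": "C999", "amount": 75}},  # unknown customer
]

def rekey(records, key_fn):
    """Replace each record's key with key_fn(value), keeping the old key in the value."""
    return [{"key": key_fn(r["value"]), "value": {**r["value"], "order_id": r["key"]}}
            for r in records]

def join_with_table(records, table, left=False):
    """Inner join drops records without a match; left join keeps them with None fields."""
    out = []
    for r in records:
        row = table.get(r["key"])
        if row is None and not left:
            continue
        row = row or {"name": None, "tier": None}
        out.append({**r["value"], "customer_name": row["name"], "tier": row["tier"]})
    return out

by_customer = rekey(orders, lambda v: v["customer_id"])
inner = join_with_table(by_customer, customers)
left = join_with_table(by_customer, customers, left=True)
print(len(inner), len(left))  # 1 2
```

An inner join silently drops the order for the unknown customer, while a left join keeps it with empty enrichment fields; which behavior is right depends on whether downstream consumers can tolerate missing orders or missing attributes.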

KSQL — Streaming SQL for Kafka

KSQL (now ksqlDB), developed by Confluent on top of Kafka Streams, provides a SQL interface for stream processing, letting you express streaming operations as SQL statements without writing Java or Python.

-- Create a stream from a Kafka topic
CREATE STREAM clickstream (
    user_id VARCHAR,
    page VARCHAR,
    event_time BIGINT
) WITH (
    KAFKA_TOPIC = 'raw_clicks',
    VALUE_FORMAT = 'JSON'
);

-- Continuous filtering query
CREATE STREAM mobile_clicks AS
SELECT user_id, page, event_time
FROM clickstream
WHERE page LIKE '/mobile/%'
EMIT CHANGES;

-- Windowed aggregation
CREATE TABLE hourly_page_views AS
SELECT page,
       COUNT(*) AS views,
       WINDOWSTART AS window_start
FROM clickstream
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY page
EMIT CHANGES;
# Simulating KSQL query execution
class KSQLQueryEngine:
    """Simulate KSQL query execution."""

    def __init__(self):
        self.streams = {}
        self.tables = {}

    def create_stream(self, name, topic, schema):
        self.streams[name] = {
            "topic": topic,
            "schema": schema,
            "events": [],
            "derived": [],
        }
        print(f"[KSQL] Created stream {name} on topic {topic}")

    def ingest(self, stream_name, records):
        """Insert records into a stream."""
        if stream_name in self.streams:
            self.streams[stream_name]["events"].extend(records)

    def create_derived_stream(self, name, source, select_fields, where_clause=None):
        """Create a derived stream (like CREATE STREAM AS SELECT)."""
        source_stream = self.streams[source]
        filtered = []
        for event in source_stream["events"]:
            if where_clause and not where_clause(event):
                continue
            derived = {f: event.get(f) for f in select_fields}
            filtered.append(derived)

        self.streams[name] = {
            "topic": name.lower(),
            "schema": {f: "unknown" for f in select_fields},
            "events": filtered,
            "derived": [source],
        }
        print(f"[KSQL] Created derived stream {name}: {len(filtered)} rows")
        return filtered

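    def create_tumbling_table(self, name, source, group_by, agg_fn,
                              window_seconds=3600, time_field="event_time"):
        """Simulate CREATE TABLE AS SELECT with a tumbling window.

        Events are bucketed by (group key, window start); event times are
        assumed to be epoch milliseconds.
        """
        source_stream = self.streams[source]
        window_ms = window_seconds * 1000
        windows = {}
        for event in source_stream["events"]:
            key = event.get(group_by)
            if key is None:
                continue
            # Align the event to the start of its fixed-size window
            window_start = (event.get(time_field, 0) // window_ms) * window_ms
            windows.setdefault((key, window_start), []).append(event)

        results = []
        for (key, window_start), events in windows.items():
            result = {"key": key, "window_start": window_start,
                      "window_seconds": window_seconds}
            result.update(agg_fn(events))
            results.append(result)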

        self.tables[name] = {
            "source": source,
            "group_by": group_by,
            "results": results,
        }
        print(f"[KSQL] Created table {name}: {len(results)} groups")
        return results

# Run KSQL simulation
ksql = KSQLQueryEngine()
ksql.create_stream("clickstream", "raw_clicks", 
                   {"user_id": "VARCHAR", "page": "VARCHAR", "event_time": "BIGINT"})
ksql.ingest("clickstream", [
    {"user_id": "u1", "page": "/home", "event_time": 1000},
    {"user_id": "u2", "page": "/mobile/offer", "event_time": 2000},
    {"user_id": "u3", "page": "/mobile/checkout", "event_time": 3000},
    {"user_id": "u1", "page": "/about", "event_time": 4000},
])
ksql.create_derived_stream("mobile_clicks", "clickstream",
                           ["user_id", "page"],
                           where_clause=lambda e: "/mobile/" in e.get("page", ""))
ksql.create_tumbling_table("hourly_views", "clickstream", "page",
                           lambda ev: {"count": len(ev)})

Expected output:

[KSQL] Created stream clickstream on topic raw_clicks
[KSQL] Created derived stream mobile_clicks: 2 rows
[KSQL] Created table hourly_views: 4 groups
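The simulator above evaluates each query once over a fixed list of events. A persistent KSQL query, by contrast, keeps running and emits an updated row every time a new event changes a result. The generator below is a rough sketch of that behavior in plain Python; `emit_changes` is a hypothetical name, not a KSQL API.

```python
# Illustrative only: a generator standing in for a continuous query.
def emit_changes(events, group_by):
    """Yield (key, running_count) after every event, like an ever-updating table."""
    counts = {}
    for event in events:
        key = event[group_by]
        counts[key] = counts.get(key, 0) + 1
        yield key, counts[key]

events = [{"page": "/home"}, {"page": "/about"}, {"page": "/home"}]
updates = list(emit_changes(events, "page"))
print(updates)  # [('/home', 1), ('/about', 1), ('/home', 2)]
```

Note that `/home` appears twice in the output: consumers of such a query see every intermediate value, not just the final count.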

Flink SQL — Unified Stream & Batch SQL

Apache Flink SQL provides ANSI-compliant SQL with streaming semantics, supporting complex event-time processing, exactly-once state, and dynamic table materialization.

-- Flink SQL: streaming analytics
CREATE TABLE page_views (
    user_id STRING,
    page STRING,
    view_time TIMESTAMP(3),
    WATERMARK FOR view_time AS view_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'page_views',
    'format' = 'json'
);

-- Windowed aggregation with event time
SELECT
    page,
    COUNT(*) AS views,
    TUMBLE_END(view_time, INTERVAL '10' MINUTE) AS window_end
FROM page_views
GROUP BY page, TUMBLE(view_time, INTERVAL '10' MINUTE);

-- Retract stream: emit updates
SELECT user_id, COUNT(*) AS page_views
FROM page_views
GROUP BY user_id;
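The non-windowed `GROUP BY user_id` query above never finishes, so its result is delivered as a changelog: the first row for a key is an insert, and every later change retracts the old row before emitting the new one. Flink labels these rows `+I`, `-U`, and `+U`. The sketch below reproduces that sequence in plain Python; `to_changelog` is a hypothetical helper, not part of Flink.

```python
# Illustrative only: how a running COUNT(*) turns into a retract-style changelog.
def to_changelog(events, group_by):
    """Yield (row_kind, key, count) tuples for a continuously updated count."""
    counts = {}
    for event in events:
        key = event[group_by]
        old = counts.get(key)
        new = (old or 0) + 1
        counts[key] = new
        if old is None:
            yield ("+I", key, new)   # first result for this key
        else:
            yield ("-U", key, old)   # retract the previous result
            yield ("+U", key, new)   # emit the updated result

views = [{"user_id": "u1"}, {"user_id": "u2"}, {"user_id": "u1"}]
changelog = list(to_changelog(views, "user_id"))
for row in changelog:
    print(row)
```

A sink that only appends would end up with both the stale and the current count for `u1`, which is why such queries usually need a sink that can apply updates by key.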
class FlinkSQLExecutor:
    """Simulate Flink SQL execution with streaming semantics."""

    def __init__(self):
        self.tables = {}
        self.results = []

    def create_table(self, name, schema, watermark_column=None, watermark_delay=0):
        self.tables[name] = {
            "schema": schema,
            "watermark": watermark_column,
            "delay": watermark_delay,
            "data": [],
            "watermark_value": 0,
        }

    def insert_into(self, table_name, records):
        table = self.tables[table_name]
        table["data"].extend(records)
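        # Advance the watermark: it trails the largest timestamp seen and never moves backward
        if table["watermark"] and records:
            max_ts = max(r.get(table["watermark"], 0) for r in records)
            table["watermark_value"] = max(table["watermark_value"], max_ts - table["delay"])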

    def select_tumble(self, source, group_by, agg, window_seconds=600,
                      watermark_col="view_time"):
        """Simulate TUMBLE window aggregation (Flink style).

        Timestamps are epoch milliseconds. A window is emitted only once the
        watermark has reached its end, which is when Flink would fire it.
        """
        table = self.tables[source]
        window_ms = window_seconds * 1000
        groups = {}
        for record in table["data"]:
            ts = record.get(watermark_col, 0)
            # End of the fixed-size window this event falls into
            window_end = (ts // window_ms + 1) * window_ms
            if window_end <= table["watermark_value"]:
                groups.setdefault((record.get(group_by), window_end), []).append(record)

        final = []
        for (key, window_end), events in groups.items():
            if agg == "count":
                final.append({group_by: key, "count": len(events),
                              "window_end": window_end})

        return final

flink = FlinkSQLExecutor()
flink.create_table("page_views", 
                   {"user_id": "STRING", "page": "STRING", "view_time": "BIGINT"},
                   watermark_column="view_time", watermark_delay=5000)

flink.insert_into("page_views", [
    {"user_id": "u1", "page": "/python", "view_time": 100000},
    {"user_id": "u2", "page": "/java", "view_time": 150000},
    {"user_id": "u1", "page": "/python", "view_time": 200000},
    {"user_id": "u3", "page": "/docker", "view_time": 300000},
])

windows = flink.select_tumble("page_views", "page", "count", window_seconds=60)
print("Flink SQL tumbling window results:")
for w in sorted(windows, key=lambda x: x["count"], reverse=True):
    print(f"  {w['page']:<10} count={w['count']}")

Expected output:

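Flink SQL tumbling window results:
  /python    count=1
  /java      count=1
  /python    count=1

The two /python views land in different windows, so they are counted separately, and the /docker event is held back because the watermark (295000) has not yet passed it.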
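The `WATERMARK FOR view_time AS view_time - INTERVAL '5' SECOND` clause in the table definition means the watermark trails the largest timestamp seen by five seconds. The sketch below is a simplified model of that rule, showing which out-of-order events are still accepted and which count as late. The class name and the arrival times are invented for illustration, and the exact boundary handling in Flink differs slightly.

```python
# Illustrative only: a simplified bounded-out-of-orderness watermark.
class BoundedOutOfOrdernessWatermark:
    def __init__(self, max_delay_ms):
        self.max_delay_ms = max_delay_ms
        self.max_ts = None  # largest event timestamp seen so far

    @property
    def current(self):
        """Watermark = largest timestamp seen minus the allowed delay."""
        return None if self.max_ts is None else self.max_ts - self.max_delay_ms

    def observe(self, ts):
        """Return True if the event is on time, False if it is behind the watermark."""
        wm = self.current
        on_time = wm is None or ts >= wm
        if self.max_ts is None or ts > self.max_ts:
            self.max_ts = ts
        return on_time

wm = BoundedOutOfOrdernessWatermark(max_delay_ms=5000)
arrivals = [10_000, 12_000, 9_000, 20_000, 14_000]  # event times in arrival order
flags = [wm.observe(ts) for ts in arrivals]
print(flags)       # [True, True, True, True, False]
print(wm.current)  # 15000
```

The event at 9000 arrives out of order but within the five-second bound, so it is accepted. The event at 14000 arrives after the watermark has moved to 15000, so it is late.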

CDC with Debezium — Change Data Capture

Debezium is an open-source platform for CDC, capturing row-level changes from databases (MySQL, PostgreSQL, MongoDB, etc.) and streaming them to Kafka.

class DebeziumCDC:
    """Simulate Debezium CDC capturing database changes."""

    def __init__(self, connector_name, db_type="postgres"):
        self.name = connector_name
        self.db_type = db_type
        self.snapshot_complete = False
        self.change_log = []

    def snapshot(self, tables):
        """Initial snapshot of existing data."""
        print(f"[Debezium] Starting snapshot of {len(tables)} tables")
        for table, rows in tables.items():
            for row in rows:
                event = {
                    "op": "r",  # read (snapshot)
                    "source": {"connector": self.name, "table": table},
                    "before": None,
                    "after": row,
                    "ts_ms": 1000,
                }
                self.change_log.append(event)
        self.snapshot_complete = True
        print(f"[Debezium] Snapshot complete: {len(self.change_log)} events")

    def capture_insert(self, table, row):
        """Capture an INSERT operation."""
        event = {
            "op": "c",  # create
            "source": {"connector": self.name, "table": table},
            "before": None,
            "after": row,
            "ts_ms": self._ts(),
        }
        self.change_log.append(event)
        print(f"[Debezium] INSERT {table}: {row}")

    def capture_update(self, table, before, after):
        """Capture an UPDATE operation."""
        event = {
            "op": "u",
            "source": {"connector": self.name, "table": table},
            "before": before,
            "after": after,
            "ts_ms": self._ts(),
        }
        self.change_log.append(event)
        print(f"[Debezium] UPDATE {table}: {before} -> {after}")

    def capture_delete(self, table, before):
        """Capture a DELETE operation."""
        event = {
            "op": "d",
            "source": {"connector": self.name, "table": table},
            "before": before,
            "after": None,
            "ts_ms": self._ts(),
        }
        self.change_log.append(event)
        print(f"[Debezium] DELETE {table}: {before}")

    def _ts(self):
        import time
        return int(time.time() * 1000)

    def stream_to_topic(self, topic_prefix="dbserver1"):
        """Convert change log to Kafka messages."""
        messages = []
        for event in self.change_log:
            topic = f"{topic_prefix}.{event['source']['table']}"
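            # Key by primary key (assumed to be "id"); delete events carry it in "before"
            row = event["after"] or event["before"] or {}
            messages.append({"topic": topic, "key": str(row.get("id")),
                             "value": event})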
        return messages

# Simulate CDC
debezium = DebeziumCDC("orders-connector", "postgres")
debezium.snapshot({"customers": [
    {"id": 1, "name": "Alice", "email": "alice@example.com"},
    {"id": 2, "name": "Bob", "email": "bob@example.com"},
]})
debezium.capture_insert("orders", {"id": 101, "customer_id": 1, "total": 250.0})
debezium.capture_update("customers", 
                        {"id": 1, "name": "Alice", "email": "alice@old.com"},
                        {"id": 1, "name": "Alice", "email": "alice@new.com"})

msgs = debezium.stream_to_topic()
print(f"\nKafka messages generated: {len(msgs)}")
for msg in msgs[:3]:
    print(f"  Topic: {msg['topic']}, Op: {msg['value']['op']}")

Expected output:

[Debezium] Starting snapshot of 1 tables
[Debezium] Snapshot complete: 2 events
[Debezium] INSERT orders: {'id': 101, 'customer_id': 1, 'total': 250.0}
[Debezium] UPDATE customers: {'id': 1, 'name': 'Alice', 'email': 'alice@old.com'} -> {'id': 1, 'name': 'Alice', 'email': 'alice@new.com'}

Kafka messages generated: 4
  Topic: dbserver1.customers, Op: r
  Topic: dbserver1.customers, Op: r
  Topic: dbserver1.orders, Op: c
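On the consuming side, change events are typically folded back into a table: snapshot reads, creates, and updates upsert the `after` image, and deletes remove the row identified by the `before` image. A minimal sketch of that, assuming `id` is the primary key and using a hypothetical `apply_changes` helper:

```python
# Illustrative only: rebuild current table state from Debezium-style change events.
def apply_changes(events):
    """Fold change events into {primary_key: row}; 'id' is assumed to be the key."""
    table = {}
    for e in events:
        if e["op"] == "d":
            table.pop(e["before"]["id"], None)  # delete: key comes from "before"
        else:
            table[e["after"]["id"]] = e["after"]  # r / c / u: upsert the new image
    return table

events = [
    {"op": "r", "before": None, "after": {"id": 1, "email": "alice@example.com"}},
    {"op": "c", "before": None, "after": {"id": 2, "email": "bob@example.com"}},
    {"op": "u", "before": {"id": 1, "email": "alice@example.com"},
     "after": {"id": 1, "email": "alice@new.com"}},
    {"op": "d", "before": {"id": 2, "email": "bob@example.com"}, "after": None},
]
state = apply_changes(events)
print(state)  # {1: {'id': 1, 'email': 'alice@new.com'}}
```

Because every operation is an upsert or a delete by key, replaying the same events a second time leaves the table unchanged.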

Lambda vs Kappa Architecture

| Aspect | Lambda | Kappa |
|---|---|---|
| Path | Batch + Streaming | Single streaming path |
| Code | Two codebases | One codebase |
| Storage | Lake (batch) + Speed layer | Append-only log (Kafka) |
| Reconciliation | Batch corrects stream | Reprocess from topic |
| Complexity | High: maintain both paths | Low: single pipeline |
| Latency | Batch: hours, Stream: seconds | Seconds to minutes |
| Example | Batch nightly + streaming dashboard | Kafka Streams-only pipeline |

class ArchitectureComparison:
    """Simulate Lambda vs Kappa for a simple aggregation."""

    def lambda_pipeline(self, raw_events, batch_interval=5):
        """Lambda: batch layer computes correct result, speed layer approximates."""
        # Speed layer: approximate count (streaming)
        speed_result = {}
        for e in raw_events:
            key = e["key"]
            speed_result[key] = speed_result.get(key, 0) + 1

        # Batch layer: correct count (rerun periodically)
        batch_result = {}
        for e in raw_events:
            key = e["key"]
            batch_result[key] = batch_result.get(key, 0) + 1

        return {"streaming": speed_result, "batch": batch_result}

    def kappa_pipeline(self, raw_events):
        """Kappa: single stream pipeline, can reprocess."""
        state = {}
        for e in raw_events:
            key = e["key"]
            state[key] = state.get(key, 0) + 1

        # Reprocess from beginning for correction
        reprocessed = {}
        for e in raw_events:
            key = e["key"]
            reprocessed[key] = reprocessed.get(key, 0) + 1

        return {"realtime": state, "reprocessed": reprocessed}

events = [
    {"key": "page_a", "ts": 1},
    {"key": "page_b", "ts": 2},
    {"key": "page_a", "ts": 3},
]
comp = ArchitectureComparison()
lam = comp.lambda_pipeline(events)
kap = comp.kappa_pipeline(events)
print("Lambda:")
print(f"  Speed layer: {lam['streaming']}")
print(f"  Batch layer: {lam['batch']} (reconciliation)")
print("Kappa (single pipeline):")
print(f"  Real-time: {kap['realtime']}")
print(f"  Reprocess: {kap['reprocessed']} (identical)")
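In the simulation above, the Kappa reprocessing pass runs the same logic twice, so both results match. The practical value of replay shows up when the logic changes: the retained log is run again through a corrected job. The sketch below assumes a made-up `bot` flag on each event and hypothetical job functions `count_v1` and `count_v2`.

```python
# Illustrative only: replaying a retained log through two versions of a job.
log = [
    {"key": "page_a", "bot": False},
    {"key": "page_b", "bot": True},
    {"key": "page_a", "bot": False},
]

def count_v1(events):
    """Original job: count every event per key."""
    counts = {}
    for e in events:
        counts[e["key"]] = counts.get(e["key"], 0) + 1
    return counts

def count_v2(events):
    """Corrected job: same count, but bot traffic is ignored."""
    return count_v1(e for e in events if not e["bot"])

def replay(log, job, from_offset=0):
    """Re-run a job over the log starting at the given offset."""
    return job(log[from_offset:])

print(replay(log, count_v1))  # {'page_a': 2, 'page_b': 1}
print(replay(log, count_v2))  # {'page_a': 2}
```

No separate batch codebase is involved; the corrected result comes from the same kind of streaming job reading the same log from the start.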

Common Mistakes

1. Not Handling Schema Evolution

Data formats change over time. Use Avro or Protobuf with Schema Registry to handle backward/forward compatibility.
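As a rough illustration of what compatibility means in practice, the sketch below reads both an old and a new record with one reader schema: a newly added field gets a default when it is absent, and fields the reader does not know are ignored. It uses plain dicts rather than an Avro or Protobuf library, and `READER_SCHEMA_V2` and `read_with_schema` are hypothetical names.

```python
# Illustrative only: Avro-like schema resolution using plain dicts.
REQUIRED = object()  # sentinel marking fields that have no default

# Version 2 of the reader schema adds "device" with a default value.
READER_SCHEMA_V2 = {"user_id": REQUIRED, "page": REQUIRED, "device": "unknown"}

def read_with_schema(record, schema):
    """Project a record onto the reader schema, filling defaults for missing fields."""
    out = {}
    for field, default in schema.items():
        if field in record:
            out[field] = record[field]
        elif default is REQUIRED:
            raise ValueError(f"missing required field: {field}")
        else:
            out[field] = default
    return out

old_record = {"user_id": "u1", "page": "/home"}  # written before "device" existed
new_record = {"user_id": "u2", "page": "/about", "device": "mobile", "extra": 1}

print(read_with_schema(old_record, READER_SCHEMA_V2))
print(read_with_schema(new_record, READER_SCHEMA_V2))
```

Adding a field without a default would make old records unreadable, which is the kind of change a schema registry's compatibility check is meant to reject.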

2. Ignoring Kafka Consumer Lag

Growing consumer lag means the pipeline is consuming more slowly than producers are writing. Monitor it with Burrow or Kafka’s built-in consumer group tooling (kafka-consumer-groups.sh --describe).
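Lag itself is simple arithmetic: for each partition, it is the log end offset minus the consumer group's committed offset. The sketch below computes it from two dicts; the offsets are made-up numbers and `consumer_lag` is a hypothetical helper, not a Kafka client call.

```python
# Illustrative only: per-partition lag from end offsets and committed offsets.
def consumer_lag(end_offsets, committed_offsets):
    """Lag per partition = log end offset - committed offset (0 if never committed)."""
    return {p: end - committed_offsets.get(p, 0) for p, end in end_offsets.items()}

end_offsets = {0: 1500, 1: 1480, 2: 1510}  # latest offset in each partition
committed = {0: 1500, 1: 1200, 2: 1505}    # last offset the group committed

lag = consumer_lag(end_offsets, committed)
print(lag)                # {0: 0, 1: 280, 2: 5}
print(sum(lag.values()))  # 285
```

A single partition with high lag, as with partition 1 here, often points to a hot key or a slow consumer instance rather than an undersized group.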

3. No Idempotent Sinks

If your pipeline restarts, it may re-process events. Ensure sinks (DB, S3, API) are idempotent — writing the same record twice produces the same result.
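The difference is easy to see by replaying the same events into two sinks, one that appends and one that upserts by a unique event ID. Both classes and the `event_id` field below are assumptions made for this sketch.

```python
# Illustrative only: an append sink duplicates on replay, an upsert sink does not.
class AppendSink:
    def __init__(self):
        self.rows = []

    def write(self, event):
        self.rows.append(event["payload"])

class UpsertSink:
    def __init__(self):
        self.rows = {}

    def write(self, event):
        # Writing the same event_id again overwrites the row with the same value
        self.rows[event["event_id"]] = event["payload"]

events = [{"event_id": "e1", "payload": 10}, {"event_id": "e2", "payload": 20}]
append_sink, upsert_sink = AppendSink(), UpsertSink()

for _ in range(2):  # the second pass simulates a restart that replays the events
    for e in events:
        append_sink.write(e)
        upsert_sink.write(e)

print(len(append_sink.rows))  # 4
print(len(upsert_sink.rows))  # 2
```

In a real database the same effect usually comes from a primary key or unique constraint on the event ID combined with an upsert statement.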

4. Using Kafka Streams for Complex State

Kafka Streams works well for moderate amounts of local state (held in RocksDB), but for complex windowed joins or very large state, consider Flink.

5. CDC Without Schema History

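Some Debezium connectors (MySQL, for example) persist DDL statements to a schema history topic so that, after a restart, they can reconstruct the table structure that was in effect at a given log position. If that topic is missing, or has lost data through retention or compaction, the connector cannot resume correctly. The topic is for the connector’s internal use; downstream consumers should take schemas from the change events themselves or from Schema Registry.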

Practice Questions

  1. What is the difference between KStream and KTable in Kafka Streams? KStream is an unbounded stream of records. KTable is a changelog keyed by record key, representing the latest state per key.

  2. How does Debezium capture changes from a database? Debezium reads the database’s transaction log (WAL, binlog), converts row changes to structured events, and streams them to Kafka topics.

  3. What is the key difference between Lambda and Kappa architectures? Lambda has separate batch and streaming paths (two codebases). Kappa uses a single streaming path with replay capability from the log.

  4. What problem does watermarking solve in Flink SQL? Watermarks handle out-of-order events, telling Flink when to trigger window computations despite late-arriving data.

  5. Challenge: Design a real-time pipeline for an e-commerce platform that needs to capture inventory changes from PostgreSQL, enrich with product data, and emit restocking alerts within 10 seconds.

Mini Project: Real-Time Pipeline Simulator

# realtime_pipeline.py
# Simulate a complete real-time pipeline with CDC, processing, and sink.
# Assumes the DebeziumCDC and KafkaStreamsProcessor classes defined earlier are in scope.

class RealTimePipeline:
    def __init__(self):
        self.cdc = DebeziumCDC("ecommerce-connector")
        self.processor = KafkaStreamsProcessor("enrichment")
        self.sinks = []

    def add_sink(self, name, handler):
        self.sinks.append({"name": name, "handler": handler})

    def run(self):
        # Step 1: Snapshot existing data
        self.cdc.snapshot({"products": [
            {"id": "P001", "name": "Laptop", "stock": 10},
            {"id": "P002", "name": "Mouse", "stock": 50},
        ]})

        # Step 2: Capture real-time changes
        self.cdc.capture_insert("orders", {"id": "ORD-101", "product_id": "P001", "qty": 1})
        self.cdc.capture_update("products",
                                {"id": "P001", "name": "Laptop", "stock": 10},
                                {"id": "P001", "name": "Laptop", "stock": 9})

        # Step 3: Process and sink
        for msg in self.cdc.stream_to_topic():
            for sink in self.sinks:
                sink["handler"](msg)

pipeline = RealTimePipeline()
pipeline.add_sink("console", lambda msg: print(f"[Sink] {msg['topic']}: {msg['value']['op']}"))
pipeline.run()

Built by the developers of Doda Browser, DodaZIP, and Durga Antivirus Pro.
