Stream Processing Deep Dive — Event Time, Watermarks & Exactly-Once

DodaTech Updated Jun 20, 2026 10 min read

Stream processing is the continuous computation of data as it arrives. It handles unbounded datasets using event-time semantics, watermark-based completeness estimates, and exactly-once consistency, and it is the backbone of real-time data pipelines.

What You’ll Learn

This tutorial covers the core concepts that separate production stream processing from simple message consumption: event time vs processing time, watermark strategies, exactly-once delivery guarantees, stateful vs stateless operations, and the Kappa architecture pattern.

Why It Matters

Batch processing assumes all data is available. Real-world data arrives late, out of order, and at variable velocity. Understanding stream time semantics is the difference between correct aggregations and silently wrong results. DodaTech’s Durga Antivirus Pro uses stream processing for real-time threat detection across millions of endpoints.

Real-World Use

Apache Flink powers Alibaba’s real-time search indexing with sub-second latency. Kafka Streams processes millions of events per second at LinkedIn for personalization. Stripe uses stream processing to detect fraudulent transactions in real time.

    flowchart LR
    subgraph Sources
        A[Click Events] --> C[Kafka Topic]
        B[Sensor Data] --> C
    end
    subgraph StreamProcessor[Stream Processor]
        C --> D{Watermark<br/>Manager}
        D --> E[Window 1<br/>12:00-12:05]
        D --> F[Window 2<br/>12:05-12:10]
        E --> G[Trigger<br/>on watermark]
        F --> G
    end
    subgraph Sinks
        G --> H[Real-time Dashboard]
        G --> I[Alert System]
        G --> J[Data Lake]
    end
  

Event Time vs Processing Time

These are the two fundamental time domains in stream processing.

Processing Time

Processing time is the wall-clock time of the machine executing the stream processor. It is the simplest to implement, but it gives incorrect results when data arrives late, because each event is counted in whichever window is open when it arrives.

from datetime import datetime
import random
import time

def processing_time_window(events, window_size_seconds=10):
    """Naive processing-time windowing: windows follow the wall clock."""
    current_window = []
    window_start = datetime.now()

    for event in events:
        now = datetime.now()
        if (now - window_start).total_seconds() >= window_size_seconds:
            # The wall clock moved past the window: emit it and start a new one
            result = sum(e["value"] for e in current_window)
            print(f"[Processing] Window {window_start:%H:%M:%S}: sum={result}")
            current_window = []
            window_start = now
        current_window.append(event)

    # Flush remaining
    if current_window:
        result = sum(e["value"] for e in current_window)
        print(f"[Processing] Window {window_start:%H:%M:%S}: sum={result}")

def arriving_events(count, gap_seconds):
    """Yield events with a pause between them to simulate arrival over time."""
    for _ in range(count):
        time.sleep(gap_seconds)
        yield {"event_time": None, "value": random.randint(1, 10)}

# 15 events arriving 0.3 s apart span about 4.5 s, so several 2 s windows fire
processing_time_window(arriving_events(15, 0.3), window_size_seconds=2)

Expected output (approximate; times and sums vary from run to run):

[Processing] Window 12:00:00: sum=34
[Processing] Window 12:00:02: sum=41
[Processing] Window 12:00:04: sum=9

Event Time

Event time is the timestamp embedded in the data itself — when the event actually occurred. This gives correct results regardless of arrival order or delay.

from datetime import datetime, timedelta
from collections import defaultdict

def event_time_window(events, window_size_seconds=10):
    """Event-time based windowing (correct for late data)."""
    windows = defaultdict(list)

    for event in events:
        et = event["event_time"]
        # Assign to correct window based on event time
        window_key = et.timestamp() // window_size_seconds
        windows[int(window_key)].append(event)

    for key in sorted(windows.keys()):
        window_events = windows[key]
        result = sum(e["value"] for e in window_events)
        window_start = datetime.fromtimestamp(key * window_size_seconds)
        print(f"[EventTime] Window {window_start.time()}: sum={result}, "
              f"count={len(window_events)}")

# Events with actual timestamps, arriving out of order.
# A fixed base aligned to a window boundary keeps the output reproducible.
base = datetime(2026, 6, 20, 12, 0, 0)
events = [
    {"event_time": base + timedelta(seconds=3), "value": 5},
    {"event_time": base + timedelta(seconds=7), "value": 3},
    {"event_time": base + timedelta(seconds=1), "value": 8},  # Late!
    {"event_time": base + timedelta(seconds=12), "value": 2},
    {"event_time": base + timedelta(seconds=15), "value": 6},
]
event_time_window(events, window_size_seconds=10)

Expected output:

[EventTime] Window 12:00:00: sum=16, count=3
[EventTime] Window 12:00:10: sum=8, count=2

Watermarks

A watermark is a heuristic that tells the stream processor: “I expect no more events with event time before this point.” It’s the mechanism for deciding when to trigger a window computation.

Bounded Out-of-Orderness

Apache Flink uses a configurable maxOutOfOrderness to create watermarks that trail behind the maximum observed event time.

class WatermarkGenerator:
    """Simulate watermark generation with bounded out-of-orderness."""

    def __init__(self, max_lateness_seconds=5):
        self.max_lateness = max_lateness_seconds
        self.max_event_time = 0
        self.current_watermark = 0

    def observe(self, event_time_epoch):
        """Observe an event and potentially advance the watermark."""
        self.max_event_time = max(self.max_event_time, event_time_epoch)
        # Watermark = max observed - max lateness
        new_watermark = self.max_event_time - self.max_lateness
        if new_watermark > self.current_watermark:
            self.current_watermark = new_watermark
        return self.current_watermark

    def should_trigger_window(self, window_end_time):
        """Should we fire this window?"""
        return self.current_watermark >= window_end_time

# Simulate
watermark = WatermarkGenerator(max_lateness_seconds=5)
events = [10, 12, 8, 14, 18, 25, 22, 30]
window_ends = [15, 20, 25, 30]

print(f"{'Event':<10} {'Observed Time':<15} {'Watermark':<12} {'Window 15':<10} "
      f"{'Window 20':<10}")
print("-" * 60)

for et in events:
    wm = watermark.observe(et)
    w15 = "⏳" if not watermark.should_trigger_window(15) else "🔥"
    w20 = "⏳" if not watermark.should_trigger_window(20) else "🔥"
    # Second column is the maximum event time seen so far, not the current event
    print(f"{et:<10} {watermark.max_event_time:<15} {wm:<12} {w15:<10} {w20:<10}")

Expected output:

Event      Observed Time   Watermark    Window 15   Window 20
------------------------------------------------------------
10         10              5            ⏳          ⏳
12         12              7            ⏳          ⏳
8          12              7            ⏳          ⏳
14         14              9            ⏳          ⏳
18         18              13           ⏳          ⏳
25         25              20           🔥          🔥
22         25              20           🔥          🔥
30         30              25           🔥          🔥

Handling Late Data

Events arriving after the watermark has passed the window end are “late.” Strategies include:

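  1. Drop: Ignore late events (simplest)
  2. Side output: Route to a dead-letter queue for reprocessing
  3. Recompute: Update the window result with a retraction

The handler below sketches the side-output strategy: events that trail the watermark by more than allowed_lateness go to a late queue instead of a window.

class LateDataHandler: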
    def __init__(self, watermark_gen, allowed_lateness=10):
        self.watermark = watermark_gen
        self.allowed_lateness = allowed_lateness
        self.late_queue = []

    def process_event(self, event):
        """Route event: normal window or late data."""
        event_time = event["event_time"]
        wm = self.watermark.observe(event_time)

        if event_time >= wm - self.allowed_lateness:
            print(f"[OnTime] Event at {event_time} processed normally")
            return "normal"
        else:
            self.late_queue.append(event)
            print(f"[Late] Event at {event_time} routed to late queue")
            return "late"

handler = LateDataHandler(WatermarkGenerator(5), allowed_lateness=10)
events_to_process = [10, 12, 20, 25, 15, 30, 8, 35]
for et in events_to_process:
    handler.process_event({"event_time": et, "value": 1})
print(f"\nLate queue size: {len(handler.late_queue)}")

Expected output:

[OnTime] Event at 10 processed normally
[OnTime] Event at 12 processed normally
[OnTime] Event at 20 processed normally
[OnTime] Event at 25 processed normally
[OnTime] Event at 15 processed normally
[OnTime] Event at 30 processed normally
[Late] Event at 8 routed to late queue
[OnTime] Event at 35 processed normally

Late queue size: 1
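
The third strategy, recomputation, can be sketched in a similar way. This is a minimal illustration rather than how any particular engine implements it: the class name RetractingWindow and its methods are hypothetical, and it assumes a single window with a sum aggregate.

```python
class RetractingWindow:
    """Hypothetical window that re-emits its sum when a late event arrives."""

    def __init__(self, start, end, allowed_lateness):
        self.start = start
        self.end = end
        self.allowed_lateness = allowed_lateness
        self.total = 0
        self.fired = False

    def add(self, event_time, value, watermark):
        """Return the records to emit downstream for this event."""
        if not (self.start <= event_time < self.end):
            return []  # Event belongs to another window
        if watermark >= self.end + self.allowed_lateness:
            return [("dropped", value)]  # Too late: window state is gone
        if not self.fired:
            self.total += value  # Still buffering; nothing to emit yet
            return []
        # Window already fired: withdraw the old result, then emit the new one
        old_total = self.total
        self.total += value
        return [("retract", old_total), ("emit", self.total)]

    def on_watermark(self, watermark):
        """Fire the window once the watermark passes its end."""
        if not self.fired and watermark >= self.end:
            self.fired = True
            return [("emit", self.total)]
        return []


window = RetractingWindow(start=0, end=10, allowed_lateness=5)
window.add(3, 5, watermark=0)
window.add(7, 3, watermark=2)
first = window.on_watermark(11)            # Watermark passes the window end
late = window.add(9, 4, watermark=12)      # Late, but within allowed lateness
too_late = window.add(1, 2, watermark=20)  # Past the allowed lateness

print(first)     # [('emit', 8)]
print(late)      # [('retract', 8), ('emit', 12)]
print(too_late)  # [('dropped', 2)]
```

The retraction lets a downstream consumer undo the earlier result before applying the corrected one, at the cost of keeping window state alive until the allowed lateness expires.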

Exactly-Once Semantics

Delivery guarantees exist on a spectrum. Exactly-once is the strongest — each event affects the result exactly one time, even in the face of failures.

Delivery Levels

Level          Behavior                  Use Case
At-most-once   Event may be lost         Monitoring (loss ok)
At-least-once  Event processed ≥1 time   Most systems (handle duplicates)
Exactly-once   Event processed 1 time    Financial transactions, billing
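
The first two levels often differ only in when a consumer commits its offset relative to processing. The sketch below simulates one crash under each ordering. The function run_consumer and its parameters are hypothetical, and the log is a plain Python list rather than a real broker.

```python
def run_consumer(log, commit_first, crash_at):
    """Consume `log` with one simulated crash, then restart and finish.

    commit_first=True  commits the offset before processing (at-most-once).
    commit_first=False commits after processing (at-least-once).
    The crash happens at offset `crash_at`, between the two steps.
    """
    committed = 0      # Durable offset that survives the crash
    processed = []     # Side effects observed downstream
    crashed = False

    while committed < len(log):
        offset = committed
        if commit_first:
            committed = offset + 1
            if offset == crash_at and not crashed:
                crashed = True
                continue  # Crash before processing: the event is lost
            processed.append(log[offset])
        else:
            processed.append(log[offset])
            if offset == crash_at and not crashed:
                crashed = True
                continue  # Crash before commit: the event is replayed
            committed = offset + 1
    return processed


log = ["e0", "e1", "e2", "e3"]
at_most_once = run_consumer(log, commit_first=True, crash_at=2)
at_least_once = run_consumer(log, commit_first=False, crash_at=2)

print(at_most_once)   # ['e0', 'e1', 'e3']: e2 is lost
print(at_least_once)  # ['e0', 'e1', 'e2', 'e2', 'e3']: e2 is duplicated
```

Exactly-once closes the remaining gap by making the duplicate harmless or by committing the offset and the result atomically.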

How Exactly-Once Works

Stream processors achieve exactly-once through a combination of:

  1. Idempotent sinks: writing the same result twice produces the same state
  2. Transactional writes: writes are committed atomically with checkpoint state
  3. State snapshots: consistent checkpoints that the job rolls back to on failure

The simulation below covers the third mechanism: state and source offset are snapshotted together, and recovery restores both so the source can replay from the checkpointed offset.

class ExactlyOnceProcessor:
    """Simulate checkpoint-based recovery for exactly-once state updates."""

    def __init__(self):
        self.state = {}
        self._offset = 0
        self._checkpoint_state = {}
        self._checkpoint_offset = 0

    def process(self, key, value):
        """Apply an event to working state and advance the source offset."""
        self.state[key] = value
        self._offset += 1

    def checkpoint(self):
        """Atomically snapshot the state together with the source offset."""
        self._checkpoint_state = dict(self.state)
        self._checkpoint_offset = self._offset
        print(f"[Checkpoint] State size: {len(self.state)}, offset: {self._offset}")

    def recover_from_failure(self):
        """On failure, restore the last checkpoint (state and offset)."""
        self.state = dict(self._checkpoint_state)
        self._offset = self._checkpoint_offset
        print(f"[Recovery] Restored state from checkpoint, offset reset to {self._offset}")
        # Events after the checkpointed offset will be replayed from the source

processor = ExactlyOnceProcessor()
processor.process("a", 10)
processor.process("b", 20)
processor.checkpoint()
processor.process("a", 15)  # Update after checkpoint
processor.process("c", 30)

print("Before recovery:", processor.state)
processor.recover_from_failure()
print("After recovery:", processor.state)

Expected output:

[Checkpoint] State size: 2, offset: 2
Before recovery: {'a': 15, 'b': 20, 'c': 30}
[Recovery] Restored state from checkpoint, offset reset to 2
After recovery: {'a': 10, 'b': 20}
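
Idempotent sinks, the first mechanism in the list above, make those replays harmless. A minimal sketch, assuming every event carries a unique id: the class IdempotentSink and the event_id field are hypothetical and not taken from any specific system.

```python
class IdempotentSink:
    """Hypothetical sink that applies each event id at most once."""

    def __init__(self):
        self.totals = {}        # key -> running total
        self.seen_ids = set()   # ids of events already applied

    def write(self, event_id, key, amount):
        """Apply the event unless its id was already applied."""
        if event_id in self.seen_ids:
            return False  # Duplicate from a replay: ignore it
        self.seen_ids.add(event_id)
        self.totals[key] = self.totals.get(key, 0) + amount
        return True


sink = IdempotentSink()
events = [(1, "alice", 10), (2, "bob", 5), (3, "alice", 7)]

for event in events:
    sink.write(*event)

# After a failure, the source replays from the last checkpoint (event 2 onward)
replayed = [sink.write(*event) for event in events[1:]]

print(sink.totals)  # {'alice': 17, 'bob': 5}
print(replayed)     # [False, False]
```

In a real sink the set of seen ids would have to be stored durably alongside the totals and pruned over time; this sketch keeps both in memory.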

End-to-End Exactly-Once

Exactly-once must span the entire pipeline: source → processor → sink. Apache Flink achieves this through:

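  • Kafka source: Offsets are stored in Flink's checkpoints, so recovery rewinds the source and replays events from the checkpointed position
  • Checkpointing: Consistent state snapshots via asynchronous barrier snapshotting, a variant of the Chandy-Lamport algorithm
  • Two-phase commit sink: Writes are committed only if the checkpoint succeeds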
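
The two-phase commit idea can be sketched without any framework. The class below is a hypothetical illustration, not Flink's API: writes stay invisible until the checkpoint they belong to is confirmed, and a failure discards anything unconfirmed.

```python
class TwoPhaseCommitSink:
    """Hypothetical sink that ties output visibility to checkpoints."""

    def __init__(self):
        self.committed = []        # Visible to downstream readers
        self._open_txn = []        # Writes since the last checkpoint barrier
        self._pre_committed = {}   # checkpoint id -> flushed but invisible writes

    def write(self, record):
        self._open_txn.append(record)

    def pre_commit(self, checkpoint_id):
        """Phase 1: on the checkpoint barrier, flush and open a new transaction."""
        self._pre_committed[checkpoint_id] = self._open_txn
        self._open_txn = []

    def commit(self, checkpoint_id):
        """Phase 2: the checkpoint completed everywhere, so publish the writes."""
        self.committed.extend(self._pre_committed.pop(checkpoint_id))

    def abort(self):
        """Failure: discard everything that was never committed."""
        self._open_txn = []
        self._pre_committed.clear()


sink = TwoPhaseCommitSink()
sink.write("r1")
sink.write("r2")
sink.pre_commit(checkpoint_id=1)
sink.commit(checkpoint_id=1)      # Checkpoint 1 succeeded

sink.write("r3")
sink.pre_commit(checkpoint_id=2)
sink.abort()                      # Job failed before checkpoint 2 completed

print(sink.committed)  # ['r1', 'r2']
```

Because r3 was never published, replaying it after recovery does not produce a visible duplicate.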

Stateful vs Stateless Processing

Stateless Operators

Each event is processed independently. No memory of past events.

# Stateless: map, filter, flatMap
def stateless_filter(event):
    """Stateless: pure function of single event."""
    return event["value"] > 0

def stateless_map(event):
    """Stateless: transform each event independently."""
    return {"user": event["user"], "page": event["page"].lower()}

events = [
    {"user": "Alice", "page": "/Home", "value": 1},
    {"user": "Bob", "page": "/About", "value": -1},
]
print("Stateless filter:", [e for e in events if stateless_filter(e)])
print("Stateless map:", [stateless_map(e) for e in events])

Stateful Operators

Operators maintain state across events — counters, joins, windows, sessionization.

class StatefulCounter:
    """Stateful: maintain running count per key."""

    def __init__(self):
        self._counts = {}

    def process(self, key):
        self._counts[key] = self._counts.get(key, 0) + 1
        return self._counts[key]

    def snapshot(self):
        return dict(self._counts)

counter = StatefulCounter()
for key in ["a", "b", "a", "a", "c", "b"]:
    count = counter.process(key)
    print(f"Key {key}: count={count}")

Expected output:

Key a: count=1
Key b: count=1
Key a: count=2
Key a: count=3
Key c: count=1
Key b: count=2
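
Sessionization, mentioned above, is another stateful pattern: the operator has to remember when each key was last seen. A minimal sketch, assuming events are already ordered by timestamp. The function name sessionize and the gap parameter are hypothetical.

```python
from collections import defaultdict


def sessionize(events, gap):
    """Group (key, timestamp) events into per-key sessions.

    A new session starts when the time since the key's previous event
    exceeds `gap`. Assumes events are already ordered by timestamp.
    """
    sessions = defaultdict(list)  # key -> list of [start, end, count]
    for key, ts in events:
        key_sessions = sessions[key]
        if key_sessions and ts - key_sessions[-1][1] <= gap:
            key_sessions[-1][1] = ts          # Extend the open session
            key_sessions[-1][2] += 1
        else:
            key_sessions.append([ts, ts, 1])  # Start a new session
    return dict(sessions)


clicks = [("alice", 0), ("bob", 2), ("alice", 4), ("alice", 30), ("bob", 5)]
result = sessionize(sorted(clicks, key=lambda e: e[1]), gap=10)
print(result)
# {'alice': [[0, 4, 2], [30, 30, 1]], 'bob': [[2, 5, 2]]}
```

Each session is stored as [start, end, count]; a production operator would also need to expire idle sessions and merge sessions when out-of-order events bridge a gap.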

State Backends

Backend     Storage                                  Use Case
HashMap     JVM heap                                 Small state, fastest
RocksDB     Local disk + off-heap                    Large state, spill to disk
FileSystem  JVM heap; checkpoints on DFS (S3, HDFS)  Legacy Flink option, superseded by HashMap with file-system checkpoint storage

Kappa Architecture

Kappa architecture simplifies the Lambda architecture by using a single stream processing pipeline for both real-time and batch results.

Lambda vs Kappa

Aspect       Lambda              Kappa
Pipelines    Batch + Stream      Single stream
Code         Two codebases       One codebase
Consistency  Reconciliation job  Reprocess from topic
Complexity   High (two paths)    Low (one path)
Storage      Lake + Speed layer  Log (Kafka) + stream processor

The simulation below models this with an in-memory list standing in for the Kafka log:

class KappaPipeline:
    """Simulate Kappa: single stream pipeline with replay capability."""

    def __init__(self):
        self._event_log = []  # Kafka-like append-only log
        self._materialized_view = {}

    def ingest(self, events):
        """Append to immutable log."""
        for e in events:
            self._event_log.append(e)

    def run_pipeline(self, from_offset=0):
        """Process from any offset (replay/batch capability)."""
        state = {}
        for i in range(from_offset, len(self._event_log)):
            e = self._event_log[i]
            key = e["user"]
            state[key] = state.get(key, 0) + e["value"]
        self._materialized_view = state
        return state

    def query_realtime(self):
        """Query the latest materialized view."""
        return self._materialized_view

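    def query_historical(self, on_date):
        """Replay the log up to timestamp on_date without touching the live view."""
        state = {}
        for e in self._event_log:
            if e["ts"] <= on_date:
                state[e["user"]] = state.get(e["user"], 0) + e["value"]
        return state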

# Same pipeline for real-time and batch
kappa = KappaPipeline()
kappa.ingest([
    {"user": "alice", "value": 5, "ts": 1},
    {"user": "bob", "value": 3, "ts": 2},
])
print("Real-time view:", kappa.run_pipeline())

# Reprocess for correction/batch
kappa.ingest([
    {"user": "alice", "value": 2, "ts": 3},
    {"user": "charlie", "value": 7, "ts": 4},
])
print("After reprocess:", kappa.run_pipeline(from_offset=0))

Expected output:

Real-time view: {'alice': 5, 'bob': 3}
After reprocess: {'alice': 7, 'bob': 3, 'charlie': 7}

Common Mistakes

1. Processing-Time Windowing for Event-Time Data

If you use processing time for events that arrive late, you’ll assign them to the wrong window. Always use event time for temporal correctness.
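
A small worked example makes the misassignment concrete. The timestamps are made up for illustration, and window_start is a hypothetical helper for 10-second tumbling windows.

```python
def window_start(timestamp, size):
    """Start of the tumbling window that contains `timestamp`."""
    return timestamp - timestamp % size


# (event_time, arrival_time) in seconds; the second event is delayed by 9 s
events = [(2, 3), (4, 13), (11, 12)]
size = 10

by_event_time = {}
by_processing_time = {}
for event_time, arrival_time in events:
    by_event_time.setdefault(window_start(event_time, size), []).append(event_time)
    by_processing_time.setdefault(window_start(arrival_time, size), []).append(event_time)

print(by_event_time)       # {0: [2, 4], 10: [11]}
print(by_processing_time)  # {0: [2], 10: [4, 11]}
```

Under processing time, the event that occurred at second 4 is counted in the 10-20 window only because it arrived at second 13.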

2. Watermark Too Strict or Too Loose

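If the watermark is too strict (small allowed out-of-orderness), many events count as late and are dropped. If it is too loose (large allowed out-of-orderness), windows fire later and state stays in memory longer. Tune it based on the out-of-orderness you actually observe in your data.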
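
One way to reason about the trade-off is to replay a sample of recorded event times against several candidate bounds and count how many events would have been late. The sketch below does this for a made-up arrival sequence; evaluate_lateness is a hypothetical helper.

```python
def evaluate_lateness(event_times, max_lateness):
    """Count events that arrive behind the watermark for a given bound.

    The watermark trails the largest event time seen so far by
    `max_lateness`; an event older than the watermark counts as late.
    """
    max_seen = float("-inf")
    late = 0
    for event_time in event_times:
        max_seen = max(max_seen, event_time)
        if event_time < max_seen - max_lateness:
            late += 1
    return late


# Event times in arrival order (hypothetical sample with some disorder)
arrivals = [10, 12, 8, 14, 18, 11, 25, 22, 30, 19]

for bound in (2, 5, 10):
    print(f"max_lateness={bound}: late={evaluate_lateness(arrivals, bound)}")
# max_lateness=2: late=4
# max_lateness=5: late=2
# max_lateness=10: late=1
```

A larger bound loses fewer events, but every window then waits that much longer in event time before it fires.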

3. Exactly-Once Only at Processor Level

Exactly-once within Flink doesn’t automatically mean end-to-end exactly-once. The source (offset commits) and sink (two-phase commit) must also participate.

4. State Without Backups

Stateful operators without checkpointing lose all state on failure. Always configure a state backend with regular checkpointing.

5. Ignoring Backpressure

When a downstream sink is slow, backpressure propagates upstream and throttles the whole pipeline. Monitor Kafka consumer lag and Flink's backPressuredTimeMsPerSecond and busyTimeMsPerSecond task metrics.
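
The effect can be illustrated with a toy model of a bounded buffer between a fast producer and a slow consumer. The function and its parameters are hypothetical and the rates are invented; real engines apply backpressure through network buffers rather than a simple counter.

```python
def simulate_backpressure(ticks, produce_rate, consume_rate, buffer_size):
    """Track buffer fill and producer throttling per tick.

    Each tick the producer tries to add `produce_rate` events and the
    consumer removes up to `consume_rate`. When the bounded buffer is
    full, the producer must hold back the remainder (backpressure).
    """
    buffered = 0
    history = []
    for tick in range(ticks):
        accepted = min(produce_rate, buffer_size - buffered)
        blocked = produce_rate - accepted  # Events the producer had to hold back
        buffered += accepted
        buffered -= min(consume_rate, buffered)
        history.append((tick, buffered, blocked))
    return history


history = simulate_backpressure(ticks=6, produce_rate=10, consume_rate=6, buffer_size=20)
for tick, buffered, blocked in history:
    print(f"tick={tick} buffered={buffered} blocked={blocked}")
# tick=0 buffered=4 blocked=0
# tick=1 buffered=8 blocked=0
# tick=2 buffered=12 blocked=0
# tick=3 buffered=14 blocked=2
# tick=4 buffered=14 blocked=4
# tick=5 buffered=14 blocked=4
```

Once the buffer fills, the producer is throttled to the consumer's rate; in a Kafka-backed pipeline the held-back events show up as growing consumer lag.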

Practice Questions

  1. What is the difference between event time and processing time?
     Answer: Event time is when the event occurred (embedded in the data). Processing time is when the system processes it.

  2. What is a watermark and why is it needed?
     Answer: A watermark is a heuristic that tracks the progress of event time. It lets the system trigger a window computation once it believes all events for that window have arrived.

  3. How does exactly-once semantics work in stream processing?
     Answer: Through checkpointed state snapshots, idempotent sinks, and transactional writes that commit atomically with offset advancement.

  4. What is the difference between stateful and stateless stream processing?
     Answer: Stateless operators process each event independently. Stateful operators maintain and query state across events (counters, windows, joins).

  5. Challenge: Design a stream processing pipeline for a real-time fraud detection system that must handle late-arriving transaction data with exactly-once guarantees.

Mini Project: Streaming Pipeline Simulator

# streaming_pipeline.py
# Simulate a complete streaming pipeline with time semantics
from collections import defaultdict
import random
import time

class StreamingPipeline:
    def __init__(self, allowed_lateness_seconds=5):
        self.watermark = WatermarkGenerator(allowed_lateness_seconds)
        self.windows = defaultdict(list)
        self.results = {}
        self.metrics = {"total": 0, "late": 0, "on_time": 0, "windows_fired": 0}

    def process(self, event):
        et = event["event_time"]
        wm = self.watermark.observe(et)
        window_key = (et // 10) * 10
        self.windows[window_key].append(event)
        self.metrics["total"] += 1

        # An event is late once its timestamp falls behind the watermark
        if et >= wm:
            self.metrics["on_time"] += 1
        else:
            self.metrics["late"] += 1

        # Fire completed windows
        for wk in list(self.windows.keys()):
            if wm >= wk + 10 and wk not in self.results:
                values = [e["value"] for e in self.windows[wk]]
                avg = sum(values) / len(values) if values else 0
                self.results[wk] = {"count": len(values), "avg": round(avg, 2)}
                self.metrics["windows_fired"] += 1

    def report(self):
        print(f"\nPipeline Metrics:")
        for k, v in self.metrics.items():
            print(f"  {k}: {v}")
        print(f"  Fired windows: {sorted(self.results.keys())}")

class WatermarkGenerator:
    def __init__(self, max_lateness_seconds):
        self.max_lateness = max_lateness_seconds
        self.max_event_time = 0
        self.current_watermark = 0

    def observe(self, event_time):
        self.max_event_time = max(self.max_event_time, event_time)
        self.current_watermark = self.max_event_time - self.max_lateness
        return self.current_watermark

pipeline = StreamingPipeline(allowed_lateness_seconds=3)
now = int(time.time())
for i in range(30):
    delay = random.randint(-5, 2)  # Jitter event times so some fall behind the watermark
    event = {"event_time": now + i + delay, "value": random.randint(1, 100)}
    pipeline.process(event)

pipeline.report()

Expected output (approximate):

Pipeline Metrics:
  total: 30
  late: 4
  on_time: 26
  windows_fired: 3
  Fired windows: [...]

Built by the developers of Doda Browser, DodaZIP, and Durga Antivirus Pro.
