Apache Storm — Real-Time Stream Processing with Topologies
Apache Storm is a distributed real-time computation system that processes unbounded streams of data using a topology of spouts and bolts, offering at-least-once and exactly-once (via Trident) processing guarantees with sub-second latency.
What You’ll Learn
In this tutorial, you’ll master Apache Storm’s architecture: topologies, spouts, bolts, stream groupings, Trident for exactly-once semantics, reliability through acking, and how Storm compares with modern stream processors like Apache Flink and Spark Streaming.
Why It Matters
Storm, created by Nathan Marz at BackType and open-sourced by Twitter in 2011, was one of the first widely adopted real-time stream processors; at Twitter it processed over 1 billion events daily. Newer systems have since emerged, but Storm’s topology model, fine-grained per-tuple reliability, and sub-second latency keep it relevant for low-latency use cases such as real-time analytics, fraud detection, and network monitoring.
Real-World Use
Twitter used Storm for real-time search indexing and trending topics before replacing it with Heron, its API-compatible successor. Yahoo! has run Storm for real-time content personalization. The Weather Company processes 60TB/hour of weather data through Storm topologies for hyperlocal forecasting.
```mermaid
flowchart TD
    subgraph Topology[Storm Topology]
        A[Kafka Spout] --> B[Parse Bolt]
        B --> C[Filter Bolt]
        C --> D[Aggregate Bolt]
        D --> E[Report Bolt]
        B --> F[Count Bolt]
    end
    subgraph Cluster[Storm Cluster]
        G[Nimbus<br/>Master]
        H[Supervisor]
        I[Supervisor]
        J[ZooKeeper]
    end
    Topology -->|"Submitted to"| G
    G -->|"Assign"| H
    G -->|"Assign"| I
    G -.- J
    H -.- J
    I -.- J
```
Storm Topology Model
A topology is a directed acyclic graph of spouts and bolts connected by streams.
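To make the DAG definition concrete, here is a minimal sketch in plain Python (not Storm’s API) that wires the components from the diagram as a dictionary mapping each component to the components it subscribes to, then uses the standard library’s graphlib (Python 3.9+) to derive an upstream-before-downstream order and to reject a cycle. The component names are hypothetical, and the cycle check illustrates the DAG property only; it is not a claim about what Storm’s own TopologyBuilder validates.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical wiring mirroring the diagram: component -> upstream inputs.
topology = {
    "kafka_spout": set(),
    "parse_bolt": {"kafka_spout"},
    "filter_bolt": {"parse_bolt"},
    "count_bolt": {"parse_bolt"},
    "aggregate_bolt": {"filter_bolt"},
    "report_bolt": {"aggregate_bolt"},
}


def processing_order(graph):
    """Return an upstream-before-downstream order; raise CycleError if cyclic."""
    return list(TopologicalSorter(graph).static_order())


order = processing_order(topology)
print(order[0])  # kafka_spout: the only component with no inputs

# Feeding report_bolt back into parse_bolt would break the DAG property.
cyclic = dict(topology, parse_bolt={"kafka_spout", "report_bolt"})
try:
    processing_order(cyclic)
    rejected = False
except CycleError:
    rejected = True
print(rejected)  # True
```

Representing each component by its inputs matches how bolts declare what they subscribe to, which is why the mapping points upstream rather than downstream.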
Spouts
Spouts are sources of data streams. They read from external systems (Kafka, Kinesis, file tail, etc.) and emit tuples into the topology.
```python
# Simulating a Storm spout in Python
from collections import deque


class KafkaSpout:
    """A simulated Kafka spout that reads from a topic."""

    def __init__(self, topic):
        self.topic = topic
        self._queue = deque()   # (tuple_id or None, message) awaiting emission
        self._pending = {}      # tuple_id -> message: emitted, not yet acked
        self._next_id = 0       # counter for unique, deterministic tuple IDs

    def add_messages(self, messages):
        """Simulate messages arriving from Kafka (no tuple ID assigned yet)."""
        self._queue.extend((None, msg) for msg in messages)

    def next_tuple(self):
        """Emit the next tuple (called by the Storm runtime)."""
        if not self._queue:
            return None
        tuple_id, msg = self._queue.popleft()
        if tuple_id is None:  # first emission: assign a fresh ID
            tuple_id = f"{self.topic}_{self._next_id}"
            self._next_id += 1
        self._pending[tuple_id] = msg
        return {"id": tuple_id, "values": [msg["key"], msg["value"]]}

    def ack(self, tuple_id):
        """Acknowledge successful processing: forget the message."""
        self._pending.pop(tuple_id, None)

    def fail(self, tuple_id):
        """Replay on failure: re-queue the message at the front, same ID."""
        msg = self._pending.pop(tuple_id, None)
        if msg is not None:
            self._queue.appendleft((tuple_id, msg))
            print(f"[Spout] Replaying: {tuple_id}")


# Simulate
spout = KafkaSpout("clickstream")
spout.add_messages([
    {"key": "user_1", "value": "/home"},
    {"key": "user_2", "value": "/about"},
])
for _ in range(3):
    t = spout.next_tuple()
    if t:
        print(f"Emitted: {t['values']}")
        spout.fail(t["id"])  # Simulate a failure on every attempt
```
Expected output:
```text
Emitted: ['user_1', '/home']
[Spout] Replaying: clickstream_0
Emitted: ['user_1', '/home']
[Spout] Replaying: clickstream_0
Emitted: ['user_1', '/home']
[Spout] Replaying: clickstream_0
```
Bolts
Bolts process tuples and emit new ones. They can filter, transform, aggregate, join, or talk to external systems.
```python
class FilterBolt:
    """A bolt that forwards tuples whose field exceeds a threshold."""

    def __init__(self, field, threshold):
        self.field = field          # index into the tuple's values
        self.threshold = threshold
        self.tuples_processed = 0

    def execute(self, tuple_data):
        """Process a single tuple."""
        self.tuples_processed += 1
        value = tuple_data["values"][self.field]
        if value > self.threshold:
            return {"values": tuple_data["values"], "action": "forward"}
        return {"values": None, "action": "filtered"}

    def stats(self):
        return {"processed": self.tuples_processed}


class CountBolt:
    """A bolt that counts occurrences per key (the tuple's first field)."""

    def __init__(self):
        self._counts = {}

    def execute(self, tuple_data):
        key = tuple_data["values"][0]
        self._counts[key] = self._counts.get(key, 0) + 1
        return {"values": [key, self._counts[key]], "action": "emit"}

    def stats(self):
        return dict(self._counts)


# Simulate topology execution: FilterBolt -> CountBolt
filter_bolt = FilterBolt(1, 50)  # keep tuples whose values[1] > 50
count_bolt = CountBolt()
tuples = [
    {"values": ["page_a", 30]},
    {"values": ["page_b", 75]},
    {"values": ["page_c", 100]},
    {"values": ["page_a", 20]},
    {"values": ["page_b", 80]},
]
for t in tuples:
    result = filter_bolt.execute(t)
    if result["action"] == "forward":
        count_result = count_bolt.execute(t)
        print(f"Count: {count_result['values']}")
    else:
        print(f"Filtered: {t['values']}")
```
Expected output:
```text
Filtered: ['page_a', 30]
Count: ['page_b', 1]
Count: ['page_c', 1]
Filtered: ['page_a', 20]
Count: ['page_b', 2]
```
Stream Groupings
Groupings define how tuples are routed between bolts.
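| Grouping | Behavior | Use Case |
|---|---|---|
| Shuffle | Random, evenly balanced distribution across tasks | Load balancing |
| Fields | Hash of the grouping field(s); equal values always reach the same task | Count by key |
| All | Every tuple is replicated to all tasks | Configuration updates |
| Global | All tuples go to the bolt’s lowest-ID task | Single-task aggregation or ordering |
| Local or shuffle | Prefer tasks in the same worker process; otherwise shuffle | Reduce network I/O |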
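Beyond fields and shuffle grouping, the other rows of the table can also be modeled as small routing functions that return the target task IDs. This is a sketch under stated assumptions: a downstream bolt with four tasks spread over two hypothetical workers, and zlib.crc32 as a stand-in deterministic hash (not necessarily the hash Storm itself uses).

```python
import random
import zlib

# Hypothetical layout: 4 downstream tasks spread over 2 worker processes.
TASKS = [0, 1, 2, 3]
WORKER_OF = {0: "w1", 1: "w1", 2: "w2", 3: "w2"}  # task id -> worker


def fields_grouping(key, tasks=TASKS):
    """Deterministic hash of the key: the same key always hits the same task."""
    return [tasks[zlib.crc32(key.encode("utf-8")) % len(tasks)]]


def all_grouping(tasks=TASKS):
    """Replicate the tuple to every task."""
    return list(tasks)


def global_grouping(tasks=TASKS):
    """Send every tuple to the task with the lowest ID."""
    return [min(tasks)]


def local_or_shuffle_grouping(sender_worker, tasks=TASKS, rng=random):
    """Prefer tasks in the sender's worker; fall back to any task."""
    local = [t for t in tasks if WORKER_OF[t] == sender_worker]
    return [rng.choice(local or tasks)]


print(all_grouping())                    # [0, 1, 2, 3]
print(global_grouping())                 # [0]
print(local_or_shuffle_grouping("w2"))   # [2] or [3]
print(fields_grouping("alice") == fields_grouping("alice"))  # True
```

A deterministic hash matters because every upstream task must agree on where a key goes; Python’s built-in str hash is randomized per process, so it would not be safe across real workers.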
```python
import random


class StreamGrouping:
    @staticmethod
    def fields_grouping(tuple_data, grouping_field_idx=0, num_tasks=4):
        """Route a tuple by hashing one field, so equal keys hit the same task."""
        field_value = tuple_data["values"][grouping_field_idx]
        # Python randomizes str hashes per process (PYTHONHASHSEED), so this
        # mapping is stable within one run but can differ between runs.
        return hash(field_value) % num_tasks

    @staticmethod
    def shuffle_grouping(tuple_data, num_tasks=4):
        """Random distribution across tasks."""
        return random.randint(0, num_tasks - 1)


# Demonstrate fields grouping
print("Fields grouping by user_id:")
test_tuples = [
    {"values": ["alice", "click"]},
    {"values": ["bob", "click"]},
    {"values": ["alice", "view"]},
    {"values": ["charlie", "click"]},
]
for t in test_tuples:
    task = StreamGrouping.fields_grouping(t, 0, 4)
    print(f"  {t['values'][0]:<10} -> Task {task}")

print("\nShuffle grouping:")
for t in test_tuples:
    task = StreamGrouping.shuffle_grouping(t, 4)
    print(f"  {t['values'][0]:<10} -> Task {task}")
```
Sample output (task numbers vary between runs; under fields grouping both alice tuples always land on the same task):
```text
Fields grouping by user_id:
  alice      -> Task 2
  bob        -> Task 0
  alice      -> Task 2
  charlie    -> Task 2

Shuffle grouping:
  alice      -> Task 3
  bob        -> Task 1
  alice      -> Task 2
  charlie    -> Task 3
```
Reliability: The Acker Mechanism
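Storm guarantees at-least-once processing through a distributed acker mechanism. Every tuple gets a random 64-bit ID, and for each spout tuple an acker task keeps one 64-bit value: the XOR of the IDs of every tuple created in that tuple’s tree and of every tuple acked. Each ID is XORed in twice, once on creation and once on ack, so the value returns to zero when the whole tree has been processed; the acker then notifies the spout, whose ack() method is called. If any tuple in the tree is explicitly failed, or the tree is not completed within the message timeout, the spout’s fail() method is called and the spout can replay the tuple.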
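The XOR bookkeeping can be sketched directly. In this minimal model (hypothetical class and method names, not Storm’s internals), every tuple gets a random 64-bit ID, and the acker XORs that ID into a per-spout-tuple value once when the tuple is created and once when it is acked. Since x ^ x == 0, the value returns to zero only when every tuple in the tree has been acked; an accidental zero is possible in principle but on the order of 1 in 2^64.

```python
import random


class XorAcker:
    """Hypothetical sketch of XOR-based tree tracking, one int per spout tuple."""

    def __init__(self):
        self.ack_vals = {}  # spout message id -> running XOR of tuple IDs

    def update(self, msg_id, tuple_id):
        """XOR one tuple ID in: call once when it is created, once when acked."""
        self.ack_vals[msg_id] = self.ack_vals.get(msg_id, 0) ^ tuple_id

    def is_complete(self, msg_id):
        return self.ack_vals.get(msg_id) == 0


rng = random.Random(42)  # seeded so the run is reproducible


def new_id():
    """Random 64-bit tuple ID."""
    return rng.getrandbits(64)


acker = XorAcker()
root = new_id()
acker.update("msg-1", root)        # spout emits the root tuple
child_a, child_b = new_id(), new_id()
acker.update("msg-1", child_a)     # bolt 1 emits two tuples anchored to the root
acker.update("msg-1", child_b)
acker.update("msg-1", root)        # bolt 1 acks the root
print(acker.is_complete("msg-1"))  # False: child_a ^ child_b is still outstanding
acker.update("msg-1", child_a)     # bolt 2 acks both children
acker.update("msg-1", child_b)
print(acker.is_complete("msg-1"))  # True: every ID was XORed in exactly twice
```

The appeal of this design is that the state per spout tuple stays a single fixed-size integer however large the tuple tree grows.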
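```python
class Acker:
    """Simulate Storm's acker by tracking un-acked tuples per spout tuple.

    Real Storm compresses each set into one 64-bit XOR value; a set is
    easier to read but records the same thing: what is still pending.
    """

    def __init__(self):
        self._trees = {}  # root_id -> set of pending (un-acked) tuple IDs

    def root_tuple(self, root_id):
        """Register a spout tuple; the root itself is pending until acked."""
        self._trees[root_id] = {root_id}

    def anchor(self, root_id, tuple_id):
        """A bolt emitted a new tuple anchored to this tree."""
        if root_id in self._trees:
            self._trees[root_id].add(tuple_id)

    def ack_tuple(self, root_id, tuple_id):
        """Acknowledge one tuple in the tree."""
        if root_id in self._trees:
            self._trees[root_id].discard(tuple_id)

    def is_complete(self, root_id):
        """The tree is complete once nothing in it is still pending."""
        return root_id in self._trees and not self._trees[root_id]

    def fail_tuple(self, root_id):
        """On timeout or explicit fail: drop the tree so the spout can replay."""
        if root_id in self._trees:
            del self._trees[root_id]
            print(f"[Acker] Root {root_id} FAILED, will replay")


acker = Acker()
acker.root_tuple("root_1")
acker.anchor("root_1", "bolt_a_1")     # bolt A emits a child anchored to root_1
acker.ack_tuple("root_1", "root_1")    # bolt A acks its input
print(f"Complete: {acker.is_complete('root_1')}")
acker.ack_tuple("root_1", "bolt_a_1")  # bolt B acks the child
print(f"Complete: {acker.is_complete('root_1')}")
acker.root_tuple("root_2")
acker.fail_tuple("root_2")
print(f"After fail: {'root_2' in acker._trees}")
```
Expected output:
```text
Complete: False
Complete: True
[Acker] Root root_2 FAILED, will replay
After fail: False
```
Trident: Exactly-Once Semantics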
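Trident is a high-level abstraction over Storm that processes streams in micro-batches and provides exactly-once state updates: each batch is assigned a transaction ID (txid), state updates are committed in txid order, and transactional state stores the txid alongside each value so that a replayed batch is not applied twice.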
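The txid check is what turns replay into exactly-once state updates. The sketch below (hypothetical function names) applies one batch twice, as would happen if it failed after the state write and was replayed, to a naive counter and to a transactional one that stores (txid, value) per key. It assumes a transactional spout, i.e. a replayed batch always carries the same txid and the same tuples; Trident’s opaque-transactional state relaxes that by also storing the previous value, which this sketch does not model.

```python
def apply_naive(state, batch):
    """Add a batch's counts to state with no replay protection."""
    for key, value in batch:
        state[key] = state.get(key, 0) + value


def apply_transactional(state, txid, batch):
    """Store (txid, value) per key; skip keys this txid already updated."""
    totals = {}
    for key, value in batch:
        totals[key] = totals.get(key, 0) + value
    for key, value in totals.items():
        stored_txid, stored_value = state.get(key, (None, 0))
        if stored_txid == txid:
            continue  # replayed batch: already counted
        state[key] = (txid, stored_value + value)


batch_1 = [("page_a", 10), ("page_b", 20)]
naive, txn = {}, {}

# Batch 1 is applied, then replayed after a simulated failure.
for _ in range(2):
    apply_naive(naive, batch_1)
    apply_transactional(txn, 1, batch_1)

print(naive)  # {'page_a': 20, 'page_b': 40}  (double counted)
print(txn)    # {'page_a': (1, 10), 'page_b': (1, 20)}

apply_transactional(txn, 2, [("page_a", 5)])  # the next batch applies normally
print(txn["page_a"])  # (2, 15)
```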
```python
class TridentTopology:
    """Simulate Trident's micro-batching with transactional state updates."""

    def __init__(self, batch_size=3):
        self.batch_size = batch_size
        self._batches = []
        self._state = {}   # key -> running total
        self._txids = {}   # key -> txid of the last batch applied to that key

    def add_stream(self, tuples):
        """Split the stream into micro-batches; the list index is the txid."""
        for i in range(0, len(tuples), self.batch_size):
            self._batches.append(tuples[i:i + self.batch_size])

    def process_batches(self):
        """Process each batch once, in txid order."""
        for txid, batch in enumerate(self._batches):
            self.process_batch(txid, batch)

    def process_batch(self, txid, batch):
        """Aggregate one batch, then apply it to state transactionally."""
        batch_result = {}
        for key, value in batch:
            batch_result[key] = batch_result.get(key, 0) + value
        for key, value in batch_result.items():
            # A replayed batch (same txid, same tuples) must not count twice.
            if self._txids.get(key) == txid:
                continue
            self._state[key] = self._state.get(key, 0) + value
            self._txids[key] = txid
        print(f"[Trident] Batch {txid}: {batch_result}, "
              f"state: {dict(self._state)}")

    def query(self, key):
        return self._state.get(key, 0)


# Simulate Trident processing
trident = TridentTopology(batch_size=2)
trident.add_stream([
    ("page_a", 10), ("page_b", 20),
    ("page_a", 5), ("page_c", 15),
    ("page_b", 10), ("page_b", 10),
])
trident.process_batches()
print(f"\nFinal page_a count: {trident.query('page_a')}")
```
Expected output:
```text
[Trident] Batch 0: {'page_a': 10, 'page_b': 20}, state: {'page_a': 10, 'page_b': 20}
[Trident] Batch 1: {'page_a': 5, 'page_c': 15}, state: {'page_a': 15, 'page_b': 20, 'page_c': 15}
[Trident] Batch 2: {'page_b': 20}, state: {'page_a': 15, 'page_b': 40, 'page_c': 15}

Final page_a count: 15
```
Storm vs Flink vs Spark Streaming
```mermaid
flowchart LR
    subgraph Storm[Apache Storm]
        A1[Topology<br/>Spouts + Bolts]
        A2[At-least-once<br/>Trident: exactly-once]
        A3[~100ms latency]
    end
    subgraph Flink[Apache Flink]
        B1[Dataflow<br/>Operators]
        B2[Exactly-once<br/>Checkpoints]
        B3[~10ms latency]
    end
    subgraph Spark[Spark Streaming]
        C1[Micro-batches<br/>DStreams]
        C2[Exactly-once<br/>WAL]
        C3[~1s latency]
    end
```
| Feature | Storm | Flink | Spark Streaming |
|---|---|---|---|
| Model | Topology (DAG) | Dataflow graph | Micro-batch (DStream) |
| Latency | Sub-second (~100ms) | Real-time (~10ms) | Seconds (~1s+) |
| Processing | Per-tuple | Per-event | Micro-batch |
| Exactly-once | Trident | Native | Structured Streaming |
| State | Trident state, stateful bolts (1.0+), or external store | Managed keyed state | Managed state |
| Backpressure | Automatic (1.0+); max spout pending for throttling | Auto | Auto (rate control) |
| Windowing | Windowed bolts (1.0+) | Built-in (event/processing time) | Built-in |
| Ecosystem | Mature, many connectors | Growing, broad connectors | Rich (Spark ML/Graph) |
```python
import time


class StreamingComparison:
    """Model an engine by rough latency and throughput figures."""

    def __init__(self, name, latency_ms, throughput_per_sec):
        self.name = name
        self.latency_ms = latency_ms
        self.throughput = throughput_per_sec

    def simulate(self, events):
        """Process events one at a time, sleeping latency_ms per event."""
        start = time.perf_counter()
        processed = 0
        for _ in events:
            time.sleep(self.latency_ms / 1000)
            processed += 1
            if processed >= self.throughput:
                break
        elapsed = time.perf_counter() - start
        return {
            "engine": self.name,
            "processed": processed,
            "elapsed_s": round(elapsed, 3),
        }


# Compare (theoretical, not actual benchmarks)
print("Theoretical throughput per second:")
comparisons = [
    StreamingComparison("Storm", 100, 10000),
    StreamingComparison("Flink", 10, 15000),
    StreamingComparison("Spark Streaming", 1000, 50000),
]
for c in comparisons:
    print(f"  {c.name:<20} ~{c.throughput} events/sec @ ~{c.latency_ms}ms latency")
```
Common Mistakes
1. Not Tuning Parallelism
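Every spout and bolt defaults to a parallelism hint of 1, meaning a single executor thread, so nothing in that component runs in parallel. Pass a parallelism hint sized to your data volume as the third argument to TopologyBuilder.setSpout() and setBolt(), and raise the number of worker processes with Config.setNumWorkers().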
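To see why the default matters, this simplified model (a hypothetical helper, not Storm’s actual scheduler) spreads each component’s executors round-robin across worker slots. With a hint of 1 per component, each spout or bolt is one executor, so extra workers receive nothing no matter how many are configured.

```python
from collections import defaultdict


def assign_executors(hints, num_workers):
    """Round-robin each component's executors over worker slots.

    hints: component name -> parallelism hint (number of executors).
    """
    plan = defaultdict(list)
    slot = 0
    for name, hint in hints.items():
        for i in range(hint):
            plan[f"worker-{slot % num_workers}"].append(f"{name}[{i}]")
            slot += 1
    return dict(plan)


# Hints of 1: only two executors exist, so two of the four workers stay idle.
print(assign_executors({"kafka_spout": 1, "parse_bolt": 1}, num_workers=4))
# {'worker-0': ['kafka_spout[0]'], 'worker-1': ['parse_bolt[0]']}

# Hints sized to the load spread the work across both workers.
tuned = assign_executors({"kafka_spout": 2, "parse_bolt": 4}, num_workers=2)
print(tuned["worker-0"])  # ['kafka_spout[0]', 'parse_bolt[0]', 'parse_bolt[2]']
```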
2. Ignoring Tuple Timeouts
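The default tuple timeout (topology.message.timeout.secs) is 30 seconds. A tuple tree that legitimately takes longer is failed and replayed while still in flight, producing duplicate work, so increase the timeout for slow bolts; for low-latency pipelines, decrease it to detect failures faster.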
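Timeout handling can be modeled as a table of emit times that is swept periodically. This is a hypothetical sketch with an injected clock (the now argument) so it runs deterministically; in Storm, the spout executor tracks pending tuples itself and calls fail() on expiry. It shows the hazard described above: any tree still open past the timeout is expired and replayed, whether or not it would eventually have succeeded.

```python
class TimeoutTracker:
    """Hypothetical sketch: expire spout tuples whose trees outlive a timeout."""

    def __init__(self, timeout_secs=30.0):  # 30 s mirrors Storm's default
        self.timeout_secs = timeout_secs
        self._emitted_at = {}  # tuple_id -> emit time in seconds

    def emitted(self, tuple_id, now):
        self._emitted_at[tuple_id] = now

    def acked(self, tuple_id):
        self._emitted_at.pop(tuple_id, None)

    def expire(self, now):
        """Return IDs that timed out; a spout would call fail() on each."""
        expired = [tid for tid, t in self._emitted_at.items()
                   if now - t >= self.timeout_secs]
        for tid in expired:
            del self._emitted_at[tid]
        return expired


tracker = TimeoutTracker(timeout_secs=30.0)
tracker.emitted("t1", now=0.0)
tracker.emitted("t2", now=5.0)
tracker.acked("t2")              # t2's tree finished in time
print(tracker.expire(now=29.0))  # []
print(tracker.expire(now=31.0))  # ['t1'], which would be replayed
```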
3. Heavy Processing in Spouts
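Spouts should only read and emit; heavy computation belongs in bolts. Storm calls a spout’s nextTuple(), ack(), and fail() on the same thread, so a blocking call in nextTuple() also delays acks and failures and slows the entire topology.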
4. Using Trident for True Real-Time
Trident’s micro-batching adds latency, often pushing end-to-end latency toward seconds. For sub-second processing, use core Storm with at-least-once semantics.
5. Not Handling Backpressure
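When bolts can’t keep up and nothing throttles the spout, it keeps emitting: queues grow, tuples exceed the message timeout, and they are failed and replayed, adding still more load. Cap the number of un-acked tuples per spout task with topology.max.spout.pending (Config.setMaxSpoutPending() in Java); Storm 1.0 and later also apply automatic backpressure.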
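The effect of a max-spout-pending cap can be sketched with a hypothetical spout that refuses to emit while too many tuples are un-acked. In Storm, the spout executor enforces topology.max.spout.pending by not calling nextTuple(); placing the check inside the spout here only keeps the sketch short.

```python
from collections import deque


class ThrottledSpout:
    """Hypothetical sketch of max-spout-pending throttling."""

    def __init__(self, messages, max_pending):
        self._queue = deque(messages)
        self.max_pending = max_pending
        self.pending = set()  # emitted but not yet acked

    def next_tuple(self):
        """Emit only while fewer than max_pending tuples are un-acked."""
        if len(self.pending) >= self.max_pending or not self._queue:
            return None
        msg = self._queue.popleft()
        self.pending.add(msg)
        return msg

    def ack(self, msg):
        self.pending.discard(msg)


spout = ThrottledSpout(["m1", "m2", "m3", "m4"], max_pending=2)
emitted = [spout.next_tuple() for _ in range(4)]
print(emitted)             # ['m1', 'm2', None, None]: throttled at 2 pending
spout.ack("m1")
print(spout.next_tuple())  # m3: the ack freed a slot
```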
Practice Questions
What is the difference between a spout and a bolt in Storm? Spouts are data sources (read from external systems). Bolts are processors (transform, filter, aggregate, or sink data).
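How does Storm guarantee at-least-once processing? Through the acker mechanism. For each spout tuple, an acker keeps a single 64-bit value: the XOR of the IDs of every tuple created in its tree and every tuple acked. When that value returns to zero, the tree is complete and the spout’s ack() is called. On an explicit failure or a timeout, the spout’s fail() is called and the spout replays the tuple.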
What problem does Trident solve compared to core Storm? Trident provides exactly-once semantics, stateful processing with micro-batching, and a higher-level API at the cost of increased latency.
How does Storm’s processing model differ from Spark Streaming? Storm processes tuples individually (per-tuple) with sub-second latency. Spark Streaming processes micro-batches with higher throughput but second-level latency.
Challenge: Design a Storm topology for real-time fraud detection that reads from Kafka, enriches with user profiles, runs scoring rules, and emits alerts — with at-least-once guarantees.
Mini Project: Storm Topology Simulator
```python
# storm_topology.py
# Simulate a complete Storm topology with spout, bolt chain, and acker.
# Assumes KafkaSpout, FilterBolt, CountBolt and Acker from the sections
# above are defined in the same file.


class StormTopology:
    def __init__(self):
        self.spout = None
        self.bolts = []
        self.acker = Acker()

    def set_spout(self, spout):
        self.spout = spout

    def add_bolt(self, bolt):
        self.bolts.append(bolt)

    def run(self, iterations=10):
        print("=== Storm Topology Run ===\n")
        for _ in range(iterations):
            # Spout emits; None means there is nothing left to read
            t = self.spout.next_tuple()
            if not t:
                break
            root_id = t["id"]
            self.acker.root_tuple(root_id)
            # Process through the bolt chain on a copy of the spout tuple
            data = {"id": root_id, "values": list(t["values"])}
            for bolt in self.bolts:
                result = bolt.execute(data)
                if result["values"] is None:
                    print(f"Tuple {root_id} filtered")
                    break  # Stop the chain; the tuple is still acked below
                data["values"] = result["values"]
            else:
                print(f"✅ Tuple {root_id} processed successfully: {data['values']}")
            # Ack filtered tuples too: un-acked trees time out and get replayed
            self.acker.ack_tuple(root_id, root_id)
            if self.acker.is_complete(root_id):
                self.spout.ack(root_id)
                self.acker._trees.pop(root_id, None)  # forget finished trees
        print(f"\nSpout pending: {len(self.spout._pending)}")
        print(f"Acker trees: {len(self.acker._trees)}")


# Run topology
topology = StormTopology()
spout = KafkaSpout("events")
spout.add_messages([
    {"key": "a", "value": 10},
    {"key": "b", "value": 50},
    {"key": "c", "value": 100},
])
topology.set_spout(spout)
topology.add_bolt(FilterBolt(1, 15))
topology.add_bolt(CountBolt())
topology.run(5)
```
What’s Next
Built by the developers of Doda Browser, DodaZIP, and Durga Antivirus Pro.