Event-Driven Architecture — Event Sourcing, CQRS, and Pub/Sub with Kafka Examples

DodaTech Updated Jun 15, 2026 6 min read

Event-driven architecture (EDA) is a software design pattern where components communicate by producing and consuming events — immutable records of something that happened — enabling loose coupling, auditability, and real-time processing.

Why Event-Driven Architecture Matters

Traditional request-response architectures couple services tightly. When a user places an order, the order service must call the inventory, payment, shipping, and notification services in turn, and if any call fails, the order fails. In EDA, the order service simply publishes an OrderPlaced event, and any interested service subscribes and reacts independently. This decoupling makes systems more resilient, scalable, and auditable. It also holds up at scale: LinkedIn has reported processing trillions of messages per day with Kafka. Event sourcing adds a complete audit trail, because every state change is kept as an immutable record.

Plain-Language Explanation

Imagine a busy airport. When a flight lands, it doesn’t call each passenger individually. It simply announces “Flight 123 has landed” over the PA system. Passengers (consumers) who care about that flight react — they walk to the gate, call a taxi, or let family know. If a new service opens (a taxi stand), it just listens to arrivals — no changes needed to the flight system.

The PA announcement is an event. The flight is a producer. Passengers are consumers. The PA system itself is the event bus. This is the pub/sub (publish/subscribe) pattern at the heart of EDA.


graph LR
    subgraph "Event Producers"
        OS[Order Service]
        PS[Payment Service]
        SS[Shipping Service]
    end
    OS --> |OrderPlaced| EB["Event Bus<br/>Kafka / EventBridge"]
    PS --> |PaymentReceived| EB
    SS --> |Shipped| EB
    EB --> |OrderPlaced| INV[Inventory]
    EB --> |OrderPlaced| NOTIF[Notification]
    EB --> |OrderPlaced| ANALYTICS[Analytics]
    EB --> |PaymentReceived| ORDER[Order Service]
    style EB fill:#e67e22,color:#fff
    style OS fill:#3498db,color:#fff
    style PS fill:#27ae60,color:#fff
    style SS fill:#9b59b6,color:#fff

Event Sourcing

Instead of storing the current state, store every state change as an event. The current state is derived by replaying all events.

Benefits:

  • Complete audit trail — every change is recorded
  • Temporal queries — what did the system look like at any point in time?
  • Debugging power — replay events to reproduce bugs
  • Naturally feeds event-driven systems

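Tradeoff: The event store grows without bound, so rebuilding state from the first event gets slower over time. Periodic snapshots (a saved state plus the position in the log it corresponds to) keep reconstruction fast.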

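import datetime

# Event store: an append-only, in-memory log
event_store = []

def apply_event(event: dict):
    # Stamp the event with the current UTC time (second precision), then append it
    now = datetime.datetime.now(datetime.timezone.utc)
    event["timestamp"] = now.strftime("%Y-%m-%dT%H:%M:%S")
    event_store.append(event)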

def get_account_balance(account_id: str) -> int:
    balance = 0
    for event in event_store:
        if event.get("account_id") == account_id:
            if event["type"] == "account_created":
                balance = event["initial_balance"]
            elif event["type"] == "money_deposited":
                balance += event["amount"]
            elif event["type"] == "money_withdrawn":
                balance -= event["amount"]
    return balance

# Simulate account operations
apply_event({"type": "account_created", "account_id": "ACC-001", "initial_balance": 1000})
apply_event({"type": "money_deposited", "account_id": "ACC-001", "amount": 500})
apply_event({"type": "money_withdrawn", "account_id": "ACC-001", "amount": 200})

print(f"Balance: {get_account_balance('ACC-001')}")
print("Event log:")
for e in event_store:
    print(f"  {e['timestamp']}: {e['type']} ${e.get('amount', e.get('initial_balance', ''))}")

Expected output (timestamps are illustrative and will reflect when you run the script):

Balance: 1300
Event log:
  2026-06-15T12:00:00: account_created $1000
  2026-06-15T12:00:00: money_deposited $500
  2026-06-15T12:00:00: money_withdrawn $200
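
The snapshotting tradeoff and the temporal-query benefit mentioned above can both be sketched on top of the same account events. This is a minimal illustration, not a production design: the Snapshot class, the replay and take_snapshot helpers, and the use of a list index as the log position are all assumptions made for this example.

```python
from dataclasses import dataclass

@dataclass
class Snapshot:
    """Balance of one account after the first `version` entries of the log."""
    account_id: str
    balance: int
    version: int  # number of log entries already folded into `balance`

def apply(balance: int, event: dict) -> int:
    """Fold a single event into a balance (same rules as the example above)."""
    if event["type"] == "account_created":
        return event["initial_balance"]
    if event["type"] == "money_deposited":
        return balance + event["amount"]
    if event["type"] == "money_withdrawn":
        return balance - event["amount"]
    return balance  # unknown event types are ignored

def replay(log, account_id, start_balance=0, start=0, stop=None):
    """Replay log[start:stop] for one account, starting from start_balance."""
    balance = start_balance
    for event in log[start:stop]:
        if event.get("account_id") == account_id:
            balance = apply(balance, event)
    return balance

def take_snapshot(log, account_id):
    """Capture the current balance together with the log position it reflects."""
    return Snapshot(account_id, replay(log, account_id), version=len(log))

def balance_from_snapshot(log, snap):
    """Start from the snapshot and replay only the events appended after it."""
    return replay(log, snap.account_id, snap.balance, start=snap.version)

log = [
    {"type": "account_created", "account_id": "ACC-001", "initial_balance": 1000},
    {"type": "money_deposited", "account_id": "ACC-001", "amount": 500},
    {"type": "money_withdrawn", "account_id": "ACC-001", "amount": 200},
]
snap = take_snapshot(log, "ACC-001")  # balance 1300 at version 3
log.append({"type": "money_deposited", "account_id": "ACC-001", "amount": 50})

current = balance_from_snapshot(log, snap)          # replays 1 event, not 4
after_two_events = replay(log, "ACC-001", stop=2)   # temporal query
print(current, after_two_events)  # 1350 1500
```

The snapshot is only a cache: the log stays the source of truth, so a snapshot can be discarded and rebuilt at any time by replaying from the start.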

CQRS (Command Query Responsibility Segregation)

Separate the write model (commands) from the read model (queries). Commands go to the event store. Queries read from a materialized view optimized for reads.

import uuid

# Command side (write)
class OrderCommandHandler:
    def __init__(self, event_store):
        self.event_store = event_store

    def place_order(self, user_id: str, items: list, total: float):
        event = {
            "type": "order_placed",
            # Unique per order, even when two orders contain the same items
            "order_id": f"ORD-{uuid.uuid4().hex[:8]}",
            "user_id": user_id,
            "items": items,
            "total": total,
        }
        self.event_store.append(event)
        return event["order_id"]

# Query side (read)
class OrderQueryHandler:
    def __init__(self, read_db):
        self.read_db = read_db  # Materialized view

    def get_order_summary(self, user_id: str):
        return self.read_db.query("SELECT * FROM order_summary WHERE user_id = ?", user_id)

# Projection — builds the read model from events
class OrderProjection:
    def __init__(self, read_db):
        self.read_db = read_db

    def handle(self, event: dict):
        if event["type"] == "order_placed":
            self.read_db.execute(
                "INSERT INTO order_summary VALUES (?, ?, ?, ?)",
                event["order_id"], event["user_id"], event["total"], "pending"
            )
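
To see the three roles work together, here is a small end-to-end sketch that runs as written. It assumes an in-memory SQLite database as the materialized view and a plain list as the event store; the function names place_order, project, and get_order_summary are chosen for this illustration only, and the projection is called synchronously where a real system would typically run it asynchronously.

```python
import sqlite3
import uuid

# Read model: an in-memory SQLite table stands in for the materialized view.
read_db = sqlite3.connect(":memory:")
read_db.execute(
    "CREATE TABLE order_summary "
    "(order_id TEXT PRIMARY KEY, user_id TEXT, total REAL, status TEXT)"
)

event_store = []  # write model: append-only list of events

def place_order(user_id: str, items: list, total: float) -> dict:
    """Command side: record the fact and nothing else."""
    event = {
        "type": "order_placed",
        "order_id": f"ORD-{uuid.uuid4().hex[:8]}",
        "user_id": user_id,
        "items": items,
        "total": total,
    }
    event_store.append(event)
    return event

def project(event: dict) -> None:
    """Projection: translate an event into a row of the read model."""
    if event["type"] == "order_placed":
        read_db.execute(
            "INSERT INTO order_summary VALUES (?, ?, ?, ?)",
            (event["order_id"], event["user_id"], event["total"], "pending"),
        )

def get_order_summary(user_id: str) -> list:
    """Query side: reads only the materialized view, never the event store."""
    cursor = read_db.execute(
        "SELECT order_id, total, status FROM order_summary WHERE user_id = ?",
        (user_id,),
    )
    return cursor.fetchall()

placed = place_order("USR-42", [{"product": "widget", "qty": 2}], 19.98)
project(placed)  # called inline here; usually driven by a subscriber
rows = get_order_summary("USR-42")
print(rows)
```

Because the read model is derived entirely from events, it can be dropped and rebuilt by running project over every entry in event_store.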

Kafka Example — Order Processing Pipeline

# producer.py — order service publishes events
from kafka import KafkaProducer
import json, time

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

order = {
    "order_id": "ORD-12345",
    "user_id": "USR-42",
    "items": [{"product": "widget", "qty": 2, "price": 9.99}],
    "total": 19.98,
    "timestamp": time.time()
}

producer.send('orders', value=order)
producer.flush()
print(f"Published order: {order['order_id']}")

# consumer.py — notification service reacts to events
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    auto_offset_reset='earliest',
    group_id='notification-service'
)

print("Listening for orders...")
for message in consumer:
    order = message.value
    print(f"New order: {order['order_id']} by {order['user_id']}")
    print(f"Sending email confirmation for ${order['total']:.2f}")
    # In production: call email API

Event Bus and Pub/Sub

An event bus (Kafka, RabbitMQ, AWS EventBridge) routes events from producers to consumers. Producers don’t know who consumes their events. Consumers don’t know who produced them.

# Simple in-memory event bus
class EventBus:
    def __init__(self):
        self.subscribers = {}

    def subscribe(self, event_type: str, handler):
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        self.subscribers[event_type].append(handler)

    def publish(self, event: dict):
        event_type = event["type"]
        for handler in self.subscribers.get(event_type, []):
            handler(event)

bus = EventBus()

# Services subscribe to events they care about
bus.subscribe("order_placed", lambda e: print(f"Inventory: reserve items for {e['order_id']}"))
bus.subscribe("order_placed", lambda e: print(f"Payment: charge {e['total']} for {e['order_id']}"))
bus.subscribe("order_placed", lambda e: print(f"Notification: email confirmation for {e['order_id']}"))
bus.subscribe("payment_received", lambda e: print(f"Shipping: prepare shipment for {e['order_id']}"))

# Place an order — all subscribers react
bus.publish({"type": "order_placed", "order_id": "ORD-123", "total": 29.99})

Expected output:

Inventory: reserve items for ORD-123
Payment: charge 29.99 for ORD-123
Notification: email confirmation for ORD-123
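
In the simple bus above, handlers run synchronously inside publish, so an exception raised by one handler propagates and the handlers registered after it never run. The following sketch shows one possible way to isolate failures. The IsolatingEventBus class, its failed list, and the three handler functions are hypothetical names introduced for this example; a real broker would handle retries and dead-lettering differently.

```python
from collections import defaultdict

class IsolatingEventBus:
    """In-memory bus where one failing handler does not block the others."""

    def __init__(self):
        self.subscribers = defaultdict(list)
        self.failed = []  # (handler name, event, error) kept for retry or inspection

    def subscribe(self, event_type, handler):
        self.subscribers[event_type].append(handler)

    def publish(self, event):
        for handler in self.subscribers.get(event["type"], []):
            try:
                handler(event)
            except Exception as exc:  # record the failure and keep delivering
                self.failed.append((handler.__name__, event, exc))

delivered = []

def reserve_inventory(event):
    delivered.append(("inventory", event["order_id"]))

def send_email(event):
    raise ConnectionError("mail server unavailable")

def record_analytics(event):
    delivered.append(("analytics", event["order_id"]))

bus = IsolatingEventBus()
for handler in (reserve_inventory, send_email, record_analytics):
    bus.subscribe("order_placed", handler)

bus.publish({"type": "order_placed", "order_id": "ORD-123", "total": 29.99})
print(delivered)
print([(name, str(err)) for name, _, err in bus.failed])
```

Running it prints the inventory and analytics deliveries followed by the single recorded failure for send_email, which shows that the failing notification handler did not prevent the analytics handler from running.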

Common Mistakes

  1. Events are too specific: An OrderPlaced event serves many consumers. Don’t create OrderPlacedForEmailOnly — keep events business-relevant.

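  2. Non-idempotent consumers: With at-least-once delivery, the same event can arrive more than once (for example after a retry or a consumer restart). Consumers must detect or tolerate duplicates, for instance by tracking the IDs of events they have already processed.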

  3. No shared schema contract: Producers and consumers must agree on event schemas. Define them in a format such as Avro or Protobuf and use a schema registry to enforce compatibility.

  4. No event versioning: Events evolve. Include a version field in every event schema.

  5. Over-relying on eventual consistency: Eventual consistency is fine for most use cases, but some operations (payments) may need stronger guarantees. Design compensating actions for failures.
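
The duplicate-delivery problem from mistake 2 can be handled with a small deduplication wrapper. This is a sketch under stated assumptions: every event carries a unique event_id field, and the set of seen IDs lives in memory, whereas a production consumer would keep it in durable storage. The IdempotentConsumer class is a hypothetical helper, not part of any library.

```python
class IdempotentConsumer:
    """Wraps a handler so that each event_id is processed at most once."""

    def __init__(self, handler):
        self.handler = handler
        self.seen = set()  # assumption: in-memory only; use durable storage in production

    def handle(self, event) -> bool:
        event_id = event["event_id"]
        if event_id in self.seen:
            return False  # duplicate delivery: skip without side effects
        self.handler(event)
        self.seen.add(event_id)  # mark as processed only after the handler succeeds
        return True

charges = []
consumer = IdempotentConsumer(lambda e: charges.append(e["total"]))

event = {"event_id": "evt-1", "type": "order_placed", "order_id": "ORD-123", "total": 29.99}
results = [consumer.handle(event), consumer.handle(event)]  # second call simulates a redelivery
print(results, charges)  # [True, False] [29.99]
```

Marking the event as seen only after the handler returns means a crash mid-handler leads to a retry rather than a lost event, which is why the handler's own side effects should also be safe to repeat.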

Practice Questions

  1. What is the difference between event sourcing and CQRS? Event sourcing stores all state changes as events (the event log is the source of truth). CQRS separates read and write models — they often work together but are independent concepts.

  2. How does event-driven architecture improve resilience? Services are decoupled. If the notification service is down, orders are still accepted. Events queue up and are processed when the service recovers.

  3. What is a projection in event sourcing? A projection reads events and builds a materialized view optimized for queries. Projections can be rebuilt by replaying all events from scratch.

  4. How do you handle schema evolution in event-driven systems? Use a schema registry (for example Confluent Schema Registry, which supports Avro, Protobuf, and JSON Schema) to enforce compatibility rules. Give every event a version field so that consumers can handle multiple versions side by side during a migration.

  5. When should you NOT use event sourcing? When the team is small and the domain is simple CRUD (create, read, update, delete). Event sourcing adds complexity that isn’t justified for basic data management.
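
The answer to question 4 says consumers can handle several event versions at once. One common way to do that is to convert older events to the newest shape before handling them. The sketch below is illustrative only: the v1 and v2 shapes of order_placed, the USD default, and the upcast function are all assumptions invented for this example.

```python
def upcast(event: dict) -> dict:
    """Return the event in the latest (v2) shape, whichever version arrived."""
    version = event.get("version", 1)  # events without the field are treated as v1
    if version == 1:
        # Hypothetical v1 -> v2 change: a flat "total" became amount plus currency.
        return {
            "type": event["type"],
            "version": 2,
            "order_id": event["order_id"],
            "total": {"amount": event["total"], "currency": "USD"},  # assumed default
        }
    if version == 2:
        return event
    raise ValueError(f"unsupported event version: {version}")

def handle_order_placed(event: dict) -> str:
    """Business logic only ever sees the v2 shape."""
    e = upcast(event)
    return f"{e['order_id']}: {e['total']['amount']:.2f} {e['total']['currency']}"

old = {"type": "order_placed", "order_id": "ORD-1", "total": 19.98}
new = {"type": "order_placed", "version": 2, "order_id": "ORD-2",
       "total": {"amount": 5.0, "currency": "EUR"}}

print(handle_order_placed(old))  # ORD-1: 19.98 USD
print(handle_order_placed(new))  # ORD-2: 5.00 EUR
```

Keeping the conversion in one place means handlers do not need a branch per version, and old events in the log never have to be rewritten.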

Mini Project

Build an event-driven order processing pipeline:

import time, uuid

class EventStore:
    def __init__(self):
        self.events = []

    def append(self, event):
        event["event_id"] = uuid.uuid4().hex[:8]
        event["timestamp"] = time.time()
        self.events.append(event)
        return event

class OrderService:
    def __init__(self, event_store: EventStore):
        self.store = event_store

    def place_order(self, user_id: str, product: str, qty: int, price: float):
        event = self.store.append({
            "type": "OrderPlaced",
            "data": {"user_id": user_id, "product": product, "qty": qty, "total": qty * price},
        })
        print(f"Order placed: {event['event_id']}")
        return event

class InventoryService:
    def handle(self, event):
        if event["type"] == "OrderPlaced":
            print(f"  Inventory: reserving {event['data']['qty']}x {event['data']['product']}")

class NotificationService:
    def handle(self, event):
        if event["type"] == "OrderPlaced":
            print(f"  Notification: email to {event['data']['user_id']}")

# Wire up the event-driven system
store = EventStore()
order_svc = OrderService(store)
inventory_svc = InventoryService()
notification_svc = NotificationService()

# Place an order: the event is appended to the store and returned
event = order_svc.place_order("USR-42", "Widget", 3, 9.99)

# Broadcast to all interested services
for service in [inventory_svc, notification_svc]:
    service.handle(event)

Expected output (the event ID is random and will differ on each run):

Order placed: a1b2c3d4
  Inventory: reserving 3x Widget
  Notification: email to USR-42
