
Introduction

In Chapter 10, we discussed batch processing: running a job on a bounded dataset. Now we explore stream processing: processing unbounded data that arrives continuously. A stream is data that is made available incrementally over time, for example:
  • User activity events on a website
  • Sensor readings from IoT devices
  • Stock price updates
  • Log messages from servers

1. Transmitting Event Streams

An event is a small, self-contained, immutable object containing details of something that happened. Events are written to a topic or stream, and consumers read from it.

Message Brokers vs Event Logs

Traditional message broker (RabbitMQ):
  • Messages deleted after acknowledgment
  • Supports complex routing
  • Low throughput per topic
Log-based message broker (Kafka):
  • Messages retained (configurable)
  • Simple sequential reads
  • High throughput

Apache Kafka Architecture

Key concepts:
  • Topic: Named stream of events, split into partitions
  • Partition: Ordered, immutable sequence of records
  • Offset: Monotonically increasing sequence number that marks a consumer's position within a partition
# Kafka producer example
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Send event to partition based on user_id
event = {
    'timestamp': '2024-01-15T10:30:00',
    'user_id': 12345,
    'event_type': 'page_view',
    'page': '/products'
}

# Messages with the same key go to the same partition (per-partition ordering)
producer.send('user_events', key=str(event['user_id']).encode(), value=event)
producer.flush()  # block until buffered messages are actually sent
Consumer groups: Multiple consumers share a group id and divide a topic's partitions among themselves, so each partition is read by exactly one consumer in the group
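To build intuition for how a group coordinator spreads work, here is a minimal sketch of round-robin partition assignment (the helper name is hypothetical; in practice the broker does this for you when consumers subscribe with a group id):

```python
# Sketch: how partitions of a topic get divided among a consumer group.
def assign_partitions(partitions, consumers):
    """Round-robin assignment, as a group coordinator might compute it."""
    assignment = {c: [] for c in consumers}
    for i, partition in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment

assignment = assign_partitions([0, 1, 2, 3], ['consumer-a', 'consumer-b'])
# Each partition is read by exactly one consumer in the group
```

If a consumer joins or leaves, the broker triggers a rebalance and recomputes an assignment like this one.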

Log Compaction

Problem: Logs grow forever
Solution: Keep only the latest value for each key
Use case: Maintaining database state in a log
# Example: Changelog for user database
def publish_user_update(user_id, user_data):
    """Publish user update to compacted topic"""
    producer.send(
        'user_changelog',
        key=str(user_id).encode(),
        value=user_data
    )

# Consumer can rebuild entire user database from log
from kafka import KafkaConsumer

def rebuild_user_database():
    """Consume compacted log from the beginning to build a local cache"""
    consumer = KafkaConsumer(
        'user_changelog',
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='earliest',   # start from the oldest retained record
        consumer_timeout_ms=10000       # stop iterating once caught up
    )
    user_db = {}

    for message in consumer:
        user_id = message.key.decode()
        if message.value is None:
            user_db.pop(user_id, None)  # tombstone: key was deleted
        else:
            user_db[user_id] = json.loads(message.value)

    return user_db

2. Databases and Streams

Key insight: Database can be viewed as a stream of changes.

Change Data Capture (CDC)

CDC: Observe changes written to a database and replicate them to other systems.
Implementation approaches: database triggers, or parsing the database's replication log (lower overhead).
Example: Debezium, a CDC tool that streams change events from databases such as MySQL and PostgreSQL into Kafka.
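As a minimal sketch of the consuming side, the following applies Debezium-style change events to a local replica. The event shape (`op` codes "c"/"u"/"d", an `after` row image) mirrors Debezium's conventions, but the helper and field names here are simplified assumptions:

```python
# Sketch: applying CDC change events ("c"=create, "u"=update, "d"=delete)
# to a local replica kept as a dict keyed by primary key.
def apply_change_event(replica, event):
    op = event['op']
    key = event['key']
    if op in ('c', 'u'):
        replica[key] = event['after']   # new row state after the change
    elif op == 'd':
        replica.pop(key, None)          # row was deleted upstream
    return replica

replica = {}
for ev in [
    {'op': 'c', 'key': 1, 'after': {'name': 'Alice'}},
    {'op': 'u', 'key': 1, 'after': {'name': 'Alicia'}},
    {'op': 'd', 'key': 1, 'after': None},
]:
    apply_change_event(replica, ev)
# create, update, then delete: the replica ends up empty again
```

Replaying the full change stream from the beginning (or from a compacted snapshot) rebuilds the replica from scratch.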

Event Sourcing

Event sourcing: Store all changes as immutable events, and derive current state from them.
Benefits: a complete audit log, the ability to reconstruct past states ("time travel"), and easier debugging, since no information is ever overwritten.
Example:
# Event sourcing for shopping cart
from datetime import datetime

class ShoppingCartEvent:
    pass

class ItemAdded(ShoppingCartEvent):
    def __init__(self, item_id, quantity):
        self.item_id = item_id
        self.quantity = quantity
        self.timestamp = datetime.now()

class ItemRemoved(ShoppingCartEvent):
    def __init__(self, item_id):
        self.item_id = item_id
        self.timestamp = datetime.now()

class ShoppingCart:
    def __init__(self, events=None):
        self.items = {}
        # Rebuild state from events (avoid a mutable default argument)
        for event in (events or []):
            self.apply(event)

    def apply(self, event):
        """Apply event to current state"""
        if isinstance(event, ItemAdded):
            if event.item_id in self.items:
                self.items[event.item_id] += event.quantity
            else:
                self.items[event.item_id] = event.quantity
        elif isinstance(event, ItemRemoved):
            self.items.pop(event.item_id, None)

    def add_item(self, item_id, quantity):
        """Add item by creating event"""
        event = ItemAdded(item_id, quantity)
        event_store.append(event)  # Persist event
        self.apply(event)           # Update local state

# Rebuild cart from event history
events = event_store.get_events_for_cart(cart_id)
cart = ShoppingCart(events)

# Time travel: State at specific time
events_until = [e for e in events if e.timestamp < specific_time]
past_cart = ShoppingCart(events_until)

3. Processing Streams

Three main types of stream processing: complex event processing (CEP), stream analytics, and maintaining materialized views.

Complex Event Processing

Goal: Search for patterns in event streams
Example pattern query (flag a purchase from a different country within 5 minutes of a login):
-- CEP query language (example: Esper)
SELECT *
FROM LoginEvent.win:time(5 min) AS login,
     PurchaseEvent.win:time(5 min) AS purchase
WHERE login.user_id = purchase.user_id
  AND login.country != purchase.country
  AND purchase.timestamp - login.timestamp < 5 minutes
# Python implementation
from collections import deque
import time

class FraudDetector:
    def __init__(self):
        self.recent_events = {}  # user_id -> deque of events

    def process_event(self, event):
        user_id = event['user_id']

        if user_id not in self.recent_events:
            self.recent_events[user_id] = deque()

        events = self.recent_events[user_id]

        # Add current event
        events.append(event)

        # Remove events older than 5 minutes
        cutoff = time.time() - 300
        while events and events[0]['timestamp'] < cutoff:
            events.popleft()

        # Check for suspicious pattern
        if self._is_suspicious(events):
            self.raise_alert(user_id, events)

    def raise_alert(self, user_id, events):
        """Hook for downstream alerting; a stub in this sketch"""
        print(f"ALERT: suspicious activity for user {user_id}")

    def _is_suspicious(self, events):
        """Check if events match fraud pattern"""
        if len(events) < 2:
            return False

        # Find login and purchase events
        logins = [e for e in events if e['type'] == 'login']
        purchases = [e for e in events if e['type'] == 'purchase']

        for login in logins:
            for purchase in purchases:
                time_diff = purchase['timestamp'] - login['timestamp']
                if (0 < time_diff < 300 and
                    login['country'] != purchase['country']):
                    return True

        return False

Stream Analytics

Aggregations over time windows:
  • Tumbling window: fixed length, non-overlapping (each event belongs to exactly one window)
  • Hopping window: fixed length, overlapping (windows advance by a hop smaller than the window length)
  • Sliding window: contains all events within some interval of each other
  • Session window: groups a user's events by activity, closed after a period of inactivity
Stream analytics example:
# Apache Flink example: Count page views per minute
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)

# Define source (Kafka)
table_env.execute_sql("""
    CREATE TABLE page_views (
        user_id BIGINT,
        page STRING,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'page_views',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'json'
    )
""")

# Tumbling window aggregation
table_env.execute_sql("""
    SELECT
        page,
        TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
        COUNT(*) as view_count
    FROM page_views
    GROUP BY
        page,
        TUMBLE(event_time, INTERVAL '1' MINUTE)
""")

Time in Stream Processing

Challenge: Events may arrive out of order.
Two notions of time: event time (when the event actually occurred) vs processing time (when the stream processor observes it).
Watermarks: Indicate progress in event time; a watermark of t asserts that no more events with timestamps before t are expected.
Handling late events:
# Flink with allowed lateness
table_env.execute_sql("""
    SELECT
        page,
        TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
        COUNT(*) as view_count
    FROM page_views
    GROUP BY
        page,
        TUMBLE(event_time, INTERVAL '1' MINUTE)
    -- Allow events up to 1 minute late (illustrative syntax; in Flink,
    -- allowed lateness is configured via the DataStream API, not SQL)
    EMIT AFTER WATERMARK
    ALLOWED LATENESS = INTERVAL '1' MINUTE
""")

4. Stream Joins

Joining streams is more complex than joining tables.

Stream-Stream Join (Window Join)

Example: join ad impressions with clicks on the same ad, where the two related events may arrive on either stream within some window of each other.
Implementation:
# Flink stream-stream join
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common.time import Time

env = StreamExecutionEnvironment.get_execution_environment()

impressions = env.add_source(...)  # Kafka source
clicks = env.add_source(...)        # Kafka source

# Join within a ±10-minute window (sketch; a real PyFlink job passes a
# ProcessJoinFunction subclass to .process() rather than a plain lambda)
joined = impressions \
    .key_by(lambda imp: imp['ad_id']) \
    .interval_join(clicks.key_by(lambda click: click['ad_id'])) \
    .between(Time.minutes(-10), Time.minutes(10)) \
    .process(lambda imp, click: {
        'ad_id': imp['ad_id'],
        'user_id': imp['user_id'],
        'impression_time': imp['time'],
        'click_time': click['time'],
        'time_to_click': click['time'] - imp['time']
    })

Stream-Table Join

Join stream events with a database table (enrichment).
Challenge: The database table changes over time, so the result of a lookup depends on when it happens.
Solutions: query the database for each event (slow, adds a network round trip per event), or keep a local copy of the table inside the stream processor and keep it up to date via change data capture.
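The local-copy approach can be sketched as follows: one handler applies changelog updates to an in-memory table, and enrichment is then a local lookup (all names here are hypothetical):

```python
# Sketch of a stream-table join via a CDC-maintained local table copy.
users = {}  # local replica of the users table

def on_user_change(user_id, user_row):
    """Apply a changelog update to the local table copy."""
    users[user_id] = user_row

def enrich(event):
    """Join an activity event with the current user row, locally."""
    user = users.get(event['user_id'], {})
    return {**event, 'email': user.get('email')}

on_user_change(42, {'email': 'a@example.com'})
enriched = enrich({'user_id': 42, 'page': '/products'})
# the event now carries the user's email, with no database round trip
```

Note the time-dependence: an event processed before the user's row changes is enriched with the old value, which is usually the desired semantics.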

Table-Table Join

Both inputs are changelogs (CDC streams from databases), and the output is a materialized view of the join, updated whenever either input changes.
Example: maintaining a denormalized cache that joins two tables and stays fresh as both underlying tables change.
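A minimal sketch of such a materialized join, with orders and customers as the two changelogs (table names and handlers are illustrative assumptions):

```python
# Sketch: materialized view of orders JOIN customers, kept current
# as change events arrive on either input.
orders, customers, view = {}, {}, {}

def refresh(order_id):
    """Recompute one row of the materialized view."""
    order = orders.get(order_id)
    if order is None:
        view.pop(order_id, None)  # order deleted: drop the joined row
        return
    customer = customers.get(order['customer_id'], {})
    view[order_id] = {**order, 'customer_name': customer.get('name')}

def on_order_change(order_id, row):
    orders[order_id] = row
    refresh(order_id)

def on_customer_change(customer_id, row):
    customers[customer_id] = row
    # a customer change touches every order row referencing it
    for oid, order in orders.items():
        if order['customer_id'] == customer_id:
            refresh(oid)

on_order_change(1, {'customer_id': 7, 'total': 30})
on_customer_change(7, {'name': 'Alice'})
# view[1] now joins the order with the customer's current name
```

The key point is that a change on *either* side must update the view, which is why both inputs need to be full changelogs rather than plain event streams.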

5. Fault Tolerance

Stream processors must handle failures gracefully.

Microbatching

Approach: Break the stream into small batches (Spark Streaming)
Advantages:
  • Can use batch processing techniques
  • Easier fault tolerance (batch atomicity)
Disadvantages:
  • Higher latency (wait for batch)
  • Not true streaming
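The core loop is easy to sketch: buffer incoming events until the batch boundary, then hand each buffer to ordinary batch logic (here the boundary is a count rather than a time interval, for simplicity):

```python
# Sketch of the microbatch loop: group an unbounded iterator into
# small batches that can be processed with batch techniques.
def microbatch(stream, batch_size=3):
    batch = []
    for event in stream:
        batch.append(event)
        if len(batch) == batch_size:
            yield batch   # process this buffer as an ordinary batch job
            batch = []
    if batch:
        yield batch       # flush the final partial batch

batches = list(microbatch(iter(range(7)), batch_size=3))
# three batches: [0, 1, 2], [3, 4, 5], [6]
```

Latency is bounded below by the batch interval: an event arriving just after a boundary waits a full interval before it is processed, which is the trade-off listed above.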

Checkpointing

Approach: Periodically save complete operator state (Flink).
Flink checkpointing: barriers are injected into the stream; once a barrier has flowed through every operator, a consistent snapshot of the whole job's state is written to durable storage, and on failure the job rewinds to the last checkpoint and replays from there.
Exactly-once semantics: each event's effect on the output appears exactly once, even across failures; in practice "effectively once", since events may be reprocessed but their duplicate effects are suppressed.
Implementing exactly-once:
# Example: Idempotent writes
class IdempotentWriter:
    def __init__(self):
        self.written_ids = set()  # Track written messages

    def write(self, message_id, data):
        """Write only if not already written"""
        if message_id in self.written_ids:
            # Already written, skip (duplicate)
            return False

        # Write to database/storage ("database" is a placeholder client)
        database.write(data)

        # Track that we wrote it
        self.written_ids.add(message_id)
        return True

# Flink two-phase commit for exactly-once
# 1. Pre-commit: Write to temporary location
# 2. Checkpoint barrier arrives
# 3. Commit: Atomically move temp -> final
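The pre-commit/commit pattern above can be sketched with an in-memory sink; the class and method names are our own, not Flink's API:

```python
# Sketch of a two-phase-commit sink: writes go to a staging area and
# become visible only when the checkpoint completes.
class TransactionalSink:
    def __init__(self):
        self.staging = []     # pre-committed, not yet visible
        self.committed = []   # visible output

    def write(self, record):
        self.staging.append(record)          # 1. pre-commit

    def on_checkpoint_complete(self):
        self.committed.extend(self.staging)  # 3. atomically promote
        self.staging = []

    def on_failure(self):
        self.staging = []  # staged writes are discarded, never duplicated

sink = TransactionalSink()
sink.write('a'); sink.write('b')
sink.on_checkpoint_complete()   # 'a' and 'b' become visible
sink.write('c'); sink.on_failure()   # 'c' is rolled back
```

After a failure, replay from the last checkpoint regenerates the rolled-back writes, so the output contains each record's effect exactly once.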

Idempotence

Idempotent operation: Can be performed multiple times with same effect as once Making operations idempotent:
# Non-idempotent: retrying after a failure decrements inventory twice
def process_order(order_id, product_id):
    inventory = get_inventory(product_id)
    update_inventory(product_id, inventory - 1)  # Decrement

# Idempotent version
def process_order_idempotent(order_id, product_id):
    # Check if already processed
    if order_processed(order_id):
        return  # Skip duplicate

    inventory = get_inventory(product_id)
    update_inventory(product_id, inventory - 1)

    # Mark as processed (the check, update, and mark should share one
    # transaction, or concurrent retries can still slip through)
    mark_order_processed(order_id)

6. Stream Processing Frameworks Comparison

Comparison at a glance:
Framework       | Model                  | Trade-offs
Flink           | True streaming         | Low latency; event-time windows; checkpointing
Spark Streaming | Microbatching          | Unified batch/stream API; higher latency
Kafka Streams   | Streaming library      | Simple, Kafka-native; runs inside your application

Summary

Key Takeaways:
  1. Event logs are fundamental:
    • Durable, ordered, partitioned
    • Kafka pioneered log-based messaging
    • Enable replay and multiple consumers
  2. Time is complex in streams:
    • Event time vs processing time
    • Watermarks indicate progress
    • Late events require special handling
  3. Windowing enables aggregations:
    • Tumbling: fixed, non-overlapping
    • Hopping: fixed, overlapping
    • Sliding: continuous
    • Session: based on inactivity
  4. Joins are more complex than batch:
    • Stream-stream: within time window
    • Stream-table: lookup enrichment
    • Table-table: maintain materialized view
  5. Fault tolerance is critical:
    • Exactly-once semantics ideal
    • Checkpointing saves state
    • Idempotence simplifies recovery
  6. Different frameworks, different trade-offs:
    • Flink: True streaming, low latency
    • Spark: Unified batch/stream, microbatching
    • Kafka Streams: Simple, Kafka-native
Comparison table:
Aspect    | Batch Processing           | Stream Processing
Input     | Bounded (complete dataset) | Unbounded (continuous)
Latency   | Minutes to hours           | Milliseconds to seconds
Results   | Complete, final            | Continuous, approximate
State     | Materialized to disk       | In-memory with checkpoints
Time      | Processing time only       | Event time + processing time
Failures  | Retry entire job           | Checkpoint and replay
Use cases | Daily reports, ML training | Fraud detection, monitoring

