
Introduction

There are three main types of systems:
  1. Services (online systems): Wait for request, send response, measured by response time
  2. Batch processing (offline systems): Take large input, produce output, measured by throughput
  3. Stream processing (near-real-time systems): Between batch and services, consume inputs shortly after they’re produced
This chapter focuses on batch processing: processing large amounts of data in a single job that takes minutes to days.

1. Batch Processing with Unix Tools

Let’s start with a simple task: analyze web server logs to find the top 5 most popular URLs.

Simple Log Analysis

Using Unix tools:
cat access.log |
  awk '{print $7}' |          # Extract URL (7th field)
  sort |                       # Sort URLs
  uniq -c |                    # Count unique URLs
  sort -rn |                   # Sort by count (descending)
  head -n 5                    # Take top 5
How it works: each stage streams lines to the next, so the pipeline can process files far larger than memory without ever materializing an intermediate result.

The Unix Philosophy

Uniform interface: Everything is a file (or stream of bytes)
# Python equivalent of Unix pipeline
import sys
from collections import Counter

def extract_url(line):
    """Extract URL from log line"""
    parts = line.split()
    if len(parts) >= 7:
        return parts[6]
    return None

def process_log(filename):
    """Count URLs in log file"""
    url_counts = Counter()

    with open(filename, 'r') as f:
        for line in f:
            url = extract_url(line)
            if url:
                url_counts[url] += 1

    # Get top 5
    top_5 = url_counts.most_common(5)

    for url, count in top_5:
        print(f"{count} {url}")

# Usage
process_log('access.log')
The Python version is easier to extend with custom logic, but for ad-hoc analysis the Unix pipeline is shorter, needs no code file, and is often faster.

Sorting vs In-Memory Aggregation

GNU sort is remarkably efficient:
  • Automatically uses disk when data exceeds memory
  • Parallelizes sorting across CPU cores
  • Can merge pre-sorted files
# Sort 100 GB file using only 1 GB RAM
sort --parallel=4 --buffer-size=1G huge_file.txt
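The external-sort idea behind GNU sort can be sketched in Python: split the input into memory-sized chunks, sort each chunk, spill it to disk, then stream a k-way merge of the sorted runs with `heapq.merge`. A minimal sketch (the tiny `chunk_size` is purely for illustration):

```python
import heapq
import tempfile

def _spill(sorted_chunk):
    """Write one sorted run to a temporary file and reopen it for reading."""
    f = tempfile.TemporaryFile(mode='w+')
    f.writelines(line + '\n' for line in sorted_chunk)
    f.seek(0)
    return (line.rstrip('\n') for line in f)

def external_sort(lines, chunk_size=2):
    """Sort an iterable of lines using bounded memory.

    A real run would use millions of lines per chunk; chunk_size=2
    just forces several spill files for demonstration.
    """
    runs = []
    chunk = []
    for line in lines:
        chunk.append(line)
        if len(chunk) >= chunk_size:
            runs.append(_spill(sorted(chunk)))
            chunk = []
    if chunk:
        runs.append(_spill(sorted(chunk)))
    # k-way merge of the pre-sorted runs, streaming from disk
    return heapq.merge(*runs)

print(list(external_sort(['b', 'd', 'a', 'c', 'e'])))  # ['a', 'b', 'c', 'd', 'e']
```

This is also why `sort` can "merge pre-sorted files": the final merge step works on any set of sorted inputs, regardless of where they came from.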

The Unix Pipe

The pipe (|) connects the output of one program to the input of another. Benefits:
  • No temporary files on disk
  • Programs run in parallel
  • Backpressure: the slower program controls the rate
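The same plumbing is available from Python's standard library; a minimal sketch of wiring two external programs together with a pipe (assumes `sort` and `uniq` are on the PATH):

```python
import subprocess

# Equivalent of: printf 'b\na\nb\n' | sort | uniq -c
p1 = subprocess.Popen(['sort'], stdin=subprocess.PIPE,
                      stdout=subprocess.PIPE, text=True)
p2 = subprocess.Popen(['uniq', '-c'], stdin=p1.stdout,
                      stdout=subprocess.PIPE, text=True)
p1.stdin.write('b\na\nb\n')
p1.stdin.close()   # EOF lets sort emit its output
p1.stdout.close()  # parent drops its copy so p2 sees EOF when p1 exits
output, _ = p2.communicate()
print(output)
```

Backpressure comes for free: the pipe between the processes is a small OS buffer, so a fast producer blocks whenever a slow consumer falls behind.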

2. MapReduce and Distributed Filesystems

Unix tools work great on a single machine, but what about datasets that don’t fit on one machine? MapReduce: Like Unix tools, but distributed across thousands of machines.

MapReduce Job Execution

Example: Count URL visits (like earlier Unix example)
# Map function
def mapper(record):
    """
    Input: Log line
    Output: (url, 1) pairs
    """
    url = extract_url(record)
    if url:
        yield (url, 1)

# Reduce function
def reducer(key, values):
    """
    Input: url, [1, 1, 1, ...]
    Output: (url, count)
    """
    yield (key, sum(values))
Execution: mappers run over each input partition, the framework shuffles the emitted pairs (partitioning and sorting them by key), and each reducer then receives every value for the keys assigned to it.
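The framework's job between map and reduce is the shuffle: grouping all values emitted for the same key. A toy single-machine driver (no distribution, just the dataflow) makes the three phases explicit; the helper names are illustrative:

```python
from collections import defaultdict

def run_mapreduce(records, mapper, reducer):
    """Toy single-machine MapReduce: map, shuffle (group by key), reduce."""
    # Map phase: apply the mapper to every input record
    mapped = [pair for record in records for pair in mapper(record)]
    # Shuffle phase: bring together all values emitted for the same key
    groups = defaultdict(list)
    for key, value in mapped:
        groups[key].append(value)
    # Reduce phase: one reducer call per key, in sorted key order
    return [out for key in sorted(groups)
                for out in reducer(key, groups[key])]

def url_mapper(line):
    parts = line.split()
    if len(parts) >= 7:
        yield (parts[6], 1)

def count_reducer(key, values):
    yield (key, sum(values))

log = ['a b c d e f /home', 'a b c d e f /about', 'a b c d e f /home']
print(run_mapreduce(log, url_mapper, count_reducer))
# [('/about', 1), ('/home', 2)]
```

On a real cluster the shuffle happens over the network, with each mapper's output partitioned by a hash of the key so every reducer sees a disjoint set of keys.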

Distributed Filesystem

MapReduce relies on a distributed filesystem (HDFS, GFS) to store input and output. Files are split into large blocks and each block is replicated across several machines for fault tolerance. Locality optimization: the scheduler runs each map task on a machine that already stores a copy of its input block, avoiding network transfer of the input.

MapReduce Workflows

Complex workflows often chain multiple MapReduce jobs, feeding the output of one job in as the input of the next. Problem: every job fully writes its output to HDFS before the next job can read it back, which is slow.

Joins in MapReduce

Problem: join two datasets, e.g. user activity events with user profiles. Sort-merge join (reduce-side): map both datasets to (user_id, record) pairs so the shuffle brings every record for a given user to the same reducer. Reducer input:
user_id: 123
values: [
    ('activity', {page: '/home', time: '10:00'}),
    ('activity', {page: '/about', time: '10:05'}),
    ('profile', {name: 'Alice', age: 30})
]
def reduce_side_join(key, values):
    """
    Join activity records with profile
    """
    profile = None
    activities = []

    for tag, record in values:
        if tag == 'profile':
            profile = record
        else:  # tag == 'activity'
            activities.append(record)

    # Join each activity with profile
    if profile:
        for activity in activities:
            yield {
                'user_id': key,
                'name': profile['name'],
                'age': profile['age'],
                'page': activity['page'],
                'time': activity['time']
            }
Broadcast hash join (when one dataset is small): instead of shuffling both datasets, broadcast the small one to every mapper, load it into an in-memory hash table, and join map-side with no reduce phase at all.
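A sketch of the map-side variant, using record shapes like the example above (the function name and fields are illustrative):

```python
def broadcast_hash_join(activities, profiles):
    """Map-side join: build a hash table from the small dataset,
    then stream the large one through it."""
    # Broadcast step: every mapper gets a full copy of the small table
    profile_by_user = {p['user_id']: p for p in profiles}

    # Map step: probe the hash table; no shuffle or reduce needed
    for activity in activities:
        profile = profile_by_user.get(activity['user_id'])
        if profile:  # inner join: drop activities with no profile
            yield {**activity,
                   'name': profile['name'],
                   'age': profile['age']}

activities = [{'user_id': 123, 'page': '/home'},
              {'user_id': 123, 'page': '/about'},
              {'user_id': 456, 'page': '/home'}]
profiles = [{'user_id': 123, 'name': 'Alice', 'age': 30}]
for row in broadcast_hash_join(activities, profiles):
    print(row)
```

The trade-off is memory: this only works while the small dataset fits in RAM on every mapper; otherwise you fall back to the sort-merge join.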

Group By in MapReduce

Example: Calculate average age per city
# Map function
def mapper(record):
    """Emit (city, age)"""
    yield (record['city'], record['age'])

# Reduce function
def reducer(city, ages):
    """Calculate average"""
    ages_list = list(ages)
    avg_age = sum(ages_list) / len(ages_list)
    yield (city, avg_age)
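One wrinkle: averages are not directly combinable (an average of partial averages is wrong unless groups are equal-sized), so a combiner cannot pre-aggregate the reducer above. The standard fix is to emit (sum, count) pairs instead; a sketch under that assumption:

```python
def mapper(record):
    """Emit (city, (age_sum, count)) so partial results can be merged."""
    yield (record['city'], (record['age'], 1))

def combiner(city, partials):
    """Runs on each mapper's output to shrink shuffle traffic."""
    total = count = 0
    for s, c in partials:
        total += s
        count += c
    yield (city, (total, count))

def reducer(city, partials):
    """Merge partial (sum, count) pairs and emit the true average."""
    total = count = 0
    for s, c in partials:
        total += s
        count += c
    yield (city, total / count)

partial = list(combiner('Oslo', [(20, 1), (40, 1)]))     # [('Oslo', (60, 2))]
print(list(reducer('Oslo', [p for _, p in partial])))    # [('Oslo', 30.0)]
```

The combiner and reducer share the same merge logic; that associativity is exactly what makes pre-aggregation safe.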

Handling Skew

Problem: some keys have far more values than others (hot keys, e.g. a celebrity's user_id), so the reducer handling a hot key becomes a straggler that delays the whole job. Solution: a skewed join; first sample the input to identify the hot keys, then spread each hot key's records across several reducers instead of one.
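One common remedy, key salting, can be sketched as follows: records for a hot key (found by sampling) get a random shard suffix appended, so they land on several reducers, and a second pass strips the suffix and merges the partial results. The constants and helper names are illustrative:

```python
import random

NUM_SHARDS = 4

def salted_mapper(record, hot_keys):
    """Spread a hot key's records over NUM_SHARDS reducer partitions."""
    key, value = record
    if key in hot_keys:
        # Append a random shard id so no single reducer is overloaded
        yield (f"{key}#{random.randrange(NUM_SHARDS)}", value)
    else:
        yield (key, value)

def merge_salted(counts):
    """Second pass: strip the shard suffix and combine partial counts."""
    merged = {}
    for salted_key, count in counts:
        key = salted_key.split('#')[0]
        merged[key] = merged.get(key, 0) + count
    return merged

# 'popular' was identified as hot by sampling a fraction of the input
records = [('popular', 1)] * 8 + [('rare', 1)]
pairs = [p for r in records for p in salted_mapper(r, hot_keys={'popular'})]
# ...the shuffle and per-key summing would happen here; then merge:
print(merge_salted(pairs))  # {'popular': 8, 'rare': 1}
```

The cost is an extra aggregation pass, which is usually far cheaper than one straggling reducer.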

3. Beyond MapReduce

MapReduce has limitations:
  • Materializes intermediate state to HDFS (slow)
  • Only supports map and reduce operations
  • Verbose code for simple operations

Dataflow Engines

Spark, Flink, and Tez are more flexible than MapReduce: a job is an arbitrary DAG (directed acyclic graph) of operators, so work is not forced into alternating map and reduce stages, and intermediate data need not be written to HDFS between stages.

Apache Spark

Key idea: Resilient Distributed Datasets (RDDs) with transformations
# Spark example: Same URL counting task
from pyspark import SparkContext

sc = SparkContext()

# Read log file
logs = sc.textFile("hdfs://access.log")

# Extract URLs and count
url_counts = (logs
    .map(lambda line: extract_url(line))      # Extract URL
    .filter(lambda url: url is not None)      # Remove nulls
    .map(lambda url: (url, 1))                # Create pairs
    .reduceByKey(lambda a, b: a + b)          # Count
    .sortBy(lambda pair: pair[1], ascending=False)  # Sort
    .take(5))                                 # Top 5

for url, count in url_counts:
    print(f"{count} {url}")
Lazy evaluation and optimization: transformations only build up the DAG; nothing executes until an action (such as take) runs, which lets Spark optimize the whole plan at once. In-memory caching pays off for iterative algorithms that reuse the same dataset:
# Example: PageRank (iterative algorithm)
def pagerank(links, num_iterations=10):
    # Cache links in memory
    links = links.cache()

    # Initialize ranks
    ranks = links.mapValues(lambda urls: 1.0)

    # Iterate
    for iteration in range(num_iterations):
        # Calculate contributions
        contribs = links.join(ranks).flatMap(
            lambda url_urls_rank:
                [(dest, url_urls_rank[1][1] / len(url_urls_rank[1][0]))
                 for dest in url_urls_rank[1][0]]
        )

        # Update ranks
        ranks = contribs.reduceByKey(lambda a, b: a + b).mapValues(
            lambda rank: 0.15 + 0.85 * rank
        )

    return ranks

Fault Tolerance in Dataflow Engines

MapReduce recovers from failures by re-executing failed tasks against their input in HDFS. Spark instead recomputes lost partitions from lineage: the recorded chain of transformations that produced them. For long lineages (e.g. many PageRank iterations), periodic checkpointing to stable storage bounds how much must be recomputed after a failure.

Materialization of Intermediate State

MapReduce fully materializes intermediate state: each job writes its complete output to HDFS before the next job starts, which gives simple recovery but wastes I/O. Dataflow engines instead pipeline operators, passing intermediate data through memory or local disk; this is faster, at the cost of more recomputation when a machine fails.

4. Graph Processing

Many algorithms need to traverse graphs:
  • Social network analysis
  • PageRank
  • Shortest paths
  • Connected components

Bulk Synchronous Parallel (BSP)

Pregel model (used by Apache Giraph, GraphX): computation proceeds in supersteps. In each superstep, every active vertex processes the messages sent to it in the previous superstep, updates its own state, and sends messages along its edges; a barrier separates consecutive supersteps. Example: finding shortest paths.
class ShortestPathVertex:
    def __init__(self, vertex_id, neighbors):
        self.id = vertex_id
        self.neighbors = neighbors
        self.distance = float('inf')

    def compute(self, messages):
        """Called once per superstep with the messages received.

        send_message and vote_to_halt are provided by the framework.
        """
        # Take the best distance offered by any incoming message
        if messages and min(messages) < self.distance:
            self.distance = min(messages)
            # Propagate the improvement to neighbors (edge weight 1)
            for neighbor in self.neighbors:
                self.send_message(neighbor, self.distance + 1)
        # Go dormant until a new message wakes this vertex up again
        self.vote_to_halt()
Execution: in superstep 0 only the source vertex is active (distance 0) and messages its neighbors; each subsequent superstep advances the frontier one hop outward; the job terminates when every vertex has voted to halt and no messages are in flight.
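A toy driver for the superstep loop makes this concrete (single machine, no message batching; the graph and helper structure are illustrative):

```python
import math

def bsp_shortest_paths(graph, source):
    """Toy Pregel-style BSP: vertices exchange messages in supersteps
    until no vertex has anything left to say."""
    dist = {v: math.inf for v in graph}
    inbox = {v: [] for v in graph}
    inbox[source] = [0]                # superstep 0: seed the source

    while any(inbox.values()):         # the loop is the superstep barrier
        outbox = {v: [] for v in graph}
        for v, messages in inbox.items():
            if messages and min(messages) < dist[v]:
                dist[v] = min(messages)
                # Propagate the improvement (all edges have weight 1)
                for neighbor in graph[v]:
                    outbox[neighbor].append(dist[v] + 1)
        inbox = outbox                 # deliver messages for next superstep
    return dist

graph = {'A': ['B', 'C'], 'B': ['D'], 'C': ['D'], 'D': []}
print(bsp_shortest_paths(graph, 'A'))  # {'A': 0, 'B': 1, 'C': 1, 'D': 2}
```

Each loop iteration is one superstep: all vertices read last round's messages, then all of next round's messages are delivered at once, which is exactly the bulk-synchronous barrier.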

Graph Partitioning

Challenge: distribute the graph across machines. With poor partitioning, most edges cross machine boundaries, so every superstep floods the network with messages. Good partitioning co-locates densely connected vertices on the same machine, but finding such a partition is itself a hard problem, and in practice many systems simply partition vertices by hash.

5. Comparing Batch Processing Systems


Declarative Query Languages

High-level, declarative SQL layered on top of batch execution engines. Examples:
  • Hive: SQL on MapReduce/Tez/Spark
  • Spark SQL: SQL on Spark
  • Presto: SQL for interactive queries
-- Same query works across systems
SELECT
    city,
    AVG(age) as avg_age,
    COUNT(*) as user_count
FROM users
WHERE signup_date > '2024-01-01'
GROUP BY city
HAVING COUNT(*) > 100
ORDER BY avg_age DESC;
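The same declarative query shape can be tried locally with SQLite from Python's standard library (the table, toy data, and shrunken HAVING threshold are made up for illustration):

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE users (city TEXT, age INTEGER, signup_date TEXT)")
conn.executemany("INSERT INTO users VALUES (?, ?, ?)", [
    ('Oslo', 20, '2024-03-01'),
    ('Oslo', 40, '2024-04-01'),
    ('Bergen', 50, '2023-01-01'),   # filtered out by the WHERE clause
])

rows = conn.execute("""
    SELECT city, AVG(age) AS avg_age, COUNT(*) AS user_count
    FROM users
    WHERE signup_date > '2024-01-01'
    GROUP BY city
    HAVING COUNT(*) > 1             -- threshold shrunk for the toy data
    ORDER BY avg_age DESC
""").fetchall()
print(rows)  # [('Oslo', 30.0, 2)]
```

The point of the declarative layer is that this same text could be planned as a MapReduce job, a Spark DAG, or a local table scan; the optimizer, not the author, picks the execution strategy.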

Summary

Key Takeaways:
  1. Unix philosophy still relevant:
    • Simple, composable tools
    • Uniform interfaces (stdin/stdout)
    • Separation of concerns
  2. MapReduce pioneered distributed batch processing:
    • Fault tolerance through replication
    • Data locality optimization
    • But limited by materialization overhead
  3. Dataflow engines improve on MapReduce:
    • Arbitrary DAGs instead of just map/reduce
    • Pipeline operations in memory
    • Better for iterative algorithms
  4. Different models for different problems:
    • Sort-merge joins for large datasets
    • Broadcast joins for small datasets
    • Graph processing for connected data
  5. Fault tolerance strategies:
    • MapReduce: Re-execute from HDFS
    • Spark: Recompute from lineage
    • Trade-off: Recomputation vs materialization
  6. High-level abstractions winning:
    • SQL on batch engines
    • Declarative > imperative
    • Query optimization opportunities
Comparison table:
System          Strengths                         Weaknesses                    Best For
Unix tools      Simple, fast on single machine    Doesn’t scale                 Ad-hoc analysis, small data
MapReduce       Scalable, fault tolerant, mature  Slow, limited operators       Very large batch jobs
Spark           Fast, rich API, in-memory         More memory required          Iterative ML, interactive queries
Pregel/Giraph   Optimized for graphs              Limited to graph algorithms   Social networks, PageRank

Previous: Chapter 9: Consistency and Consensus
