
Introduction

This book has covered many aspects of data systems: storage, retrieval, replication, partitioning, transactions, consistency, and processing. Now we bring these ideas together to think about how to design better data systems. Key questions:
  • How do we integrate disparate systems?
  • How do we ensure correctness across systems?
  • How do we evolve systems over time?
  • How do we maintain data quality and integrity?

1. Data Integration

Most applications combine several different data systems: an OLTP database, a cache, a search index, an analytics warehouse. The challenge is keeping all of these systems synchronized and consistent.

Combining Specialized Tools

No single database is good at everything, so applications combine tools: a relational database for transactions, a search index for full-text queries, a cache for low-latency reads, a warehouse for analytics.

The Problem of Data Integration

The traditional approach is dual writes: the application writes to each system (database, cache, search index) separately. This is fragile. One write can succeed while another fails, and there is no ordering guarantee across systems, so concurrent clients' writes can interleave differently on each system, leaving them permanently inconsistent with no repair mechanism.
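The race can be sketched in a few lines (the `Store` class below is a hypothetical stand-in for real database and cache clients):

```python
# Dual writes: each client independently writes to two systems.
# With no shared ordering, concurrent writes can arrive in a different
# order at each system, leaving them permanently inconsistent.

class Store(dict):
    def set(self, key, value):
        self[key] = value

db, cache = Store(), Store()

# Clients A and B both update key "x"; the network delivers their
# writes in a different order to each system:
db.set("x", "A")      # client A reaches the database first
db.set("x", "B")      # client B overwrites: database ends up with "B"
cache.set("x", "B")   # client B reaches the cache first
cache.set("x", "A")   # client A overwrites: cache ends up with "A"

print(db["x"], cache["x"])  # B A -- inconsistent, with no repair step
```

Neither system is "wrong" from its own point of view; the inconsistency is only visible end to end, which is why an ordered log fixes it.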

Better Approach: Single Source of Truth

Benefits:
  • Single source of truth
  • Guaranteed ordering (within partition)
  • Async consumers can process at own pace
  • Easy to add new derived views
# Example: Event-driven architecture
from datetime import datetime

class UserService:
    def update_user(self, user_id, updates):
        """Single source of truth: database"""
        # 1. Update database
        with db.transaction():
            user = db.users.get(user_id)
            user.update(updates)
            db.users.save(user)

        # 2. Publish change event (in production, use a transactional
        #    outbox or CDC so a crash here cannot lose the event)
        event = {
            'event_type': 'user_updated',
            'user_id': user_id,
            'changes': updates,
            'timestamp': datetime.now().isoformat()
        }
        event_log.publish('user_changes', event)

        # Consumers update derived views:
        # - Search index consumer
        # - Cache invalidation consumer
        # - Analytics consumer

2. Unbundling Databases

Traditional databases bundle many features together: storage, indexing, caching, replication, and a query language. The modern trend is to unbundle these features and compose specialized systems.

Composing Data Storage Technologies

Database features can be provided as separate services: a durable ordered log (e.g. Kafka) plays the role of the replication log, secondary indexes become a search service (e.g. Elasticsearch), caching becomes Redis or Memcached, and materialized views are maintained by stream processors, all fed from the same log.
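A toy sketch of the composition (the `UnbundledStore` name and in-memory components are illustrative stand-ins; real systems would use Kafka, RocksDB, Elasticsearch, and Redis):

```python
# A monolithic database bundles a log, primary storage, indexes, and a
# cache. Unbundled, each becomes a separate system fed by one write path.

class UnbundledStore:
    def __init__(self):
        self.log = []          # durability + ordering (think Kafka)
        self.primary = {}      # key-value storage (think RocksDB)
        self.name_index = {}   # secondary index (think Elasticsearch)
        self.cache = {}        # hot reads (think Redis)

    def write(self, key, doc):
        # Append to the log first; everything else is derived from it
        self.log.append((key, doc))
        self.primary[key] = doc
        self.name_index.setdefault(doc["name"], set()).add(key)
        self.cache[key] = doc

    def get(self, key):
        return self.cache.get(key) or self.primary.get(key)

    def find_by_name(self, name):
        return self.name_index.get(name, set())

store = UnbundledStore()
store.write("u1", {"name": "alice"})
store.write("u2", {"name": "alice"})
print(store.find_by_name("alice"))  # the set {'u1', 'u2'}
```

The key property is that every component is downstream of one ordered log, so any of them can be rebuilt by replaying it.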

Designing Applications Around Dataflow

In a dataflow architecture, data flows through the system as events, and application code reacts to those events to derive new state. Command Query Responsibility Segregation (CQRS) applies this idea by separating the write path (commands, appended as events) from the read path (queries served from derived read models). Example:
# Write side: Commands
class OrderService:
    def place_order(self, order_data):
        """Command: Place order"""
        # Validate
        if not self.validate_inventory(order_data):
            raise InsufficientInventory()

        # Create events
        events = [
            OrderPlaced(order_id=order_data['id'], ...),
            InventoryReserved(items=order_data['items']),
            PaymentRequested(amount=order_data['total'])
        ]

        # Append to event store
        event_store.append_events(events)

# Read side: Queries
class OrderQueryService:
    def get_order_summary(self, order_id):
        """Query: Get order summary (read model)"""
        # Read from optimized read model
        return order_summary_cache.get(order_id)

    def get_orders_by_customer(self, customer_id):
        """Query: Customer order history (read model)"""
        return customer_orders_index.query(customer_id)

# Background: Update read models
class ReadModelUpdater:
    def consume_events(self):
        """Subscribe to event log and update read models"""
        for event in event_store.subscribe():
            if isinstance(event, OrderPlaced):
                # Update order summary cache
                order_summary_cache.set(event.order_id, {
                    'status': 'placed',
                    'total': event.total,
                    ...
                })
                # Update customer index
                customer_orders_index.add(event.customer_id, event.order_id)

3. Derived Data

Principle: distinguish the system of record (the authoritative source) from derived data (caches, indexes, materialized views), which can always be recreated from the source.

Lambda Architecture

The Lambda architecture maintains derived views with two parallel layers: a batch layer that periodically recomputes views from the complete dataset, and a speed (stream) layer that incrementally processes recent events; queries merge the two. Its main problems: the same logic must be implemented and debugged in two different frameworks, the merge step adds complexity, and the two layers can temporarily disagree.
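A minimal sketch of the two layers and the merge at query time, counting page views (all names here are hypothetical):

```python
# Lambda architecture: the same view (page-view counts) is computed by
# a batch layer over all history and a speed layer over recent events.

def batch_layer(all_events):
    """Recompute the entire view from scratch (run periodically)."""
    counts = {}
    for e in all_events:
        counts[e["page"]] = counts.get(e["page"], 0) + 1
    return counts

class SpeedLayer:
    """Incrementally count events newer than the last batch run."""
    def __init__(self):
        self.counts = {}

    def on_event(self, e):
        self.counts[e["page"]] = self.counts.get(e["page"], 0) + 1

def query(page, batch_view, speed):
    # The serving layer merges both views. Note the counting logic
    # exists twice -- the core maintenance burden of Lambda.
    return batch_view.get(page, 0) + speed.counts.get(page, 0)

history = [{"page": "/home"}, {"page": "/home"}, {"page": "/about"}]
batch_view = batch_layer(history)

speed = SpeedLayer()
speed.on_event({"page": "/home"})  # arrived after the batch run

print(query("/home", batch_view, speed))  # 3
```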

Kappa Architecture

The Kappa architecture drops the batch layer: everything is stream processing over a replayable log. To rebuild a view, you replay the log from the beginning through the same code path used for live events. Compared with Lambda, there is one codebase instead of two, at the cost of requiring a log that retains (or can regenerate) full history. Example:
# Kappa architecture: Single code path
class ViewMaintainer:
    def __init__(self, event_log, checkpoint_store):
        self.event_log = event_log
        self.checkpoint_store = checkpoint_store
        self.view = {}

    def process_events(self, from_offset=None):
        """Process events from log, maintaining view"""
        # An explicit from_offset (e.g. 0 for a full rebuild) wins;
        # otherwise resume from the last saved checkpoint
        offset = from_offset if from_offset is not None else self.checkpoint_store.get_offset()

        for event in self.event_log.read_from(offset):
            # Same code for both initial and incremental processing
            self.update_view(event)

            # Checkpoint progress
            if offset % 1000 == 0:
                self.checkpoint_store.save_offset(offset)
            offset += 1

    def update_view(self, event):
        """Update view based on event"""
        if event['type'] == 'user_created':
            self.view[event['user_id']] = {
                'name': event['name'],
                'email': event['email']
            }
        elif event['type'] == 'user_updated':
            if event['user_id'] in self.view:
                self.view[event['user_id']].update(event['updates'])

# To rebuild view: Just replay from offset 0
# Same code, different starting point

4. End-to-End Argument for Data Systems

The end-to-end argument: reliability mechanisms in lower layers (TCP retransmission, database transactions) are necessary but not sufficient, because faults can occur outside their scope, in the application or between systems. Correctness must ultimately be verified end to end, from the original input to the final result.
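One concrete form of end-to-end check: periodically verify that a derived system holds exactly the data in the system of record, using an order-independent fingerprint, rather than trusting that every hop acknowledged every write. This is a sketch under simplifying assumptions: records are strings, and the XOR-of-hashes fingerprint cancels out if the same record appears an even number of times, so production systems would use something stronger.

```python
import hashlib

def fingerprint(records):
    """Order-independent fingerprint: XOR of per-record SHA-256 hashes."""
    digest = 0
    for r in records:
        digest ^= int(hashlib.sha256(r.encode()).hexdigest(), 16)
    return digest

def verify_end_to_end(system_of_record, derived_view):
    """True only if the derived view contains exactly the source records,
    no matter what each intermediate component reported along the way."""
    return fingerprint(system_of_record) == fingerprint(derived_view)

source = ["user:1:alice", "user:2:bob", "user:3:carol"]
ok = verify_end_to_end(source, ["user:2:bob", "user:1:alice", "user:3:carol"])
missing = verify_end_to_end(source, ["user:1:alice", "user:2:bob"])
print(ok, missing)  # True False
```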

Exactly-Once Semantics

Exactly-once delivery cannot be guaranteed at the network level: a lost acknowledgment forces the sender to retry, so the receiver may see an operation twice. The practical solution is idempotence: design operations so that processing the same request twice has the same effect as processing it once, typically by tagging each operation with a unique identifier and suppressing repeats. Example:
# Idempotent payment processing
import uuid

class PaymentProcessor:
    def __init__(self):
        # Must be durable in production (e.g. a database table with a
        # unique index on request_id), not just process memory
        self.processed_requests = set()

    def process_payment(self, request_id, payment_data):
        """Idempotent payment processing"""
        # Check if already processed
        if request_id in self.processed_requests:
            # Return previous result (already processed)
            return self.get_previous_result(request_id)

        # Process payment; the duplicate check and this record should
        # share one transaction so concurrent retries can't both pass
        with db.transaction():
            # Execute payment
            result = self.execute_payment(payment_data)

            # Record that we processed this request
            self.processed_requests.add(request_id)
            self.save_result(request_id, result)

        return result

# Client retries are safe
payment_id = uuid.uuid4()
try:
    result = payment_processor.process_payment(payment_id, payment_data)
except NetworkError:
    # Safe to retry with same ID
    result = payment_processor.process_payment(payment_id, payment_data)

Duplicate Suppression

Common methods for detecting duplicates: unique request identifiers checked against a persistent store, per-producer sequence numbers, and windowed deduplication, where the consumer remembers the IDs of recently seen events and drops repeats within a time window, bounding memory at the cost of missing very late duplicates.
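A sketch of windowed deduplication with a bounded in-memory index (a real consumer would persist this state alongside its offset checkpoint; the fake clock in the demo just makes the window visible):

```python
import time
from collections import OrderedDict

class WindowedDeduplicator:
    """Suppress duplicate event IDs seen within the last window_seconds.
    Memory stays bounded; the trade-off is that a duplicate arriving
    later than the window slips through."""

    def __init__(self, window_seconds=300, clock=time.monotonic):
        self.window = window_seconds
        self.clock = clock
        self.seen = OrderedDict()  # event_id -> first-seen time (oldest first)

    def is_duplicate(self, event_id):
        now = self.clock()
        # Evict IDs that have aged out of the window
        while self.seen and next(iter(self.seen.values())) < now - self.window:
            self.seen.popitem(last=False)
        if event_id in self.seen:
            return True
        self.seen[event_id] = now
        return False

# Demo with a fake clock so the window boundary is easy to see
t = [0.0]
dedup = WindowedDeduplicator(window_seconds=10, clock=lambda: t[0])
print(dedup.is_duplicate("evt-1"))  # False (first sighting)
print(dedup.is_duplicate("evt-1"))  # True  (repeat inside the window)
t[0] = 20.0
print(dedup.is_duplicate("evt-1"))  # False (window expired, forgotten)
```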

5. Enforcing Constraints

Challenge: Maintaining integrity across distributed systems

Uniqueness Constraints

Within a single database this is easy: a unique index rejects conflicting writes. Across systems it is harder, because two nodes can accept conflicting values concurrently. Common solutions: route all requests for a given value to the same partition so one node sees every candidate, or use a two-phase reserve-then-confirm protocol. Example: enforcing username uniqueness.
# Solution 1: Partition by username
import hashlib

def register_user(username, user_data):
    """Register user with unique username"""
    # A stable hash of the username picks the partition (Python's
    # built-in hash() is randomized per process, so avoid it for routing)
    partition = int(hashlib.sha256(username.encode()).hexdigest(), 16) % num_partitions

    # All operations for this username go to same partition
    with partition_lock(partition):
        # Check uniqueness within partition
        if username_exists_in_partition(partition, username):
            raise UsernameAlreadyTaken()

        # Register user
        store_user_in_partition(partition, username, user_data)

# Solution 2: Two-phase registration
def register_user_two_phase(username, user_data):
    """Register with reservation + confirmation"""
    # Phase 1: Reserve username
    reservation_id = reserve_username(username)

    if reservation_id is None:
        raise UsernameAlreadyTaken()

    try:
        # Phase 2: Complete registration
        complete_registration(reservation_id, username, user_data)
    except Exception:
        # Cancel reservation on failure, then re-raise
        cancel_reservation(reservation_id)
        raise

Timeliness and Integrity

There is a trade-off between timeliness and integrity: waiting for coordination gives stronger guarantees but higher latency. Many businesses instead take an apology-based approach: accept operations optimistically, detect constraint violations after the fact, and compensate (refund the order, send an apology), treating occasional violations as a cost of doing business.
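A sketch of the pattern for order fulfilment: accept writes without coordination, then reconcile later and compensate for any oversell (the function names and compensation action are illustrative):

```python
# Apology-based integrity: accept orders on the fast path without a
# global stock lock, then reconcile later and compensate for oversell.

def accept_order(order_log, order):
    """Fast path: no coordination, maximum timeliness."""
    order_log.append(order)

def reconcile(order_log, stock):
    """Slow path: restore integrity by compensating, not preventing."""
    apologies = []
    remaining = stock
    for order in order_log:
        if order["qty"] <= remaining:
            remaining -= order["qty"]
        else:
            apologies.append({"order_id": order["id"],
                              "action": "refund_and_apologize"})
    return apologies

orders = []
accept_order(orders, {"id": 1, "qty": 3})
accept_order(orders, {"id": 2, "qty": 2})  # concurrent buyer
apologies = reconcile(orders, stock=4)     # only 4 units existed
print(apologies)  # [{'order_id': 2, 'action': 'refund_and_apologize'}]
```

Whether this is acceptable is a business decision: it trades a small rate of compensations for never blocking a customer on cross-partition coordination.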

Coordination-Avoidance

The CALM theorem (Consistency As Logical Monotonicity) states that a program has a consistent, coordination-free distributed implementation if and only if it is logically monotonic: its outputs only grow as more inputs arrive and are never retracted. Monotonic operations (e.g. set union) need no coordination; non-monotonic ones (e.g. checking that a username is not already taken) do.
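The distinction in miniature, using set union as the monotonic case:

```python
# Set union is logically monotonic: more input can only grow the
# output, never retract it. Replicas therefore converge under any
# merge order, with no coordination (the CALM property).

def merge(a, b):
    return a | b

replica_1 = {"alice", "bob"}
replica_2 = {"bob", "carol"}

# Merge order doesn't matter:
assert merge(replica_1, replica_2) == merge(replica_2, replica_1) \
    == {"alice", "bob", "carol"}

# By contrast, "username X is available" is non-monotonic: a later
# registration retracts an earlier "yes", which is why the uniqueness
# constraint earlier needed partitioning or a reservation protocol.
```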

6. Trust, But Verify

Principle: Don’t blindly trust components

Auditing

An immutable, append-only event log makes auditing tractable: every state change is recorded as an event that can be inspected, replayed, and checked for anomalies after the fact. Example:
# Audit log implementation
import uuid
from datetime import datetime

class AuditLog:
    def __init__(self, event_store):
        self.event_store = event_store

    def log_action(self, user_id, action, details):
        """Log user action with full context"""
        event = {
            'event_id': uuid.uuid4(),
            'timestamp': datetime.now().isoformat(),
            'user_id': user_id,
            'action': action,
            'details': details,
            'ip_address': get_client_ip(),
            'user_agent': get_user_agent(),
            'session_id': get_session_id()
        }

        # Append to immutable log
        self.event_store.append(event)

    def find_suspicious_activity(self):
        """Detect anomalies in audit log"""
        events = self.event_store.read_all()

        # Look for patterns
        for user_id, user_events in group_by_user(events):
            # Multiple logins from different locations?
            locations = [e['ip_address'] for e in user_events]
            if len(set(locations)) > 5:
                yield f"Suspicious: User {user_id} from {len(set(locations))} locations"

            # High-value actions without proper authentication?
            sensitive_actions = [e for e in user_events
                                 if e['action'] in ['delete_account', 'transfer_money']]
            for action in sensitive_actions:
                if not action['details'].get('two_factor_verified'):
                    yield f"Suspicious: user {user_id} ran {action['action']} without 2FA"

# Usage
audit = AuditLog(event_store)

# Log every action
audit.log_action(
    user_id=123,
    action='update_profile',
    details={'field': 'email', 'old': '[email protected]', 'new': '[email protected]'}
)

# Periodic anomaly detection
for alert in audit.find_suspicious_activity():
    send_alert(alert)

Designing for Auditability

Event-based systems are easier to audit than databases that mutate state in place: when state is derived deterministically from an immutable event log, you can replay the log to verify that the current state matches, and every change has an explicit, recorded cause.

7. Doing the Right Thing

Ethical considerations in data systems:

Privacy and Data Protection

Example: GDPR compliance
from datetime import datetime

class GDPRCompliantUserService:
    def collect_user_data(self, user_data):
        """Collect only necessary data with consent"""
        # 1. Explicit consent
        if not user_data.get('consent_given'):
            raise ConsentRequired()

        # 2. Data minimization
        necessary_fields = ['email', 'name']
        collected = {k: v for k, v in user_data.items()
                    if k in necessary_fields}

        # 3. Purpose limitation
        collected['purpose'] = 'account_creation'
        collected['collected_at'] = datetime.now()

        return collected

    def export_user_data(self, user_id):
        """Right to data portability"""
        # Export all data about user
        user_data = db.users.get(user_id)
        user_events = event_store.get_events_for_user(user_id)

        return {
            'profile': user_data,
            'activity_history': user_events,
            'format': 'json',
            'exported_at': datetime.now().isoformat()
        }

    def delete_user_data(self, user_id):
        """Right to be forgotten"""
        # 1. Delete from primary storage
        db.users.delete(user_id)

        # 2. Anonymize in event log (can't delete for audit)
        event_store.anonymize_user_events(user_id)

        # 3. Remove from derived views
        search_index.delete_user(user_id)
        cache.invalidate_user(user_id)

        # 4. Log deletion for audit
        audit_log.log_action(
            user_id=user_id,
            action='account_deleted',
            details={'reason': 'user_request'}
        )

Summary

Key Takeaways:
  1. Data Integration:
    • Avoid dual writes
    • Use event logs as integration backbone
    • Maintain single source of truth
  2. Unbundling Databases:
    • Combine specialized systems
    • Event log enables loose coupling
    • CQRS separates reads and writes
  3. Derived Data:
    • Distinguish system of record from derived views
    • Lambda vs Kappa architectures
    • Stream processing for maintaining views
  4. End-to-End Correctness:
    • Database transactions not enough
    • Need application-level checks
    • Idempotence critical for reliability
  5. Enforcing Constraints:
    • Uniqueness requires coordination
    • Trade-off: timeliness vs integrity
    • Some operations can avoid coordination (CALM)
  6. Trust and Verification:
    • Audit everything
    • Immutable event logs
    • Design for forensics
  7. Ethical Responsibilities:
    • Privacy by design
    • Data minimization
    • Right to be forgotten
    • Fairness and transparency
Architecture Comparison:
| Pattern | Pros | Cons | Use When |
|---|---|---|---|
| Traditional DB | Simple, ACID guarantees | Limited scalability, single tool | Small applications |
| Dual Writes | Appears simple | Race conditions, inconsistency | ❌ Don't use |
| Event Log + CDC | Reliable, ordered, extensible | More complex | Multiple derived views |
| Lambda | Batch + stream | Two code paths | Historical + real-time |
| Kappa | Single code path | Requires replayable log | Event-driven systems |
| CQRS | Optimize reads/writes separately | More components | Complex read patterns |
Final Thoughts

Data systems are evolving from monolithic databases toward:
  • Unbundled architectures: Specialized tools working together
  • Event-driven design: Data flows as immutable events
  • Derived state: Views maintained from event log
  • End-to-end thinking: Correctness at application level
  • Ethical design: Privacy, fairness, and transparency
The future is about composing the right tools for each job, with events as the common language binding them together.
Conclusion: This concludes our journey through Designing Data-Intensive Applications. We've covered storage, distribution, processing, and now integration: the complete picture of building robust, scalable, and maintainable data systems.
