Overview

Learn how to build real-time streaming data pipelines in Mage. This tutorial covers:
  • Setting up Kafka sources and sinks
  • Processing streaming data in real-time
  • Implementing windowing and aggregations
  • Handling backpressure and errors
  • Monitoring streaming pipelines
Streaming pipelines in Mage continuously process data as it arrives, unlike batch pipelines that run on schedules.

Prerequisites

  • Mage installed and running
  • Kafka cluster (or local Kafka setup)
  • Basic understanding of streaming concepts
  • Python 3.7+

Use Cases

Streaming pipelines are ideal for:

Real-time Analytics

Process metrics and events as they happen

Event Processing

React to business events in real-time

Data Synchronization

Keep systems in sync with change data capture

Monitoring & Alerts

Detect anomalies and trigger alerts instantly

Step 1: Create a Streaming Pipeline

Start by creating a new streaming pipeline:
  1. Navigate to Pipelines in Mage
  2. Click + New pipeline
  3. Select Streaming as the pipeline type
  4. Name it user_events_stream
Streaming pipelines have a different architecture than batch pipelines. They run continuously and process data in micro-batches.
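The examples in this tutorial assume `user_events` messages with a particular JSON shape. A minimal sketch of that assumed schema (the field names `event_id`, `user_id`, `event_type`, and `timestamp` are conventions used by the examples below, not a Mage requirement):

```python
import json

# Hypothetical user_events payload; the examples in this tutorial
# assume these field names
sample_event = {
    'event_id': 'evt-001',
    'user_id': 'user-42',
    'event_type': 'click',
    'timestamp': 1700000000000,  # event time in epoch milliseconds
}

# On the wire, the Kafka message value is the JSON-encoded payload
raw_value = json.dumps(sample_event).encode('utf-8')
decoded = json.loads(raw_value)
print(decoded['event_type'])  # → click
```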
Step 2: Configure Kafka Source

Add a data loader to consume messages from Kafka.
  1. Click + Data loader
  2. Select Python > Streaming source
  3. Name it kafka_source
Create the source configuration file:
kafka_source.yaml
connector_type: kafka
bootstrap_server: "localhost:9092"
topic: user_events
consumer_group: mage_user_events_group
api_version: "0.10.2"
auto_offset_reset: latest
batch_size: 100
timeout_ms: 500

# Optional: Configure security
# security_protocol: "SASL_SSL"
# sasl_config:
#   mechanism: "PLAIN"
#   username: your_username
#   password: your_password

# Optional: Include message metadata
include_metadata: true
Set auto_offset_reset: earliest to process all historical messages, or latest to only process new messages.
Implement the source block:
kafka_source.py
from mage_ai.streaming.sources.kafka import KafkaSource
from typing import Callable

if 'streaming_source' not in globals():
    from mage_ai.data_preparation.decorators import streaming_source

@streaming_source
class UserEventsSource(KafkaSource):
    def init_client(self):
        """
        Initialize the Kafka consumer
        """
        self.config.topic = 'user_events'
        self.config.consumer_group = 'mage_user_events_group'
        super().init_client()
        print(f"Kafka consumer initialized for topic: {self.config.topic}")
    
    def batch_read(self, handler: Callable):
        """
        Read messages from Kafka and process them in batches
        """
        print("Starting to consume messages...")
        super().batch_read(handler)

Understanding Kafka Configuration

  • connector_type: identifies the connector; kafka for Kafka sources
  • bootstrap_server: host:port of a Kafka broker used for the initial connection
  • topic: the topic to consume from
  • consumer_group: consumer group ID; Kafka uses it to track offsets and balance partitions across consumers
  • batch_size: maximum number of messages per micro-batch
  • timeout_ms: how long to wait for messages before processing a partial batch
Step 3: Transform Streaming Data

Add a transformer to process the incoming messages.
  1. Click + Transformer
  2. Select Python > Generic (no template)
  3. Name it process_events
import json
import pandas as pd
from datetime import datetime
from typing import Dict, List

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer

@transformer
def transform(messages: List[Dict], *args, **kwargs):
    """
    Transform streaming messages from Kafka
    
    Args:
        messages: List of messages from Kafka source
    
    Returns:
        Transformed DataFrame
    """
    if not messages:
        return pd.DataFrame()
    
    # Parse JSON messages
    events = []
    for msg in messages:
        try:
            # Extract message value
            if isinstance(msg, dict) and 'data' in msg:
                event_data = json.loads(msg['data'])
            else:
                event_data = json.loads(msg)
            
            # Add metadata if available
            if isinstance(msg, dict):
                event_data['_kafka_partition'] = msg.get('partition')
                event_data['_kafka_offset'] = msg.get('offset')
                event_data['_kafka_timestamp'] = msg.get('timestamp')
            
            events.append(event_data)
            
        except json.JSONDecodeError as e:
            print(f"Failed to parse message: {e}")
            continue
    
    if not events:
        return pd.DataFrame()
    
    # Convert to DataFrame
    df = pd.DataFrame(events)
    
    # Parse timestamp
    df['event_timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    df['processing_time'] = datetime.now()
    
    # Calculate processing latency
    df['latency_ms'] = (
        df['processing_time'] - df['event_timestamp']
    ).dt.total_seconds() * 1000
    
    # Add derived fields
    df['event_hour'] = df['event_timestamp'].dt.hour
    df['event_day_of_week'] = df['event_timestamp'].dt.day_name()
    
    # Filter and clean
    df = df[df['user_id'].notna()]
    df = df[df['event_type'].isin(['click', 'view', 'purchase'])]
    
    print(f"Processed {len(df)} events")
    print(f"Average latency: {df['latency_ms'].mean():.2f}ms")
    
    return df
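The parsing step above is the part most likely to break in production, so it is worth checking in isolation. A standalone sketch of the same parse-and-skip logic, run against two hand-made messages (the sample data is hypothetical):

```python
import json

import pandas as pd

# Two hand-made messages: one valid, one malformed
messages = [
    {
        'data': json.dumps({
            'event_id': 'e1',
            'user_id': 'u1',
            'event_type': 'click',
            'timestamp': 1700000000000,
        }),
        'partition': 0,
        'offset': 10,
    },
    {'data': 'not-json'},  # should be skipped, not crash the batch
]

events = []
for msg in messages:
    try:
        event = json.loads(msg['data'])
        event['_kafka_offset'] = msg.get('offset')
        events.append(event)
    except json.JSONDecodeError:
        continue  # malformed message dropped, as in the transformer above

df = pd.DataFrame(events)
df['event_timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
print(len(df))  # → 1
```

The malformed message is dropped while the batch as a whole survives, which is the behavior the transformer relies on.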

Add Real-time Validation

from datetime import datetime

from mage_ai.data_preparation.decorators import test

@test
def test_no_future_events(output, *args):
    """
    Ensure no events are from the future
    """
    if not output.empty:
        future_events = output['event_timestamp'] > datetime.now()
        assert not future_events.any(), f'Found {future_events.sum()} future events'

@test  
def test_valid_event_types(output, *args):
    """
    Ensure all events have valid types
    """
    if not output.empty:
        valid_types = ['click', 'view', 'purchase']
        invalid = ~output['event_type'].isin(valid_types)
        assert not invalid.any(), f'Found {invalid.sum()} invalid event types'
Step 4: Implement Windowed Aggregations

Add time-based windowing for real-time analytics.
  1. Add another Transformer block
  2. Name it aggregate_windows
import pandas as pd
from datetime import datetime, timedelta

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer

@transformer
def aggregate_by_window(data: pd.DataFrame, *args, **kwargs):
    """
    Aggregate events using tumbling windows
    """
    if data.empty:
        return pd.DataFrame()
    
    # Set event timestamp as index
    df = data.set_index('event_timestamp')
    
    # 5-minute tumbling windows
    aggregations = {
        'event_id': 'count',
        'user_id': 'nunique',
        'latency_ms': ['mean', 'max'],
    }
    
    windowed = df.groupby([
        pd.Grouper(freq='5min'),
        'event_type'
    ]).agg(aggregations).reset_index()
    
    # Flatten column names
    windowed.columns = [
        'window_start',
        'event_type',
        'event_count',
        'unique_users',
        'avg_latency_ms',
        'max_latency_ms'
    ]
    
    # Add window metadata
    windowed['window_end'] = windowed['window_start'] + timedelta(minutes=5)
    windowed['window_duration_minutes'] = 5
    
    # Calculate rates
    windowed['events_per_second'] = windowed['event_count'] / (5 * 60)
    
    print(f"Generated {len(windowed)} window aggregations")
    
    return windowed
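The core of the tumbling-window logic is pd.Grouper. A tiny standalone example with four made-up events shows how events falling in the same 5-minute bin are grouped together:

```python
import pandas as pd

# Four toy events spanning three 5-minute windows
events = pd.DataFrame({
    'event_timestamp': pd.to_datetime([
        '2024-01-01 00:01', '2024-01-01 00:03',
        '2024-01-01 00:07', '2024-01-01 00:12',
    ]),
    'event_type': ['click', 'click', 'view', 'click'],
    'user_id': ['u1', 'u2', 'u1', 'u1'],
})

windowed = (
    events.set_index('event_timestamp')
    .groupby([pd.Grouper(freq='5min'), 'event_type'])
    .agg(event_count=('user_id', 'count'),
         unique_users=('user_id', 'nunique'))
    .reset_index()
)
print(windowed)
```

The two clicks at 00:01 and 00:03 land in the 00:00–00:05 window (2 events, 2 unique users), while the later events each fall in their own window.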

Implement Sliding Windows

For overlapping time windows:
@transformer
def sliding_window_aggregations(data: pd.DataFrame, *args, **kwargs):
    """
    Calculate metrics over sliding windows
    """
    if data.empty:
        return pd.DataFrame()

    df = data.set_index('event_timestamp').sort_index()

    # 10-minute window that slides every 1 minute:
    # bucket events into 1-minute slices, then roll a 10-slice window
    slide = '1min'
    window_slices = 10

    results = []

    # Group by event type
    for event_type in df['event_type'].unique():
        subset = df[df['event_type'] == event_type]

        # Per-slice metrics (rolling has no nunique, so compute it per slice)
        per_slice = subset.resample(slide).agg(
            event_count=('event_id', 'count'),
            unique_users=('user_id', 'nunique'),
            avg_latency_ms=('latency_ms', 'mean'),
        )

        rolling = per_slice.rolling(window_slices, min_periods=1).agg({
            'event_count': 'sum',
            # Note: summing per-slice uniques overcounts users active in
            # more than one slice; exact distinct counts need a custom pass
            'unique_users': 'sum',
            'avg_latency_ms': 'mean',
        })

        rolling['event_type'] = event_type
        results.append(rolling)

    return pd.concat(results).reset_index()
Step 5: Configure Kafka Sink

Export processed data back to Kafka or to another destination.

Option 1: Export to Kafka

kafka_sink.yaml
connector_type: kafka
bootstrap_server: "localhost:9092"
topic: processed_events
api_version: "0.10.2"
batch_size: 100
timeout_ms: 500
from mage_ai.streaming.sinks.kafka import KafkaSink
import json

if 'streaming_sink' not in globals():
    from mage_ai.data_preparation.decorators import streaming_sink

@streaming_sink
class ProcessedEventsSink(KafkaSink):
    def init_client(self):
        """
        Initialize Kafka producer
        """
        self.config.topic = 'processed_events'
        super().init_client()
        print(f"Kafka producer initialized for topic: {self.config.topic}")
    
    def batch_write(self, messages):
        """
        Write processed messages back to Kafka
        """
        for msg in messages:
            # Convert DataFrame row to a JSON-serializable dict
            if hasattr(msg, 'to_dict'):
                msg = msg.to_dict()
            
            # The producer expects bytes unless serializers are configured
            self.producer.send(
                self.config.topic,
                value=json.dumps(msg, default=str).encode('utf-8'),
                key=str(msg.get('user_id', '')).encode('utf-8'),
            )
        
        self.producer.flush()
        print(f"Wrote {len(messages)} messages to Kafka")

Option 2: Export to Database

For real-time database updates:
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader  
from mage_ai.io.postgres import Postgres
from os import path
import pandas as pd

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

@data_exporter
def export_to_database(data: pd.DataFrame, *args, **kwargs):
    """
    Export streaming data to PostgreSQL
    """
    if data.empty:
        print("No data to export")
        return
    
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'
    
    schema_name = 'real_time'
    table_name = 'event_metrics'
    
    with Postgres.with_config(
        ConfigFileLoader(config_path, config_profile)
    ) as loader:
        loader.export(
            data,
            schema_name,
            table_name,
            index=False,
            if_exists='append',  # Append for streaming data
        )
    
    print(f"Exported {len(data)} rows to {schema_name}.{table_name}")

Option 3: Export to Cloud Storage

For data lake integration:
s3_sink.yaml
connector_type: amazon_s3
bucket: my-streaming-data
prefix: events/processed/
file_type: parquet
batch_size: 1000
Step 6: Handle Errors and Retries

Implement error handling for production reliability.
from datetime import datetime
from typing import Dict, List
import traceback

if 'custom' not in globals():
    from mage_ai.data_preparation.decorators import custom

@custom
def error_handler(messages: List[Dict], *args, **kwargs):
    """
    Handle errors and implement retry logic
    """
    successful = []
    failed = []
    
    for msg in messages:
        try:
            # Process the message (process_message is your own handler)
            processed = process_message(msg)
            successful.append(processed)
            
        except Exception as e:
            error_info = {
                'message': msg,
                'error': str(e),
                'traceback': traceback.format_exc(),
                'timestamp': datetime.now().isoformat()
            }
            failed.append(error_info)
            print(f"Error processing message: {e}")
    
    # Log failed messages to dead letter queue
    if failed:
        write_to_dlq(failed)
    
    print(f"Processed: {len(successful)}, Failed: {len(failed)}")
    
    return successful

def write_to_dlq(failed_messages: List[Dict]):
    """
    Write failed messages to dead letter queue
    """
    # Implement DLQ logic (e.g., write to separate Kafka topic)
    pass
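For transient failures (broker hiccups, flaky downstream APIs), retrying before sending a message to the dead letter queue is usually worthwhile. A minimal retry-with-exponential-backoff sketch; the helper name and parameters are illustrative, not a Mage API:

```python
import time


def process_with_retries(msg, handler, max_retries=3, base_delay=0.1):
    """Retry a failing handler with exponential backoff.

    Hypothetical helper; `handler` raises on failure.
    """
    for attempt in range(max_retries + 1):
        try:
            return handler(msg)
        except Exception:
            if attempt == max_retries:
                raise  # exhausted retries: let the caller route to the DLQ
            time.sleep(base_delay * (2 ** attempt))


# Usage: a handler that fails twice, then succeeds
calls = {'n': 0}

def flaky(msg):
    calls['n'] += 1
    if calls['n'] < 3:
        raise RuntimeError('transient failure')
    return msg.upper()

result = process_with_retries('hello', flaky, base_delay=0.01)
print(result)      # → HELLO
print(calls['n'])  # → 3
```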
Step 7: Monitor Stream Health

Add monitoring for your streaming pipeline.
import time
from datetime import datetime
from collections import deque

if 'custom' not in globals():
    from mage_ai.data_preparation.decorators import custom

class StreamMonitor:
    def __init__(self):
        self.message_counts = deque(maxlen=100)
        self.processing_times = deque(maxlen=100)
        self.last_message_time = None
    
    def record_batch(self, message_count: int, processing_time: float):
        self.message_counts.append(message_count)
        self.processing_times.append(processing_time)
        self.last_message_time = datetime.now()
    
    def get_metrics(self) -> dict:
        # Guard against empty windows and zero total processing time
        if not self.message_counts:
            return {}
        total_time = sum(self.processing_times) or 1e-9
        return {
            'avg_batch_size': sum(self.message_counts) / len(self.message_counts),
            'avg_processing_time': total_time / len(self.processing_times),
            'throughput_per_second': sum(self.message_counts) / total_time,
            'last_message_time': self.last_message_time.isoformat(),
        }

monitor = StreamMonitor()

@custom  
def monitor_stream(data, *args, **kwargs):
    """
    Monitor streaming pipeline performance
    """
    start_time = time.time()
    
    # Process data
    result = data
    
    # Record metrics
    processing_time = time.time() - start_time
    monitor.record_batch(len(data), processing_time)
    
    # Log metrics
    metrics = monitor.get_metrics()
    print(f"Stream metrics: {metrics}")
    
    # Alert on issues
    if metrics['avg_processing_time'] > 5.0:
        print("WARNING: High processing latency detected")
    
    if metrics['throughput_per_second'] < 10:
        print("WARNING: Low throughput detected")
    
    return result
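As a sanity check on the throughput arithmetic above, here is the same math run standalone on made-up batch numbers:

```python
from collections import deque

# Two recorded batches: 100 msgs in 0.5s, then 200 msgs in 0.5s
message_counts = deque([100, 200], maxlen=100)
processing_times = deque([0.5, 0.5], maxlen=100)

avg_batch_size = sum(message_counts) / len(message_counts)
throughput_per_second = sum(message_counts) / sum(processing_times)

print(avg_batch_size)         # → 150.0
print(throughput_per_second)  # → 300.0
```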

Advanced Streaming Patterns

Stateful Processing

Maintain state across messages:
from collections import defaultdict

class StatefulProcessor:
    def __init__(self):
        self.user_sessions = defaultdict(list)
    
    def process(self, event: dict) -> dict:
        user_id = event['user_id']
        
        # Add to user session
        self.user_sessions[user_id].append(event)
        
        # Calculate session metrics
        session = self.user_sessions[user_id]
        event['session_length'] = len(session)
        event['session_duration'] = (
            event['timestamp'] - session[0]['timestamp']
        )
        
        return event
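The sessionization idea above can be exercised without any streaming infrastructure. A compact standalone run over three made-up events (timestamps here are plain milliseconds for simplicity):

```python
from collections import defaultdict

# In-memory session state keyed by user_id
sessions = defaultdict(list)

events = [
    {'user_id': 'u1', 'timestamp': 1000},
    {'user_id': 'u1', 'timestamp': 4000},
    {'user_id': 'u2', 'timestamp': 2000},
]

for event in events:
    session = sessions[event['user_id']]
    session.append(event)
    event['session_length'] = len(session)
    # Duration from the first event in this user's session
    event['session_duration'] = event['timestamp'] - session[0]['timestamp']

print(events[1]['session_length'])    # → 2
print(events[1]['session_duration'])  # → 3000
```

Note that this state grows without bound; production stateful processors evict idle sessions after a timeout.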

Exactly-Once Processing

Deduplicate redelivered messages so each is processed effectively once:
import redis

class DeduplicatingProcessor:
    def __init__(self):
        self.redis_client = redis.Redis()
        self.ttl_seconds = 3600
    
    def process(self, event: dict) -> bool:
        event_id = event['event_id']
        
        # Check if already processed
        if self.redis_client.get(f"processed:{event_id}"):
            print(f"Skipping duplicate event: {event_id}")
            return False
        
        # Mark as processed
        self.redis_client.setex(
            f"processed:{event_id}",
            self.ttl_seconds,
            "1"
        )
        
        return True
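The same deduplication pattern can be tried without a Redis server. A dict-based stand-in with lazy TTL expiry (a sketch for local experimentation, not a replacement for shared state across consumers):

```python
import time


class InMemoryDeduplicator:
    """Dict-based stand-in for the Redis deduplicator.

    Expiry is checked lazily on lookup; entries are never proactively
    evicted, so this is only suitable for short-lived local testing.
    """

    def __init__(self, ttl_seconds=3600):
        self.seen = {}  # event_id -> expiry time (epoch seconds)
        self.ttl = ttl_seconds

    def is_new(self, event_id):
        now = time.time()
        expires = self.seen.get(event_id)
        if expires is not None and expires > now:
            return False  # duplicate within the TTL window
        self.seen[event_id] = now + self.ttl
        return True


dedup = InMemoryDeduplicator()
print(dedup.is_new('evt-1'))  # → True
print(dedup.is_new('evt-1'))  # → False (duplicate)
print(dedup.is_new('evt-2'))  # → True
```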

Other Streaming Sources

Mage supports multiple streaming sources. For example, Amazon Kinesis:
connector_type: kinesis
stream_name: my-stream
region: us-west-2
shard_iterator_type: LATEST
batch_size: 100

Deployment Best Practices

Set appropriate memory and CPU limits:
executor_config:
  max_cpu: 2
  memory: 2048  # MB
Save consumer offsets regularly:
# Commit offsets after processing
consumer.commit()
Handle high message rates:
# Reduce batch size if processing is slow
if processing_time > threshold:
    batch_size = max(10, batch_size // 2)
Scale horizontally with consumer groups:
# Each consumer processes different partitions
consumer_group: my-group
num_consumers: 3
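The batch-size adjustment above can be made into a small, testable policy. A hedged sketch (the function name, growth factor, and bounds are illustrative choices, not Mage configuration):

```python
def next_batch_size(current, processing_time, target_seconds=1.0,
                    min_size=10, max_size=1000):
    """Halve the batch when it overruns the target; grow gently otherwise.

    Hypothetical helper for tuning micro-batch size.
    """
    if processing_time > target_seconds:
        return max(min_size, current // 2)
    return min(max_size, int(current * 1.2))


print(next_batch_size(100, 2.5))  # → 50  (too slow: halve)
print(next_batch_size(100, 0.3))  # → 120 (headroom: grow 20%)
```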

Monitoring Streaming Pipelines

Key Metrics to Track

  • Throughput: Messages processed per second
  • Latency: Time from event creation to processing
  • Lag: Number of unprocessed messages
  • Error Rate: Percentage of failed messages
  • Consumer Offset: Current position in the stream
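Lag is simple arithmetic over offsets: the latest offset on the broker minus the offset your consumer group has committed, summed across partitions. A standalone illustration with made-up offsets:

```python
# Consumer lag per partition: latest broker offset minus committed offset
latest_offsets = {0: 1500, 1: 980, 2: 2100}
committed_offsets = {0: 1480, 1: 980, 2: 2050}

lag = {p: latest_offsets[p] - committed_offsets[p] for p in latest_offsets}
total_lag = sum(lag.values())

print(lag)        # → {0: 20, 1: 0, 2: 50}
print(total_lag)  # → 70
```

A steadily growing total lag means consumers cannot keep up with producers and you need more throughput or more consumer instances.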

View Pipeline Status

# Get pipeline status
from mage_ai.data_preparation.models.pipeline import Pipeline

pipeline = Pipeline.get('user_events_stream')
status = pipeline.get_status()

print(f"Pipeline status: {status}")
print(f"Last run: {pipeline.last_run_at}")

Troubleshooting

Low throughput

  • Increase batch size for better throughput
  • Add more consumer instances
  • Optimize transformation logic
  • Check for blocking operations

Messages being lost

  • Verify error handling is properly implemented
  • Check dead letter queue for failed messages
  • Review Kafka retention settings
  • Ensure proper offset management

High memory usage

  • Reduce batch size
  • Process messages in smaller chunks
  • Clear unnecessary data from memory
  • Check for memory leaks in custom code

Next Steps

ML Pipeline

Build ML pipelines with real-time inference

Production Deployment

Deploy streaming pipelines to production

Monitoring

Set up comprehensive monitoring

Integration Sources

Explore all available streaming sources
