Overview
Learn how to build real-time streaming data pipelines in Mage. This tutorial covers:
Setting up Kafka sources and sinks
Processing streaming data in real-time
Implementing windowing and aggregations
Handling backpressure and errors
Monitoring streaming pipelines
Streaming pipelines in Mage continuously process data as it arrives, unlike batch pipelines that run on schedules.
Prerequisites
Mage installed and running
Kafka cluster (or local Kafka setup)
Basic understanding of streaming concepts
Python 3.7+
Use Cases
Streaming pipelines are ideal for:
Real-time Analytics: Process metrics and events as they happen
Event Processing: React to business events in real time
Data Synchronization: Keep systems in sync with change data capture
Monitoring & Alerts: Detect anomalies and trigger alerts instantly
Create a Streaming Pipeline
Start by creating a new streaming pipeline:
Navigate to Pipelines in Mage
Click + New pipeline
Select Streaming as the pipeline type
Name it user_events_stream
Streaming pipelines have a different architecture than batch pipelines. They run continuously and process data in micro-batches.
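The micro-batch loop can be sketched in plain Python. This is a toy illustration, not Mage's actual executor; `poll` and `handle` are hypothetical stand-ins for the streaming source and the downstream blocks:

```python
import time
from typing import Callable, List


def run_micro_batches(poll: Callable[[], List[dict]],
                      handle: Callable[[List[dict]], None],
                      max_batches: int,
                      idle_sleep: float = 0.0) -> int:
    """Toy streaming run loop: poll a batch, hand it to the pipeline's
    blocks, repeat. `max_batches` bounds the loop for this demo; a real
    streaming pipeline runs until it is stopped."""
    processed = 0
    for _ in range(max_batches):
        batch = poll()
        if batch:
            handle(batch)
            processed += len(batch)
        elif idle_sleep:
            time.sleep(idle_sleep)  # back off when the source is idle
    return processed


# Simulate a source that yields two micro-batches, then nothing.
batches = [[{'event': 1}, {'event': 2}], [{'event': 3}], []]
seen = []
total = run_micro_batches(lambda: batches.pop(0) if batches else [],
                          seen.extend, max_batches=3)
print(total)  # 3
```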
Configure Kafka Source
Add a data loader to consume messages from Kafka.
Click + Data loader
Select Python > Streaming source
Name it kafka_source
Create the source configuration file:

```yaml
connector_type: kafka
bootstrap_server: "localhost:9092"
topic: user_events
consumer_group: mage_user_events_group
api_version: "0.10.2"
auto_offset_reset: latest
batch_size: 100
timeout_ms: 500

# Optional: configure security
# security_protocol: "SASL_SSL"
# sasl_config:
#   mechanism: "PLAIN"
#   username: your_username
#   password: your_password

# Optional: include message metadata
include_metadata: true
```
Set auto_offset_reset: earliest to process all historical messages, or latest to only process new messages.
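The difference between the two settings can be illustrated with a simplified single-partition model (a sketch of Kafka's reset rule, not the client library itself):

```python
from typing import Optional


def starting_offset(log_end_offset: int,
                    committed: Optional[int],
                    auto_offset_reset: str) -> int:
    """Mimics Kafka's offset-reset rule for one partition: a committed
    consumer-group offset always wins; otherwise `earliest` replays from
    offset 0 and `latest` starts at the log end (new messages only)."""
    if committed is not None:
        return committed
    return 0 if auto_offset_reset == 'earliest' else log_end_offset


# A topic partition currently holding offsets 0..99 (log end = 100):
print(starting_offset(100, None, 'earliest'))  # 0   -> replays all history
print(starting_offset(100, None, 'latest'))    # 100 -> new messages only
print(starting_offset(100, 42, 'latest'))      # 42  -> resumes from commit
```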
Implement the source block:

```python
from mage_ai.streaming.sources.kafka import KafkaSource
from typing import Callable

if 'streaming_source' not in globals():
    from mage_ai.data_preparation.decorators import streaming_source


@streaming_source
class UserEventsSource(KafkaSource):
    def init_client(self):
        """
        Initialize the Kafka consumer.
        """
        self.config.topic = 'user_events'
        self.config.consumer_group = 'mage_user_events_group'
        super().init_client()
        print(f"Kafka consumer initialized for topic: {self.config.topic}")

    def batch_read(self, handler: Callable):
        """
        Read messages from Kafka and process them in batches.
        """
        print("Starting to consume messages...")
        super().batch_read(handler)
```
Understanding Kafka Configuration

Basic configuration:

```yaml
connector_type: kafka
bootstrap_server: "localhost:9092"
topic: my_topic
consumer_group: my_group
batch_size: 100
timeout_ms: 500
```

SSL and SASL setups use the same structure with additional security fields (see the commented `security_protocol`/`sasl_config` example above).
Transform Streaming Data
Add a transformer to process the incoming messages.
Click + Transformer
Select Python > Generic (no template)
Name it process_events
```python
import json
import pandas as pd
from datetime import datetime
from typing import Dict, List

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer


@transformer
def transform(messages: List[Dict], *args, **kwargs):
    """
    Transform streaming messages from Kafka.

    Args:
        messages: List of messages from the Kafka source

    Returns:
        Transformed DataFrame
    """
    if not messages:
        return pd.DataFrame()

    # Parse JSON messages
    events = []
    for msg in messages:
        try:
            # Extract the message value
            if isinstance(msg, dict) and 'data' in msg:
                event_data = json.loads(msg['data'])
            else:
                event_data = json.loads(msg)

            # Add metadata if available
            if isinstance(msg, dict):
                event_data['_kafka_partition'] = msg.get('partition')
                event_data['_kafka_offset'] = msg.get('offset')
                event_data['_kafka_timestamp'] = msg.get('timestamp')

            events.append(event_data)
        except json.JSONDecodeError as e:
            print(f"Failed to parse message: {e}")
            continue

    if not events:
        return pd.DataFrame()

    # Convert to DataFrame
    df = pd.DataFrame(events)

    # Parse timestamps
    df['event_timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    df['processing_time'] = datetime.now()

    # Calculate processing latency
    df['latency_ms'] = (
        df['processing_time'] - df['event_timestamp']
    ).dt.total_seconds() * 1000

    # Add derived fields
    df['event_hour'] = df['event_timestamp'].dt.hour
    df['event_day_of_week'] = df['event_timestamp'].dt.day_name()

    # Filter and clean
    df = df[df['user_id'].notna()]
    df = df[df['event_type'].isin(['click', 'view', 'purchase'])]

    print(f"Processed {len(df)} events")
    print(f"Average latency: {df['latency_ms'].mean():.2f} ms")
    return df
```
Add Real-time Validation

```python
from datetime import datetime

from mage_ai.data_preparation.decorators import test


@test
def test_no_future_events(output, *args):
    """
    Ensure no events are from the future.
    """
    if not output.empty:
        future_events = output['event_timestamp'] > datetime.now()
        assert not future_events.any(), f'Found {future_events.sum()} future events'


@test
def test_valid_event_types(output, *args):
    """
    Ensure all events have valid types.
    """
    if not output.empty:
        valid_types = ['click', 'view', 'purchase']
        invalid = ~output['event_type'].isin(valid_types)
        assert not invalid.any(), f'Found {invalid.sum()} invalid event types'
```
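To see what the first test guards against, here is the same future-timestamp check run on a tiny synthetic frame with one deliberately skewed row (as would happen with producer clock skew):

```python
import pandas as pd
from datetime import datetime, timedelta

# Two synthetic events: one in the past, one stamped 5 minutes in the future.
df = pd.DataFrame({
    'event_timestamp': [datetime.now() - timedelta(minutes=1),
                        datetime.now() + timedelta(minutes=5)],
    'event_type': ['click', 'view'],
})

# The same boolean mask the @test block computes:
future_events = df['event_timestamp'] > datetime.now()
print(int(future_events.sum()))  # 1 -> the assertion would fail and flag it
```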
Implement Windowed Aggregations
Add time-based windowing for real-time analytics.
Add another Transformer block
Name it aggregate_windows
```python
import pandas as pd
from datetime import timedelta

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer


@transformer
def aggregate_by_window(data: pd.DataFrame, *args, **kwargs):
    """
    Aggregate events using tumbling windows.
    """
    if data.empty:
        return pd.DataFrame()

    # Set the event timestamp as the index
    df = data.set_index('event_timestamp')

    # 5-minute tumbling windows
    aggregations = {
        'event_id': 'count',
        'user_id': 'nunique',
        'latency_ms': ['mean', 'max'],
    }
    windowed = df.groupby([
        pd.Grouper(freq='5min'),
        'event_type',
    ]).agg(aggregations).reset_index()

    # Flatten the multi-level column names
    windowed.columns = [
        'window_start',
        'event_type',
        'event_count',
        'unique_users',
        'avg_latency_ms',
        'max_latency_ms',
    ]

    # Add window metadata
    windowed['window_end'] = windowed['window_start'] + timedelta(minutes=5)
    windowed['window_duration_minutes'] = 5

    # Calculate rates
    windowed['events_per_second'] = windowed['event_count'] / (5 * 60)

    print(f"Generated {len(windowed)} window aggregations")
    return windowed
```
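The tumbling-window logic can be sanity-checked on a handful of synthetic events. This standalone sketch uses pandas named aggregation, which sidesteps the multi-index column flattening step:

```python
import pandas as pd

# Six synthetic events spanning two 5-minute windows:
events = pd.DataFrame({
    'event_timestamp': pd.to_datetime([
        '2024-01-01 10:00:30', '2024-01-01 10:02:00', '2024-01-01 10:04:59',
        '2024-01-01 10:05:01', '2024-01-01 10:07:30', '2024-01-01 10:09:00',
    ]),
    'event_type': ['click'] * 6,
    'user_id': [1, 2, 1, 3, 3, 4],
})

# Same grouping as the transformer, with named aggregation:
windowed = (events.set_index('event_timestamp')
                  .groupby([pd.Grouper(freq='5min'), 'event_type'])
                  .agg(event_count=('user_id', 'count'),
                       unique_users=('user_id', 'nunique'))
                  .reset_index())
print(windowed)
# Window 10:00 -> 3 events / 2 unique users; window 10:05 -> 3 events / 2 unique users
```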
Implement Sliding Windows

For overlapping time windows:

```python
import pandas as pd


@transformer
def sliding_window_aggregations(data: pd.DataFrame, *args, **kwargs):
    """
    Calculate metrics over sliding windows.
    """
    if data.empty:
        return pd.DataFrame()

    df = data.set_index('event_timestamp').sort_index()

    # 10-minute sliding window, re-evaluated every minute
    window_size = '10min'
    slide = '1min'

    results = []
    # Group by event type
    for event_type in df['event_type'].unique():
        subset = df[df['event_type'] == event_type]

        # Rolling.agg doesn't support 'nunique', so count distinct users via
        # apply on integer category codes (rolling apply requires numeric values).
        user_codes = subset['user_id'].astype('category').cat.codes
        rolling = pd.DataFrame({
            'unique_users': user_codes.rolling(window_size).apply(
                lambda s: s.nunique(), raw=False
            ),
            'avg_latency_ms': subset['latency_ms'].rolling(window_size).mean(),
        })

        # Sample the rolling metrics at each slide interval
        rolling = rolling.resample(slide).last()
        rolling['event_type'] = event_type
        results.append(rolling)

    return pd.concat(results).reset_index()
```
Configure Kafka Sink
Export processed data back to Kafka or to another destination.

Option 1: Export to Kafka

```yaml
connector_type: kafka
bootstrap_server: "localhost:9092"
topic: processed_events
api_version: "0.10.2"
batch_size: 100
timeout_ms: 500
```
```python
import json

from mage_ai.streaming.sinks.kafka import KafkaSink

if 'streaming_sink' not in globals():
    from mage_ai.data_preparation.decorators import streaming_sink


@streaming_sink
class ProcessedEventsSink(KafkaSink):
    def init_client(self):
        """
        Initialize the Kafka producer.
        """
        self.config.topic = 'processed_events'
        super().init_client()
        print(f"Kafka producer initialized for topic: {self.config.topic}")

    def batch_write(self, messages):
        """
        Write processed messages back to Kafka.
        """
        for msg in messages:
            # Convert a DataFrame row to a dict
            if hasattr(msg, 'to_dict'):
                msg = msg.to_dict()

            # Send to Kafka, keyed by user_id for per-user ordering
            self.producer.send(
                self.config.topic,
                value=msg,
                key=str(msg.get('user_id', '')),
            )
        self.producer.flush()
        print(f"Wrote {len(messages)} messages to Kafka")
```
Option 2: Export to Database

For real-time database updates:

```python
from os import path

import pandas as pd
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_to_database(data: pd.DataFrame, *args, **kwargs):
    """
    Export streaming data to PostgreSQL.
    """
    if data.empty:
        print("No data to export")
        return

    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'
    schema_name = 'real_time'
    table_name = 'event_metrics'

    with Postgres.with_config(
        ConfigFileLoader(config_path, config_profile)
    ) as loader:
        loader.export(
            data,
            schema_name,
            table_name,
            index=False,
            if_exists='append',  # Append for streaming data
        )
    print(f"Exported {len(data)} rows to {schema_name}.{table_name}")
```
Option 3: Export to Cloud Storage

For data lake integration:

```yaml
connector_type: amazon_s3
bucket: my-streaming-data
prefix: events/processed/
file_type: parquet
batch_size: 1000
```
Handle Errors and Retries
Implement error handling for production reliability.

```python
import traceback
from datetime import datetime
from typing import Dict, List

if 'custom' not in globals():
    from mage_ai.data_preparation.decorators import custom


@custom
def error_handler(messages: List[Dict], *args, **kwargs):
    """
    Handle errors and implement retry logic.
    """
    successful = []
    failed = []

    for msg in messages:
        try:
            # Process the message (process_message is your pipeline's
            # transformation function)
            processed = process_message(msg)
            successful.append(processed)
        except Exception as e:
            error_info = {
                'message': msg,
                'error': str(e),
                'traceback': traceback.format_exc(),
                'timestamp': datetime.now().isoformat(),
            }
            failed.append(error_info)
            print(f"Error processing message: {e}")

    # Log failed messages to a dead letter queue
    if failed:
        write_to_dlq(failed)

    print(f"Processed: {len(successful)}, Failed: {len(failed)}")
    return successful


def write_to_dlq(failed_messages: List[Dict]):
    """
    Write failed messages to a dead letter queue.
    """
    # Implement DLQ logic (e.g., write to a separate Kafka topic)
    pass
```
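One possible way to fill in `write_to_dlq` is a JSON-lines file sink. This is an illustrative sketch; in production a dedicated Kafka topic (e.g. a hypothetical `user_events_dlq`) is the more common choice, since it lets failed messages be replayed:

```python
import json
from datetime import datetime
from pathlib import Path
from typing import Dict, List


def write_to_file_dlq(failed_messages: List[Dict], dlq_path: str) -> int:
    """Append failed messages as JSON lines to a local file. Each record
    gets a `dlq_written_at` timestamp so replays can be ordered. `default=str`
    keeps non-JSON-serializable fields (e.g. datetimes) from raising."""
    path = Path(dlq_path)
    with path.open('a') as f:
        for record in failed_messages:
            record.setdefault('dlq_written_at', datetime.now().isoformat())
            f.write(json.dumps(record, default=str) + '\n')
    return len(failed_messages)


failed = [{'message': '{bad json', 'error': 'Expecting value'}]
print(write_to_file_dlq(failed, '/tmp/user_events_dlq.jsonl'))  # 1
```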
Monitor Stream Health
Add monitoring for your streaming pipeline.

```python
import time
from collections import deque
from datetime import datetime

if 'custom' not in globals():
    from mage_ai.data_preparation.decorators import custom


class StreamMonitor:
    def __init__(self):
        self.message_counts = deque(maxlen=100)
        self.processing_times = deque(maxlen=100)
        self.last_message_time = None

    def record_batch(self, message_count: int, processing_time: float):
        self.message_counts.append(message_count)
        self.processing_times.append(processing_time)
        self.last_message_time = datetime.now()

    def get_metrics(self) -> dict:
        return {
            'avg_batch_size': sum(self.message_counts) / len(self.message_counts),
            'avg_processing_time': sum(self.processing_times) / len(self.processing_times),
            'throughput_per_second': sum(self.message_counts) / sum(self.processing_times),
            'last_message_time': (
                self.last_message_time.isoformat() if self.last_message_time else None
            ),
        }


monitor = StreamMonitor()


@custom
def monitor_stream(data, *args, **kwargs):
    """
    Monitor streaming pipeline performance.
    """
    start_time = time.time()

    # Process data
    result = data

    # Record metrics
    processing_time = time.time() - start_time
    monitor.record_batch(len(data), processing_time)

    # Log metrics
    metrics = monitor.get_metrics()
    print(f"Stream metrics: {metrics}")

    # Alert on issues
    if metrics['avg_processing_time'] > 5.0:
        print("WARNING: High processing latency detected")
    if metrics['throughput_per_second'] < 10:
        print("WARNING: Low throughput detected")

    return result
```
Advanced Streaming Patterns
Stateful Processing
Maintain state across messages:

```python
from collections import defaultdict


class StatefulProcessor:
    def __init__(self):
        self.user_sessions = defaultdict(list)

    def process(self, event: dict) -> dict:
        user_id = event['user_id']

        # Add the event to the user's session
        self.user_sessions[user_id].append(event)

        # Calculate session metrics
        session = self.user_sessions[user_id]
        event['session_length'] = len(session)
        event['session_duration'] = (
            event['timestamp'] - session[0]['timestamp']
        )
        return event
```
Exactly-Once Processing
Ensure messages are processed exactly once:

```python
import redis


class DeduplicatingProcessor:
    def __init__(self):
        self.redis_client = redis.Redis()
        self.ttl_seconds = 3600

    def process(self, event: dict) -> bool:
        event_id = event['event_id']

        # Skip events that have already been processed
        if self.redis_client.get(f"processed:{event_id}"):
            print(f"Skipping duplicate event: {event_id}")
            return False

        # Mark as processed, with a TTL so keys expire
        self.redis_client.setex(f"processed:{event_id}", self.ttl_seconds, "1")
        return True
```
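The same idempotency pattern can be exercised without a Redis server by swapping in an in-memory set. This is a sketch for tests and single-process use only: it loses state on restart, is not shared across consumers, and omits TTL expiry:

```python
class InMemoryDeduplicator:
    """Same dedup pattern as the Redis version, with a plain set standing
    in for the external key store."""

    def __init__(self):
        self._seen = set()

    def process(self, event: dict) -> bool:
        event_id = event['event_id']
        if event_id in self._seen:
            return False  # duplicate: skip
        self._seen.add(event_id)
        return True       # first time seen: process it


dedup = InMemoryDeduplicator()
events = [{'event_id': 'a'}, {'event_id': 'b'}, {'event_id': 'a'}]
print([dedup.process(e) for e in events])  # [True, True, False]
```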
Other Streaming Sources
Mage supports multiple streaming sources:

Amazon Kinesis
Google Pub/Sub
Amazon SQS
Azure Event Hub

Example Kinesis configuration:

```yaml
connector_type: kinesis
stream_name: my-stream
region: us-west-2
shard_iterator_type: LATEST
batch_size: 100
```
Deployment Best Practices
Configure Resource Limits
Save consumer offsets regularly:

```python
# Commit offsets after processing
consumer.commit()
```

Handle high message rates:

```python
# Reduce batch size if processing is slow
if processing_time > threshold:
    batch_size = max(10, batch_size // 2)
```

Scale horizontally with consumer groups:

```yaml
# Each consumer processes different partitions
consumer_group: my-group
num_consumers: 3
```
Monitoring Streaming Pipelines
Key Metrics to Track
Throughput: Messages processed per second
Latency: Time from event creation to processing
Lag: Number of unprocessed messages
Error Rate: Percentage of failed messages
Consumer Offset: Current position in the stream
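Lag in particular is worth computing explicitly: it is the log-end offset minus the committed offset, summed over partitions. A minimal sketch with made-up offset maps:

```python
from typing import Dict


def consumer_lag(end_offsets: Dict[int, int],
                 committed: Dict[int, int]) -> int:
    """Total consumer lag: log-end offset minus committed offset, summed
    over partitions. A partition with no committed offset counts as fully
    unprocessed (committed defaults to 0)."""
    return sum(end_offsets[p] - committed.get(p, 0) for p in end_offsets)


# Partition -> offset maps (hypothetical numbers):
end = {0: 1500, 1: 1480, 2: 1510}
done = {0: 1500, 1: 1450, 2: 1500}
print(consumer_lag(end, done))  # 40
```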
View Pipeline Status
```python
# Get pipeline status
from mage_ai.data_preparation.models.pipeline import Pipeline

pipeline = Pipeline.get('user_events_stream')
status = pipeline.get_status()
print(f"Pipeline status: {status}")
print(f"Last run: {pipeline.last_run_at}")
```
Troubleshooting
Consumer lag is increasing
Increase batch size for better throughput
Add more consumer instances
Optimize transformation logic
Check for blocking operations
Messages are being dropped
Verify error handling is properly implemented
Check dead letter queue for failed messages
Review Kafka retention settings
Ensure proper offset management
Memory usage is too high
Reduce batch size
Process messages in smaller chunks
Clear unnecessary data from memory
Check for memory leaks in custom code
Next Steps
ML Pipeline: Build ML pipelines with real-time inference
Production Deployment: Deploy streaming pipelines to production
Monitoring: Set up comprehensive monitoring
Integration Sources: Explore all available streaming sources