Skip to main content

Overview

Robust error handling is critical for production data pipelines. Mage provides multiple layers of error handling and automatic retry mechanisms built into the core execution engine.

Retry Configuration

Pipeline-Level Retries

Configure retry behavior in metadata.yaml:
# mage_ai/data_preparation/shared/retry.py
retry_config:
  retries: 3                    # Number of retry attempts
  delay: 5                      # Initial delay in seconds
  max_delay: 60                 # Maximum delay in seconds
  exponential_backoff: true     # Enable exponential backoff
The retry configuration uses exponential backoff by default. With delay: 5 and exponential backoff enabled, retries occur at 5s, 10s, 20s, 40s, capped at max_delay.

Block-Level Retries

Override retry behavior for specific blocks:
# In pipeline metadata.yaml
blocks:
  - uuid: load_external_api
    type: data_loader
    retry_config:
      retries: 5               # More retries for flaky APIs
      delay: 10
      max_delay: 300           # Wait up to 5 minutes
      exponential_backoff: true
Set higher retry counts for operations that interact with external systems (APIs, databases, cloud storage) since these are more likely to experience transient failures.

Understanding Retry Behavior

The retry mechanism is implemented in the execution engine:
# Retry logic (conceptual)
tries_by_block_uuid = {}

while not blocks.empty():
    block = blocks.get()
    
    if tries_by_block_uuid.get(block.uuid) is None:
        tries_by_block_uuid[block.uuid] = 0
    
    tries_by_block_uuid[block.uuid] += 1
    tries = tries_by_block_uuid[block.uuid]
    
    # Prevent infinite loops
    if tries >= 1000:
        raise Exception(f'Block {block.uuid} exceeded max tries')
    
    # Check upstream dependencies before retrying
    if not all_upstream_completed(block):
        blocks.put(block)  # Re-queue for later
        continue
Mage implements a safety limit of 1000 retry attempts to prevent infinite loops. If a block reaches this limit, the entire pipeline execution fails. This typically indicates a configuration error or circular dependency.

Exception Handling

Try-Catch in Blocks

Implement defensive error handling:
from mage_ai.data_preparation.decorators import data_loader
import logging

@data_loader
def load_data(*args, **kwargs):
    logger = kwargs.get('logger', logging.getLogger(__name__))
    
    try:
        # Primary data source
        data = fetch_from_primary_api()
        logger.info(f"Loaded {len(data)} records from primary source")
        return data
        
    except ConnectionError as e:
        # Fallback to secondary source
        logger.warning(f"Primary source failed: {e}")
        logger.info("Attempting fallback source")
        
        try:
            data = fetch_from_secondary_api()
            logger.info(f"Loaded {len(data)} records from fallback")
            return data
        except Exception as fallback_error:
            logger.error(f"Fallback also failed: {fallback_error}")
            raise
            
    except ValueError as e:
        # Handle data validation errors
        logger.error(f"Data validation failed: {e}")
        # Return empty result or raise depending on requirements
        raise
        
    except Exception as e:
        # Catch-all for unexpected errors
        logger.exception(f"Unexpected error in load_data: {e}")
        raise
Always include the logger from kwargs and use appropriate log levels:
  • logger.info() for normal operations
  • logger.warning() for recoverable issues
  • logger.error() for failures requiring attention
  • logger.exception() to include full stack traces

Error Context

Capture relevant context when errors occur:
@transformer
def transform(data, *args, **kwargs):
    logger = kwargs.get('logger')
    execution_date = kwargs.get('execution_date')
    
    try:
        return process_data(data)
    except Exception as e:
        # Log context to aid debugging
        logger.error(
            f"Transform failed for execution_date={execution_date}, "
            f"input_rows={len(data)}, "
            f"error={str(e)}"
        )
        
        # Include traceback for debugging
        import traceback
        logger.error(traceback.format_exc())
        
        raise

Logging Best Practices

Block Execution Logging

Mage automatically wraps block execution with logging:
# From mage_ai/data_preparation/models/block/__init__.py
with BlockFunctionExec(
    block.uuid,
    f'Executing {block.type} block...',
    build_block_output_stdout=build_block_output_stdout,
):
    block.execute_sync(
        global_vars=global_vars,
        update_status=update_status,
    )
BlockFunctionExec captures stdout/stderr and timing information automatically. This appears in the Mage UI under block execution logs.

Custom Logging

Extend default logging with custom handlers:
import logging
from mage_ai.data_preparation.decorators import data_loader

@data_loader
def load_data(*args, **kwargs):
    # Get block-specific logger
    logger = kwargs.get('logger')
    
    # Add custom context
    logger = logging.LoggerAdapter(logger, {
        'pipeline': kwargs.get('pipeline_uuid'),
        'execution_date': str(kwargs.get('execution_date')),
    })
    
    logger.info("Starting data load", extra={
        'metrics': {'source': 'api', 'version': '2.0'}
    })
    
    return data

Logging Configuration

Configure logging in metadata.yaml:
logging_config:
  level: INFO                    # DEBUG, INFO, WARNING, ERROR, CRITICAL
  
  # Destination configuration
  destination_config:
    type: s3                     # s3, gcs, local
    path: logs/                  # Log file path
    
  # Format configuration  
  format: json                   # json or text
  
  # Pipeline-specific logging
  pipeline_logs_retention_days: 30
Debug-level logging can generate large volumes of log data and impact performance. Use sparingly in production and consider pipeline-specific log retention policies.

Sensors and Polling

Sensor Blocks

Use sensors to wait for conditions before proceeding:
from mage_ai.data_preparation.decorators import sensor
import time

@sensor
def check_file_exists(*args, **kwargs):
    """Wait for file to appear before proceeding."""
    logger = kwargs.get('logger')
    config = kwargs.get('configuration', {})
    
    file_path = config.get('file_path')
    max_wait = config.get('max_wait_seconds', 3600)  # 1 hour default
    check_interval = config.get('check_interval', 60)  # 1 minute
    
    start_time = time.time()
    
    while time.time() - start_time < max_wait:
        if os.path.exists(file_path):
            logger.info(f"File {file_path} found")
            return True
            
        elapsed = time.time() - start_time
        logger.info(
            f"Waiting for {file_path}... "
            f"({elapsed:.0f}s elapsed, {max_wait - elapsed:.0f}s remaining)"
        )
        time.sleep(check_interval)
    
    # Timeout reached
    logger.error(f"Timeout waiting for {file_path} after {max_wait}s")
    return False
Sensor blocks should return True to continue pipeline execution or False to fail. Set reasonable timeouts to prevent pipelines from waiting indefinitely.

Database Sensors

Wait for data availability in databases:
from mage_ai.data_preparation.decorators import sensor
from mage_ai.io.postgres import Postgres

@sensor  
def wait_for_new_records(*args, **kwargs):
    """Wait for new records in source table."""
    logger = kwargs.get('logger')
    execution_date = kwargs.get('execution_date')
    
    config = kwargs.get('configuration', {})
    check_interval = config.get('check_interval', 300)  # 5 minutes
    max_attempts = config.get('max_attempts', 12)      # 1 hour total
    
    postgres = Postgres(**config.get('connection'))
    
    for attempt in range(max_attempts):
        query = f"""
        SELECT COUNT(*) as count
        FROM source_table  
        WHERE updated_at >= '{execution_date}'
        """
        
        result = postgres.load(query)
        count = result['count'].iloc[0]
        
        if count > 0:
            logger.info(f"Found {count} new records")
            return True
            
        logger.info(
            f"No new records yet (attempt {attempt + 1}/{max_attempts})"
        )
        time.sleep(check_interval)
    
    logger.error("Timeout: No new records found")
    return False

Callback Blocks

Success Callbacks

Execute actions after successful completion:
from mage_ai.data_preparation.decorators import callback

@callback
def on_success(*args, **kwargs):
    """Notify team of successful pipeline execution."""
    logger = kwargs.get('logger')
    pipeline_uuid = kwargs.get('pipeline_uuid')
    execution_date = kwargs.get('execution_date')
    
    try:
        send_notification(
            channel='#data-pipelines',
            message=f'✅ Pipeline {pipeline_uuid} completed successfully',
            metadata={
                'execution_date': str(execution_date),
                'duration': calculate_duration(kwargs),
            }
        )
        logger.info("Success notification sent")
    except Exception as e:
        # Don't fail the pipeline if notification fails
        logger.warning(f"Failed to send notification: {e}")
Callback blocks are configured in pipeline metadata with upstream_blocks pointing to the parent block. They execute based on the parent’s status.

Failure Callbacks

Handle failures and alert teams:
from mage_ai.data_preparation.decorators import callback

@callback
def on_failure(*args, **kwargs):
    """Alert on-call when pipeline fails."""
    logger = kwargs.get('logger')
    pipeline_uuid = kwargs.get('pipeline_uuid')
    block_uuid = kwargs.get('block_uuid', 'unknown')
    
    try:
        # Get failure details from parent block
        error_message = kwargs.get('error_message', 'Unknown error')
        
        alert_oncall(
            severity='high',
            title=f'Pipeline {pipeline_uuid} failed',
            description=f'Block {block_uuid} failed: {error_message}',
            tags=['pipeline', 'failure', pipeline_uuid],
        )
        
        # Log to monitoring system
        send_metric(
            'pipeline.failure',
            tags={'pipeline': pipeline_uuid, 'block': block_uuid}
        )
        
        logger.info("Failure alert sent")
    except Exception as e:
        logger.error(f"Failed to send alert: {e}")
        # Continue to allow pipeline to fail gracefully
Keep callback blocks lightweight and fault-tolerant. A failing callback should not prevent the pipeline from completing its error handling. Always wrap callback logic in try-except.

Conditional Blocks

Flow Control

Use conditionals to implement branching logic:
from mage_ai.data_preparation.decorators import condition

@condition
def check_data_quality(data, *args, **kwargs):
    """Validate data quality before proceeding."""
    logger = kwargs.get('logger')
    
    # Define quality thresholds
    min_records = 100
    max_null_percentage = 5.0
    
    # Check record count
    if len(data) < min_records:
        logger.warning(
            f"Insufficient records: {len(data)} < {min_records}"
        )
        return False
    
    # Check null values
    null_pct = (data.isnull().sum().sum() / data.size) * 100
    if null_pct > max_null_percentage:
        logger.warning(
            f"Excessive nulls: {null_pct:.1f}% > {max_null_percentage}%"
        )
        return False
    
    logger.info(f"Data quality passed: {len(data)} records, {null_pct:.1f}% nulls")
    return True

Multiple Conditions

Chain multiple conditional checks:
@condition
def check_business_rules(data, *args, **kwargs):
    """Validate business logic constraints."""
    logger = kwargs.get('logger')
    failures = []
    
    # Rule 1: Revenue must be positive
    if (data['revenue'] < 0).any():
        failures.append("Negative revenue detected")
    
    # Rule 2: Dates must be valid
    if data['transaction_date'].max() > pd.Timestamp.now():
        failures.append("Future dates detected")
    
    # Rule 3: Required fields must be present
    required_fields = ['customer_id', 'amount', 'date']
    missing = [f for f in required_fields if f not in data.columns]
    if missing:
        failures.append(f"Missing required fields: {missing}")
    
    if failures:
        for failure in failures:
            logger.error(f"Business rule violation: {failure}")
        return False
    
    logger.info("All business rules passed")
    return True
Conditional blocks affect all downstream blocks. Return False to skip downstream execution, or raise an exception to fail the pipeline entirely.

Timeout Configuration

Block Timeouts

Set maximum execution time for blocks:
# In pipeline metadata.yaml  
blocks:
  - uuid: long_running_query
    type: data_loader
    timeout: 3600                # 1 hour timeout in seconds
@data_loader
def load_data(*args, **kwargs):
    # Block will be terminated if execution exceeds timeout
    # Ensure cleanup logic handles interruption gracefully
    
    try:
        return expensive_operation()
    finally:
        # Cleanup runs even if timeout occurs
        cleanup_resources()
Timeout interrupts block execution immediately. Ensure your code handles interruption gracefully, especially when dealing with transactions or file handles. Use try-finally blocks for cleanup.

Error Recovery Patterns

Checkpointing

Save progress to enable recovery:
@data_loader
def load_large_dataset(*args, **kwargs):
    logger = kwargs.get('logger')
    checkpoint_file = '/tmp/load_checkpoint.json'
    
    # Try to resume from checkpoint
    start_offset = 0
    if os.path.exists(checkpoint_file):
        with open(checkpoint_file) as f:
            checkpoint = json.load(f)
            start_offset = checkpoint.get('offset', 0)
            logger.info(f"Resuming from offset {start_offset}")
    
    all_data = []
    batch_size = 10000
    
    try:
        for offset in range(start_offset, total_records, batch_size):
            batch = fetch_batch(offset, batch_size)
            all_data.extend(batch)
            
            # Save checkpoint
            with open(checkpoint_file, 'w') as f:
                json.dump({'offset': offset + batch_size}, f)
            
            logger.info(f"Processed {len(all_data)} records")
        
        # Clean up checkpoint on success
        if os.path.exists(checkpoint_file):
            os.remove(checkpoint_file)
            
        return pd.DataFrame(all_data)
        
    except Exception as e:
        logger.error(f"Failed at offset {offset}: {e}")
        logger.info(f"Checkpoint saved. Retry will resume from {offset}")
        raise

Idempotent Operations

Design blocks to be safely re-executable:
@data_exporter
def export_to_database(data, *args, **kwargs):
    """Idempotent database export using upsert."""
    logger = kwargs.get('logger')
    config = kwargs.get('configuration', {})
    
    from mage_ai.io.postgres import Postgres
    
    postgres = Postgres(**config.get('connection'))
    
    # Use UPSERT to make operation idempotent
    # Re-running will update existing records instead of duplicating
    postgres.export(
        data,
        table_name='target_table',
        if_exists='replace',  # or 'update' with unique constraint
        unique_conflict_columns=['id'],  # For UPDATE behavior
    )
    
    logger.info(f"Exported {len(data)} records (idempotent)")
Idempotent blocks can be safely retried without side effects. Design exports to use upsert operations, and loaders to handle duplicate fetches gracefully.

Memory and Resource Errors

Out of Memory Handling

Prevent memory exhaustion:
@transformer
def transform_large_dataset(data, *args, **kwargs):
    """Process large datasets in chunks."""
    logger = kwargs.get('logger')
    
    # Check available memory
    import psutil
    available_memory_gb = psutil.virtual_memory().available / (1024**3)
    
    if available_memory_gb < 2:
        logger.warning(f"Low memory: {available_memory_gb:.1f}GB available")
        # Process in smaller chunks
        chunk_size = 10000
    else:
        chunk_size = 100000
    
    logger.info(f"Processing with chunk_size={chunk_size}")
    
    results = []
    for chunk in pd.read_csv('large_file.csv', chunksize=chunk_size):
        processed = process_chunk(chunk)
        results.append(processed)
        
        # Clear memory
        del chunk
        
    return pd.concat(results, ignore_index=True)
Mage tracks memory usage via execute_with_memory_tracking(). Resource usage data is stored in block.resource_usage and can be used for optimization.

Testing Error Handling

Test Block Error Cases

from mage_ai.data_preparation.decorators import test

@test
def test_handles_empty_input(output, *args):
    """Verify block handles edge cases."""
    assert output is not None, 'Should return empty DataFrame, not None'
    assert len(output) >= 0, 'Output should be valid DataFrame'

@test
def test_validates_schema(output, *args):
    """Verify output has required columns."""
    required = ['id', 'timestamp', 'value']
    missing = [col for col in required if col not in output.columns]
    assert not missing, f'Missing required columns: {missing}'

@test
def test_data_quality(output, *args):
    """Verify data quality constraints."""
    assert output['id'].notna().all(), 'ID column has nulls'
    assert (output['value'] >= 0).all(), 'Negative values detected'

Summary

Configuration:
  • ✅ Set appropriate retry counts for external operations
  • ✅ Enable exponential backoff for transient failures
  • ✅ Configure timeouts for long-running blocks
  • ✅ Set up logging with appropriate retention
Implementation:
  • ✅ Use try-except blocks with specific exception types
  • ✅ Log context (execution_date, data size, etc.) on errors
  • ✅ Include logger from kwargs in all blocks
  • ✅ Design idempotent operations for safe retries
  • ✅ Implement checkpointing for long-running loads
  • ✅ Add test blocks to validate error handling
Monitoring:
  • ✅ Set up callback blocks for success/failure notifications
  • ✅ Use conditional blocks for data quality gates
  • ✅ Implement sensor blocks with reasonable timeouts
  • ✅ Track resource usage for optimization
Avoid:
  • ❌ Silent failures (always log errors)
  • ❌ Generic exception handlers without logging
  • ❌ Blocking indefinitely in sensor blocks
  • ❌ Failing callbacks (wrap in try-except)
  • ❌ Non-idempotent operations without checkpoints

Build docs developers (and LLMs) love