Robust error handling is critical for production data pipelines. Mage provides multiple layers of error handling and automatic retry mechanisms built into the core execution engine.
# mage_ai/data_preparation/shared/retry.py
retry_config:
  retries: 3                 # Number of retry attempts
  delay: 5                   # Initial delay in seconds
  max_delay: 60              # Maximum delay in seconds
  exponential_backoff: true  # Enable exponential backoff
The retry configuration uses exponential backoff by default. With delay: 5 and exponential backoff enabled, retries occur at 5s, 10s, 20s, 40s, capped at max_delay.
# In pipeline metadata.yaml
blocks:
- uuid: load_external_api
  type: data_loader
  retry_config:
    retries: 5      # More retries for flaky APIs
    delay: 10
    max_delay: 300  # Wait up to 5 minutes
    exponential_backoff: true
Set higher retry counts for operations that interact with external systems (APIs, databases, cloud storage) since these are more likely to experience transient failures.
The retry mechanism is implemented in the execution engine:
# Retry logic (conceptual)
tries_by_block_uuid = {}
while not blocks.empty():
    block = blocks.get()
    # Count this attempt (first sight of a block starts its counter at 0).
    tries_by_block_uuid[block.uuid] = tries_by_block_uuid.get(block.uuid, 0) + 1
    tries = tries_by_block_uuid[block.uuid]

    # Prevent infinite loops
    if tries >= 1000:
        raise Exception(f'Block {block.uuid} exceeded max tries')

    # Check upstream dependencies before retrying
    if not all_upstream_completed(block):
        blocks.put(block)  # Re-queue for later
        continue
Mage implements a safety limit of 1000 retry attempts to prevent infinite loops. If a block reaches this limit, the entire pipeline execution fails. This typically indicates a configuration error or circular dependency.
from mage_ai.data_preparation.decorators import data_loader
import logging


@data_loader
def load_data(*args, **kwargs):
    """Load records from the primary API, falling back to a secondary source.

    Returns the loaded records; re-raises on validation errors, on
    unexpected errors, and when both sources fail.
    """
    logger = kwargs.get('logger', logging.getLogger(__name__))
    try:
        # Primary data source
        data = fetch_from_primary_api()
        logger.info(f"Loaded {len(data)} records from primary source")
        return data
    except ConnectionError as e:
        # Connectivity problem only: try the secondary source before giving up.
        logger.warning(f"Primary source failed: {e}")
        logger.info("Attempting fallback source")
        try:
            data = fetch_from_secondary_api()
            logger.info(f"Loaded {len(data)} records from fallback")
            return data
        except Exception as fallback_error:
            # Both sources exhausted — surface the fallback failure.
            logger.error(f"Fallback also failed: {fallback_error}")
            raise
    except ValueError as e:
        # Data validation problems are not retried here; let the caller decide.
        logger.error(f"Data validation failed: {e}")
        raise
    except Exception as e:
        # Catch-all for unexpected errors; logger.exception records traceback.
        logger.exception(f"Unexpected error in load_data: {e}")
        raise
Always include the logger from kwargs and use appropriate log levels:
logging_config:
  level: INFO  # DEBUG, INFO, WARNING, ERROR, CRITICAL

  # Destination configuration
  destination_config:
    type: s3     # s3, gcs, local
    path: logs/  # Log file path

  # Format configuration
  format: json  # json or text

# Pipeline-specific logging
pipeline_logs_retention_days: 30
Debug-level logging can generate large volumes of log data and impact performance. Use sparingly in production and consider pipeline-specific log retention policies.
Use sensors to wait for conditions before proceeding:
import os  # BUG FIX: os.path.exists was used below without importing os
import time

from mage_ai.data_preparation.decorators import sensor


@sensor
def check_file_exists(*args, **kwargs):
    """Wait for file to appear before proceeding.

    Configuration keys (read from kwargs['configuration']):
        file_path: path to poll for.
        max_wait_seconds: total seconds to wait before giving up (default 3600).
        check_interval: seconds to sleep between checks (default 60).

    Returns:
        True once the file exists; False if max_wait elapses first
        (returning False fails the sensor).
    """
    logger = kwargs.get('logger')
    config = kwargs.get('configuration', {})
    file_path = config.get('file_path')
    max_wait = config.get('max_wait_seconds', 3600)  # 1 hour default
    check_interval = config.get('check_interval', 60)  # 1 minute
    start_time = time.time()
    while time.time() - start_time < max_wait:
        if os.path.exists(file_path):
            logger.info(f"File {file_path} found")
            return True
        elapsed = time.time() - start_time
        logger.info(
            f"Waiting for {file_path}... "
            f"({elapsed:.0f}s elapsed, {max_wait - elapsed:.0f}s remaining)"
        )
        time.sleep(check_interval)
    # Timeout reached
    logger.error(f"Timeout waiting for {file_path} after {max_wait}s")
    return False
Sensor blocks should return True to continue pipeline execution or False to fail. Set reasonable timeouts to prevent pipelines from waiting indefinitely.
from mage_ai.data_preparation.decorators import callback


@callback
def on_failure(*args, **kwargs):
    """Alert on-call when pipeline fails."""
    logger = kwargs.get('logger')
    pipeline_uuid = kwargs.get('pipeline_uuid')
    block_uuid = kwargs.get('block_uuid', 'unknown')
    try:
        # Get failure details from parent block
        error_message = kwargs.get('error_message', 'Unknown error')
        alert_oncall(
            severity='high',
            title=f'Pipeline {pipeline_uuid} failed',
            description=f'Block {block_uuid} failed: {error_message}',
            tags=['pipeline', 'failure', pipeline_uuid],
        )
        # Log to monitoring system
        send_metric(
            'pipeline.failure',
            tags={'pipeline': pipeline_uuid, 'block': block_uuid}
        )
        logger.info("Failure alert sent")
    except Exception as e:
        # A broken alert path must not mask the pipeline's own failure.
        logger.error(f"Failed to send alert: {e}")
Keep callback blocks lightweight and fault-tolerant. A failing callback should not prevent the pipeline from completing its error handling. Always wrap callback logic in try-except.
import pandas as pd  # BUG FIX: pd.Timestamp was used below without importing pandas

from mage_ai.data_preparation.decorators import condition  # BUG FIX: decorator was not imported


@condition
def check_business_rules(data, *args, **kwargs):
    """Validate business logic constraints.

    Returns:
        True when every rule passes (downstream blocks run);
        False on any violation (downstream blocks are skipped).
    """
    logger = kwargs.get('logger')
    failures = []
    # NOTE: rules 1-2 assume 'revenue' and 'transaction_date' columns exist;
    # a missing column raises KeyError before rule 3 can report it.
    # Rule 1: Revenue must be positive
    if (data['revenue'] < 0).any():
        failures.append("Negative revenue detected")
    # Rule 2: Dates must be valid
    if data['transaction_date'].max() > pd.Timestamp.now():
        failures.append("Future dates detected")
    # Rule 3: Required fields must be present
    required_fields = ['customer_id', 'amount', 'date']
    missing = [f for f in required_fields if f not in data.columns]
    if missing:
        failures.append(f"Missing required fields: {missing}")
    if failures:
        for failure in failures:
            logger.error(f"Business rule violation: {failure}")
        return False
    logger.info("All business rules passed")
    return True
Conditional blocks affect all downstream blocks. Return False to skip downstream execution, or raise an exception to fail the pipeline entirely.
# In pipeline metadata.yaml
blocks:
- uuid: long_running_query
  type: data_loader
  timeout: 3600  # 1 hour timeout in seconds
@data_loader
def load_data(*args, **kwargs):
    """Long-running load whose cleanup survives a timeout interruption."""
    # The execution engine terminates the block if it exceeds the configured
    # timeout; the finally clause still runs, so resources are released.
    try:
        return expensive_operation()
    finally:
        # Cleanup runs even if timeout occurs
        cleanup_resources()
Timeout interrupts block execution immediately. Ensure your code handles interruption gracefully, especially when dealing with transactions or file handles. Use try-finally blocks for cleanup.
@data_exporter
def export_to_database(data, *args, **kwargs):
    """Idempotent database export using upsert."""
    logger = kwargs.get('logger')
    config = kwargs.get('configuration', {})

    from mage_ai.io.postgres import Postgres

    postgres = Postgres(**config.get('connection'))
    # Re-running yields the same final table state instead of duplicating rows.
    postgres.export(
        data,
        table_name='target_table',
        if_exists='replace',  # or 'update' with unique constraint
        unique_conflict_columns=['id'],  # For UPDATE behavior
    )
    logger.info(f"Exported {len(data)} records (idempotent)")
Idempotent blocks can be safely retried without side effects. Design exports to use upsert operations, and loaders to handle duplicate fetches gracefully.
from mage_ai.data_preparation.decorators import test


@test
def test_handles_empty_input(output, *args):
    """Verify block handles edge cases."""
    assert output is not None, 'Should return empty DataFrame, not None'
    # BUG FIX: the original `len(output) >= 0` was vacuously true whenever
    # len() succeeded; check the output is actually a sized container instead.
    assert hasattr(output, '__len__'), 'Output should be a sized DataFrame'


@test
def test_validates_schema(output, *args):
    """Verify output has required columns."""
    required = ['id', 'timestamp', 'value']
    missing = [col for col in required if col not in output.columns]
    assert not missing, f'Missing required columns: {missing}'


@test
def test_data_quality(output, *args):
    """Verify data quality constraints."""
    assert output['id'].notna().all(), 'ID column has nulls'
    assert (output['value'] >= 0).all(), 'Negative values detected'