Overview
Mage’s integration framework is based on the Singer specification with additional features:
- Schema Discovery - Automatically detect available data and schemas
- Incremental Syncing - Track state and sync only new/changed data
- Type Conversion - Handle data type mapping between systems
- Error Handling - Robust retry and failure management
Building a Custom Source
Extend the Source base class to create a custom data source connector.
Basic Structure
from typing import Dict, Generator, List
from mage_integrations.sources.base import Source, main
from mage_integrations.sources.catalog import Catalog, CatalogEntry
from singer.schema import Schema
class CustomSource(Source):
    """Skeleton for a custom Singer-style source connector.

    Subclasses of Source implement discovery, extraction, and a
    connection check; the Mage framework calls these methods.
    """

    def discover(self, streams: List[str] = None) -> Catalog:
        """
        Discover available streams and their schemas.
        Called during pipeline setup to detect available data.
        """
        pass

    def load_data(
        self,
        stream,
        bookmarks: Dict = None,
        query: Dict = None,
        **kwargs,
    ) -> Generator[List[Dict], None, None]:
        """
        Extract data from the source.
        Yields batches of records as dictionaries.
        """
        pass

    def test_connection(self) -> None:
        """
        Verify connection credentials.
        Raises exception if connection fails.
        """
        pass


# Entry point used when the connector is executed as a standalone process.
if __name__ == '__main__':
    main(CustomSource)
Example: Custom API Source
Here’s a complete example for a REST API source:
import requests
from typing import Dict, Generator, List
from mage_integrations.sources.base import Source, main
from mage_integrations.sources.catalog import Catalog, CatalogEntry
from mage_integrations.sources.constants import (
COLUMN_TYPE_INTEGER,
COLUMN_TYPE_STRING,
REPLICATION_METHOD_INCREMENTAL,
)
from mage_integrations.sources.utils import get_standard_metadata
from singer.schema import Schema
class CustomAPISource(Source):
    """Source connector for a paginated REST API.

    Required config:
        api_key: bearer token sent on every request.
    Optional config:
        base_url: API root; defaults to https://api.example.com.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.base_url = self.config.get('base_url', 'https://api.example.com')
        # Indexed (not .get) on purpose: a missing api_key should fail fast.
        self.api_key = self.config['api_key']

    def _make_request(self, endpoint: str, params: Dict = None) -> Dict:
        """Make an authenticated GET request and return the decoded JSON body.

        Raises:
            requests.HTTPError: on any 4xx/5xx response.
        """
        headers = {
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json',
        }
        response = requests.get(
            f'{self.base_url}/{endpoint}',
            headers=headers,
            params=params,
        )
        response.raise_for_status()
        return response.json()

    def discover(self, streams: List[str] = None) -> Catalog:
        """Discover available streams.

        Returns a Catalog describing a single incremental 'users' stream,
        keyed on 'id' and bookmarked by 'updated_at'.
        """
        catalog_entries = []

        # Schema for the 'users' stream; every column is nullable so
        # sparse API records still validate.
        users_schema = Schema.from_dict({
            'type': 'object',
            'properties': {
                'id': {'type': ['null', 'integer']},
                'email': {'type': ['null', 'string']},
                'name': {'type': ['null', 'string']},
                'created_at': {
                    'type': ['null', 'string'],
                    'format': 'date-time',
                },
                'updated_at': {
                    'type': ['null', 'string'],
                    'format': 'date-time',
                },
            },
        })
        metadata = get_standard_metadata(
            key_properties=['id'],
            replication_method=REPLICATION_METHOD_INCREMENTAL,
            schema=users_schema.to_dict(),
            stream_id='users',
            valid_replication_keys=['updated_at'],
        )
        catalog_entries.append(CatalogEntry(
            key_properties=['id'],
            metadata=metadata,
            replication_method=REPLICATION_METHOD_INCREMENTAL,
            schema=users_schema,
            stream='users',
            tap_stream_id='users',
        ))
        return Catalog(catalog_entries)

    def load_data(
        self,
        stream,
        bookmarks: Dict = None,
        query: Dict = None,
        **kwargs,
    ) -> Generator[List[Dict], None, None]:
        """Yield batches of records from the API, one page per batch.

        When a bookmark is present, only records updated since the
        bookmarked 'updated_at' value are requested.
        """
        stream_id = stream.tap_stream_id
        # NOTE(review): the bookmark key is hard-coded to 'updated_at';
        # confirm it matches the replication key if more streams are added.
        bookmark_value = bookmarks.get('updated_at') if bookmarks else None

        page = 1
        per_page = 100
        has_more = True
        while has_more:
            params = {
                'page': page,
                'per_page': per_page,
            }
            if bookmark_value:
                params['updated_since'] = bookmark_value

            self.logger.info(
                f'Fetching {stream_id} page {page}',
                tags=dict(page=page, stream=stream_id),
            )
            response = self._make_request(stream_id, params=params)
            records = response.get('data', [])
            if records:
                yield records

            # A short (or empty) page means we have reached the end.
            has_more = len(records) == per_page
            page += 1

    def test_connection(self) -> None:
        """Hit the lightweight 'health' endpoint to verify credentials.

        Raises:
            Exception: wrapping the underlying failure.
        """
        try:
            self._make_request('health')
        except Exception as e:
            # Chain the cause so the original traceback is preserved.
            raise Exception(f'Connection test failed: {str(e)}') from e


if __name__ == '__main__':
    main(CustomAPISource)
Testing Your Source
from your_module import CustomAPISource

# Initialize source with the minimum required config.
source = CustomAPISource(
    config={
        'api_key': 'your-api-key',
        'base_url': 'https://api.example.com',
    }
)

# Test connection
try:
    source.test_connection()
    print('✓ Connection successful')
except Exception as e:
    print(f'✗ Connection failed: {e}')

# Discover streams and print each stream's schema.
catalog = source.discover()
for stream in catalog.streams:
    print(f'Stream: {stream.stream}')
    print(f' Schema: {stream.schema.to_dict()}')

# Load data in batches from the 'users' stream.
for stream in catalog.streams:
    if stream.stream == 'users':
        for batch in source.load_data(stream):
            print(f'Loaded {len(batch)} records')
Building a Custom Destination
Extend the Destination base class to create a custom data destination.
Basic Structure
from typing import Dict, List
from mage_integrations.destinations.base import Destination, main
class CustomDestination(Destination):
    """Skeleton for a custom destination connector.

    Subclasses implement connection setup, DDL generation, and query
    execution; the Mage framework calls these methods.
    """

    def build_connection(self):
        """
        Create connection to destination.
        Called once at initialization.
        """
        pass

    def build_create_table_commands(
        self,
        schema: Dict,
        schema_name: str,
        stream: str,
        table_name: str,
        database_name: str = None,
        unique_constraints: List[str] = None,
    ) -> List[str]:
        """
        Generate DDL commands to create table.
        """
        pass

    def build_alter_table_commands(
        self,
        schema: Dict,
        schema_name: str,
        stream: str,
        table_name: str,
        database_name: str = None,
        unique_constraints: List[str] = None,
    ) -> List[str]:
        """
        Generate DDL to add new columns.
        """
        pass

    def process_queries(
        self,
        query_strings: List[str],
        record_data: List[Dict],
        stream: str,
        tags: Dict = None,
        **kwargs,
    ) -> List[List]:
        """
        Execute queries to load data.
        """
        pass


# Entry point used when the connector is executed as a standalone process.
if __name__ == '__main__':
    main(CustomDestination)
Example: Custom Database Destination
import psycopg2
from typing import Dict, List, Tuple
from mage_integrations.destinations.sql.base import Destination, main
from mage_integrations.destinations.sql.utils import (
build_alter_table_command,
build_create_table_command,
build_insert_command,
column_type_mapping,
)
class CustomDatabaseDestination(Destination):
    """Destination that loads records into a PostgreSQL database.

    Required config: host, database, username, password, schema, table.
    Optional config: port (defaults to 5432).
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Lazily-created psycopg2 connection, shared across calls.
        self.connection = None

    def build_connection(self):
        """Create the database connection, reusing one if it exists."""
        if self.connection:
            return self.connection
        self.connection = psycopg2.connect(
            host=self.config['host'],
            port=self.config.get('port', 5432),
            database=self.config['database'],
            user=self.config['username'],
            password=self.config['password'],
        )
        return self.connection

    def build_create_table_commands(
        self,
        schema: Dict,
        schema_name: str,
        stream: str,
        table_name: str,
        database_name: str = None,
        unique_constraints: List[str] = None,
    ) -> List[str]:
        """Generate the CREATE TABLE statement for a stream's schema."""
        # Map JSON-schema types onto PostgreSQL types; anything
        # unrecognized falls back to TEXT.
        type_mapping = column_type_mapping(
            schema,
            convert_column_type=lambda column_type, column_format: {
                'integer': 'BIGINT',
                'number': 'DOUBLE PRECISION',
                'string': 'TEXT',
                'boolean': 'BOOLEAN',
                'object': 'JSONB',
                'array': 'JSONB',
            }.get(column_type, 'TEXT'),
            convert_column_type_array=lambda item_type: 'JSONB',
        )
        return [
            build_create_table_command(
                column_type_mapping=type_mapping,
                columns=schema['properties'].keys(),
                full_table_name=f'{schema_name}.{table_name}',
                unique_constraints=unique_constraints,
                column_identifier='"',
            )
        ]

    def build_alter_table_commands(
        self,
        schema: Dict,
        schema_name: str,
        stream: str,
        table_name: str,
        database_name: str = None,
        unique_constraints: List[str] = None,
    ) -> List[str]:
        """Generate ALTER TABLE statements for columns new to the schema."""
        conn = self.build_connection()
        cursor = conn.cursor()
        try:
            # Columns already present in the destination table.
            cursor.execute("""
                SELECT column_name
                FROM information_schema.columns
                WHERE table_schema = %s AND table_name = %s
            """, (schema_name, table_name))
            existing_columns = {row[0] for row in cursor.fetchall()}
        finally:
            # Release the cursor even if the metadata query fails.
            cursor.close()

        new_columns = [
            col for col in schema['properties'].keys()
            if col not in existing_columns
        ]
        if not new_columns:
            return []

        type_mapping = column_type_mapping(
            schema,
            convert_column_type=lambda column_type, column_format: {
                'integer': 'BIGINT',
                'number': 'DOUBLE PRECISION',
                'string': 'TEXT',
            }.get(column_type, 'TEXT'),
        )
        return [
            build_alter_table_command(
                column_type_mapping=type_mapping,
                columns=new_columns,
                full_table_name=f'{schema_name}.{table_name}',
                column_identifier='"',
            )
        ]

    def process_queries(
        self,
        query_strings: List[str],
        record_data: List[Dict],
        stream: str,
        tags: Dict = None,
        **kwargs,
    ) -> List[List[Tuple]]:
        """Execute DDL then insert records, committing as one transaction.

        Returns a list holding one result set with the inserted row count.
        Rolls back the whole batch and re-raises on any failure.
        """
        conn = self.build_connection()
        cursor = conn.cursor()
        results = []
        try:
            # Run DDL (CREATE/ALTER) first so inserts see the final shape.
            for query in query_strings:
                cursor.execute(query)

            if record_data:
                schema = self.schemas[stream]
                columns = list(schema['properties'].keys())
                insert_columns, insert_values = build_insert_command(
                    column_type_mapping=column_type_mapping(schema),
                    columns=columns,
                    records=[d['record'] for d in record_data],
                )
                # NOTE(review): target identifiers come from config and are
                # interpolated directly, so config must be trusted; values
                # are passed as %s parameters and are safely escaped.
                for values in insert_values:
                    placeholders = ','.join(['%s'] * len(values))
                    query = f"""
                    INSERT INTO {self.config['schema']}.{self.config['table']}
                    ({','.join(insert_columns)})
                    VALUES ({placeholders})
                    """
                    cursor.execute(query, values)

            conn.commit()
            results.append([(len(record_data),)])
        except Exception:
            conn.rollback()
            # Bare raise (not `raise e`) keeps the original traceback.
            raise
        finally:
            cursor.close()
        return results


if __name__ == '__main__':
    main(CustomDatabaseDestination)
Advanced Features
Incremental Syncing with Bookmarks
Track sync progress to avoid reprocessing data:
def load_data(self, stream, bookmarks: Dict = None, **kwargs):
    # Get last synced value from saved state (None on the first run).
    last_updated = bookmarks.get('updated_at') if bookmarks else None

    # Fetch only new records
    params = {}
    if last_updated:
        params['updated_since'] = last_updated

    for batch in self.fetch_data(params):
        # Update bookmark with latest value seen in this batch.
        # NOTE(review): this mutates the caller's bookmarks dict in
        # place — confirm the framework expects in-place state updates.
        if batch:
            latest = max(record['updated_at'] for record in batch)
            bookmarks['updated_at'] = latest
        yield batch
Schema Evolution
Handle schema changes automatically:
def build_alter_table_commands(self, schema, schema_name, stream, table_name, **kwargs):
    # Detect new columns
    existing_columns = self.get_existing_columns(schema_name, table_name)
    new_columns = [
        col for col in schema['properties'].keys()
        if col not in existing_columns
    ]

    # Detect type changes
    changed_columns = self.detect_type_changes(schema, schema_name, table_name)

    # Generate one ALTER statement per kind of change.
    alter_commands = []
    if new_columns:
        alter_commands.append(self.add_columns_command(new_columns))
    if changed_columns:
        alter_commands.append(self.modify_columns_command(changed_columns))
    return alter_commands
Error Handling & Retries
import time
from functools import wraps

def retry_on_failure(max_retries=3, delay=1):
    """Decorator that retries a failing call with exponential backoff.

    Args:
        max_retries: total number of attempts before giving up.
        delay: base sleep in seconds; attempt ``i`` waits ``delay * 2**i``.

    The final attempt's exception propagates unchanged.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    # Re-raise on the last attempt so callers see the error.
                    if attempt == max_retries - 1:
                        raise
                    # Exponential backoff: delay, 2*delay, 4*delay, ...
                    time.sleep(delay * (2 ** attempt))
        return wrapper
    return decorator
class RobustSource(Source):
    """Example source whose HTTP calls retry transient failures."""

    # Up to 3 attempts with exponential backoff (see retry_on_failure).
    @retry_on_failure(max_retries=3)
    def _make_request(self, endpoint):
        return requests.get(endpoint)
Custom Type Conversions
def convert_column_type(column_type: str, column_format: str = None) -> str:
    """Map source types to destination types.

    The format qualifier (date/date-time) takes precedence over the
    base type; unrecognized types fall back to a wide VARCHAR.
    """
    # Temporal formats override the declared base type.
    format_overrides = {
        'date-time': 'TIMESTAMP',
        'date': 'DATE',
    }
    if column_format in format_overrides:
        return format_overrides[column_format]

    destination_types = {
        'integer': 'BIGINT',
        'number': 'DECIMAL(18,6)',
        'string': 'VARCHAR(65535)',
        'boolean': 'BOOLEAN',
        'object': 'SUPER',  # Redshift SUPER type
        'array': 'SUPER',
    }
    return destination_types.get(column_type, 'VARCHAR(65535)')
Configuration Templates
Create a configuration template for your integration:
{
"api_key": "your-api-key",
"base_url": "https://api.example.com",
"timeout": 30,
"retry_attempts": 3,
"start_date": "2024-01-01T00:00:00Z"
}
Best Practices
Logging
Logging
Use Mage’s logger for consistent logging:
# Attach structured tags so log lines can be filtered per stream/page.
self.logger.info(
    'Fetching data from API',
    tags=dict(
        endpoint='users',
        page=page,
        total_records=total,
    )
)
Pagination
Pagination
Handle large datasets with pagination:
# Keep requesting pages until the API returns an empty batch.
page = 1
while True:
    response = self.fetch_page(page)
    if not response['data']:
        break
    yield response['data']
    page += 1
Rate Limiting
Rate Limiting
Respect API rate limits:
import time
from datetime import datetime

class RateLimitedSource(Source):
    """Source that throttles outbound requests to a fixed rate."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Maximum request rate: one request every 1/N seconds.
        self.requests_per_second = 10
        # Timestamp of the most recent request (None before the first).
        self.last_request_time = None

    def _wait_if_needed(self):
        # Sleep just long enough to stay under requests_per_second.
        # NOTE(review): uses wall-clock datetime.now(); a monotonic
        # clock would be immune to system time changes — confirm.
        if self.last_request_time:
            elapsed = (datetime.now() - self.last_request_time).total_seconds()
            wait_time = (1.0 / self.requests_per_second) - elapsed
            if wait_time > 0:
                time.sleep(wait_time)
        self.last_request_time = datetime.now()
Testing
Testing
Write unit tests for your connector:
import unittest
from unittest.mock import Mock, patch

class TestCustomSource(unittest.TestCase):
    """Unit tests for CustomSource using mocked HTTP calls."""

    def setUp(self):
        # Fresh source per test with a dummy credential.
        self.source = CustomSource(
            config={'api_key': 'test-key'}
        )

    def test_discover(self):
        # Discovery should expose at least one stream.
        catalog = self.source.discover()
        self.assertGreater(len(catalog.streams), 0)

    @patch('requests.get')
    def test_load_data(self, mock_get):
        # Stub the HTTP layer so no network traffic occurs.
        mock_get.return_value.json.return_value = {
            'data': [{'id': 1, 'name': 'Test'}]
        }
        # Test data loading
Packaging Your Integration
Create a proper Python package:
my_integration/
├── __init__.py
├── sources/
│ ├── __init__.py
│ └── my_source/
│ ├── __init__.py
│ └── templates/
│ └── config.json
├── destinations/
│ ├── __init__.py
│ └── my_destination/
│ └── __init__.py
├── setup.py
└── README.md
from setuptools import setup, find_packages

# Minimal packaging metadata so the integration can be pip-installed.
setup(
    name='mage-my-integration',
    version='0.1.0',
    packages=find_packages(),
    install_requires=[
        'mage-ai',
        'requests',
    ],
)
Next Steps
Source Reference
See built-in sources for examples
Destination Reference
Explore destination implementations