Build Custom Integrations
Create custom data integrations by extending Mage’s base source and destination classes. This guide shows you how to build connectors for any API, database, or data source.

Overview

Mage’s integration framework is based on the Singer specification with additional features:
  • Schema Discovery - Automatically detect available data and schemas
  • Incremental Syncing - Track state and sync only new/changed data
  • Type Conversion - Handle data type mapping between systems
  • Error Handling - Robust retry and failure management

Building a Custom Source

Extend the Source base class to create a custom data source connector.

Basic Structure

from typing import Dict, Generator, List
from mage_integrations.sources.base import Source, main
from mage_integrations.sources.catalog import Catalog, CatalogEntry
from singer.schema import Schema

class CustomSource(Source):
    """Skeleton source connector: override these three hooks for a new system."""

    def discover(self, streams: List[str] = None) -> Catalog:
        """
        Discover available streams and their schemas.
        Called during pipeline setup to detect available data.

        Args:
            streams: Optional list of stream names to limit discovery to;
                None means discover every stream.

        Returns:
            A Catalog describing each stream's schema and metadata.
        """
        pass

    def load_data(
        self,
        stream,
        bookmarks: Dict = None,
        query: Dict = None,
        **kwargs,
    ) -> Generator[List[Dict], None, None]:
        """
        Extract data from the source.
        Yields batches of records as dictionaries.

        Args:
            stream: Catalog entry for the stream being synced.
            bookmarks: Bookmark values persisted from the previous sync run,
                used to extract only new/changed records.
            query: Optional runtime query parameters from the framework.

        Yields:
            Lists of record dicts, one list per batch.
        """
        pass

    def test_connection(self) -> None:
        """
        Verify connection credentials.
        Raises an exception if the connection fails; returns None on success.
        """
        pass

# Hand off to Mage's Singer-style CLI harness when executed directly.
if __name__ == '__main__':
    main(CustomSource)

Example: Custom API Source

Here’s a complete example for a REST API source:
import requests
from typing import Dict, Generator, List
from mage_integrations.sources.base import Source, main
from mage_integrations.sources.catalog import Catalog, CatalogEntry
from mage_integrations.sources.constants import (
    COLUMN_TYPE_INTEGER,
    COLUMN_TYPE_STRING,
    REPLICATION_METHOD_INCREMENTAL,
)
from mage_integrations.sources.utils import get_standard_metadata
from singer.schema import Schema

class CustomAPISource(Source):
    """Example REST API source with incremental (bookmark-based) syncing."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Endpoint and credentials come from the integration's config block.
        self.base_url = self.config.get('base_url', 'https://api.example.com')
        self.api_key = self.config['api_key']

    def _make_request(self, endpoint: str, params: Dict = None) -> Dict:
        """Make an authenticated GET request and return the decoded JSON body.

        Raises:
            requests.HTTPError: if the API responds with a 4xx/5xx status.
        """
        headers = {
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json',
        }
        response = requests.get(
            f'{self.base_url}/{endpoint}',
            headers=headers,
            params=params,
            # Fix: bound every network call so a hung API cannot stall the
            # sync forever; honors the optional `timeout` config key.
            timeout=self.config.get('timeout', 30),
        )
        response.raise_for_status()
        return response.json()

    def discover(self, streams: List[str] = None) -> Catalog:
        """Discover available streams.

        Args:
            streams: Optional list of stream names to restrict the catalog to.

        Returns:
            Catalog with one entry per discovered (and selected) stream.
        """
        catalog_entries = []

        # Define schema for 'users' stream
        users_schema = Schema.from_dict({
            'type': 'object',
            'properties': {
                'id': {'type': ['null', 'integer']},
                'email': {'type': ['null', 'string']},
                'name': {'type': ['null', 'string']},
                'created_at': {
                    'type': ['null', 'string'],
                    'format': 'date-time',
                },
                'updated_at': {
                    'type': ['null', 'string'],
                    'format': 'date-time',
                },
            },
        })

        metadata = get_standard_metadata(
            key_properties=['id'],
            replication_method=REPLICATION_METHOD_INCREMENTAL,
            schema=users_schema.to_dict(),
            stream_id='users',
            valid_replication_keys=['updated_at'],
        )

        catalog_entries.append(CatalogEntry(
            key_properties=['id'],
            metadata=metadata,
            replication_method=REPLICATION_METHOD_INCREMENTAL,
            schema=users_schema,
            stream='users',
            tap_stream_id='users',
        ))

        # Fix: honor the `streams` filter instead of silently ignoring it.
        if streams:
            catalog_entries = [
                entry for entry in catalog_entries
                if entry.stream in streams
            ]

        return Catalog(catalog_entries)

    def load_data(
        self,
        stream,
        bookmarks: Dict = None,
        query: Dict = None,
        **kwargs,
    ) -> Generator[List[Dict], None, None]:
        """Load data from the API one page at a time.

        Args:
            stream: Catalog entry for the stream being synced.
            bookmarks: Bookmark values from the previous run; `updated_at`
                (when present) requests only records changed since last sync.
            query: Unused here; part of the base interface.

        Yields:
            Lists of record dicts, one list per API page.
        """
        stream_id = stream.tap_stream_id
        bookmark_value = None

        if bookmarks:
            bookmark_value = bookmarks.get('updated_at')

        page = 1
        per_page = 100
        has_more = True

        while has_more:
            params = {
                'page': page,
                'per_page': per_page,
            }

            if bookmark_value:
                params['updated_since'] = bookmark_value

            self.logger.info(
                f'Fetching {stream_id} page {page}',
                tags=dict(page=page, stream=stream_id),
            )

            response = self._make_request(stream_id, params=params)
            records = response.get('data', [])

            if records:
                yield records

            # A short page means we've reached the end of the result set.
            has_more = len(records) == per_page
            page += 1

    def test_connection(self) -> None:
        """Verify API credentials by hitting a lightweight endpoint."""
        try:
            self._make_request('health')
        except Exception as e:
            # Fix: chain the original exception so the root cause stays visible.
            raise Exception(f'Connection test failed: {str(e)}') from e

# Hand off to Mage's Singer-style CLI harness when executed directly.
if __name__ == '__main__':
    main(CustomAPISource)

Testing Your Source

from your_module import CustomAPISource

# Build the source with the same config shape the pipeline would supply.
source = CustomAPISource(
    config={
        'api_key': 'your-api-key',
        'base_url': 'https://api.example.com',
    }
)

# 1. Verify credentials before doing anything else.
try:
    source.test_connection()
except Exception as e:
    print(f'✗ Connection failed: {e}')
else:
    print('✓ Connection successful')

# 2. Inspect the discovered catalog.
catalog = source.discover()
for entry in catalog.streams:
    print(f'Stream: {entry.stream}')
    print(f'  Schema: {entry.schema.to_dict()}')

# 3. Pull batches from the `users` stream.
for entry in catalog.streams:
    if entry.stream != 'users':
        continue
    for batch in source.load_data(entry):
        print(f'Loaded {len(batch)} records')

Building a Custom Destination

Extend the Destination base class to create a custom data destination.

Basic Structure

from typing import Dict, List
from mage_integrations.destinations.base import Destination, main

class CustomDestination(Destination):
    """Skeleton destination connector: override these hooks for a new system."""

    def build_connection(self):
        """
        Create connection to destination.
        Called once at initialization; the return value is implementation-defined.
        """
        pass

    def build_create_table_commands(
        self,
        schema: Dict,
        schema_name: str,
        stream: str,
        table_name: str,
        database_name: str = None,
        unique_constraints: List[str] = None,
    ) -> List[str]:
        """
        Generate DDL commands to create table.

        Args:
            schema: JSON-schema dict describing the stream's columns.
            schema_name: Destination schema (namespace) for the table.
            stream: Name of the stream being written.
            table_name: Destination table name.
            database_name: Optional database override.
            unique_constraints: Columns to enforce uniqueness on, if any.

        Returns:
            List of DDL statement strings to execute.
        """
        pass

    def build_alter_table_commands(
        self,
        schema: Dict,
        schema_name: str,
        stream: str,
        table_name: str,
        database_name: str = None,
        unique_constraints: List[str] = None,
    ) -> List[str]:
        """
        Generate DDL to add new columns.
        Called when the incoming schema has columns the existing table lacks.

        Returns:
            List of ALTER statement strings to execute (may be empty).
        """
        pass

    def process_queries(
        self,
        query_strings: List[str],
        record_data: List[Dict],
        stream: str,
        tags: Dict = None,
        **kwargs,
    ) -> List[List]:
        """
        Execute queries to load data.

        Args:
            query_strings: DDL statements to run before inserting.
            record_data: Record wrappers to insert for `stream`.
            stream: Name of the stream being written.
            tags: Optional logging tags.

        Returns:
            Per-query result rows.
        """
        pass

# Hand off to Mage's Singer-style CLI harness when executed directly.
if __name__ == '__main__':
    main(CustomDestination)

Example: Custom Database Destination

import psycopg2
from typing import Dict, List, Tuple
from mage_integrations.destinations.sql.base import Destination, main
from mage_integrations.destinations.sql.utils import (
    build_alter_table_command,
    build_create_table_command,
    build_insert_command,
    column_type_mapping,
)

class CustomDatabaseDestination(Destination):
    """Example PostgreSQL destination built on Mage's SQL destination base."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Lazily-created psycopg2 connection, cached across calls.
        self.connection = None

    def build_connection(self):
        """Create (or reuse) the database connection.

        Returns:
            An open psycopg2 connection built from the `host`/`port`/
            `database`/`username`/`password` config keys.
        """
        if self.connection:
            return self.connection

        self.connection = psycopg2.connect(
            host=self.config['host'],
            port=self.config.get('port', 5432),
            database=self.config['database'],
            user=self.config['username'],
            password=self.config['password'],
        )
        return self.connection

    def build_create_table_commands(
        self,
        schema: Dict,
        schema_name: str,
        stream: str,
        table_name: str,
        database_name: str = None,
        unique_constraints: List[str] = None,
    ) -> List[str]:
        """Generate the CREATE TABLE statement for a stream's schema."""
        # Map JSON-schema types onto PostgreSQL column types.
        type_mapping = column_type_mapping(
            schema,
            convert_column_type=lambda column_type, column_format: {
                'integer': 'BIGINT',
                'number': 'DOUBLE PRECISION',
                'string': 'TEXT',
                'boolean': 'BOOLEAN',
                'object': 'JSONB',
                'array': 'JSONB',
            }.get(column_type, 'TEXT'),
            convert_column_type_array=lambda item_type: 'JSONB',
        )

        return [
            build_create_table_command(
                column_type_mapping=type_mapping,
                columns=schema['properties'].keys(),
                full_table_name=f'{schema_name}.{table_name}',
                unique_constraints=unique_constraints,
                column_identifier='"',
            )
        ]

    def build_alter_table_commands(
        self,
        schema: Dict,
        schema_name: str,
        stream: str,
        table_name: str,
        database_name: str = None,
        unique_constraints: List[str] = None,
    ) -> List[str]:
        """Generate ALTER TABLE statements for columns missing from the table."""
        conn = self.build_connection()

        # Fix: close the cursor when done instead of leaking it; psycopg2
        # cursors are context managers.
        with conn.cursor() as cursor:
            # Fix: plain string, not an f-string — the %s placeholders are
            # psycopg2 parameters, and nothing is interpolated by Python here.
            cursor.execute("""
                SELECT column_name
                FROM information_schema.columns
                WHERE table_schema = %s AND table_name = %s
            """, (schema_name, table_name))
            existing_columns = {row[0] for row in cursor.fetchall()}

        new_columns = [
            col for col in schema['properties'].keys()
            if col not in existing_columns
        ]

        if not new_columns:
            return []

        type_mapping = column_type_mapping(
            schema,
            convert_column_type=lambda column_type, column_format: {
                'integer': 'BIGINT',
                'number': 'DOUBLE PRECISION',
                'string': 'TEXT',
            }.get(column_type, 'TEXT'),
        )

        return [
            build_alter_table_command(
                column_type_mapping=type_mapping,
                columns=new_columns,
                full_table_name=f'{schema_name}.{table_name}',
                column_identifier='"',
            )
        ]

    def process_queries(
        self,
        query_strings: List[str],
        record_data: List[Dict],
        stream: str,
        tags: Dict = None,
        **kwargs,
    ) -> List[List[Tuple]]:
        """Execute DDL then insert records, all in one transaction.

        Commits on success; rolls back and re-raises on any failure.

        Returns:
            A single-element result list containing the inserted row count.
        """
        conn = self.build_connection()
        cursor = conn.cursor()
        results = []

        try:
            # Execute DDL statements first so the table matches the schema.
            for query in query_strings:
                cursor.execute(query)

            # Insert data
            if record_data:
                schema = self.schemas[stream]
                columns = list(schema['properties'].keys())

                insert_columns, insert_values = build_insert_command(
                    column_type_mapping=column_type_mapping(schema),
                    columns=columns,
                    records=[d['record'] for d in record_data],
                )

                # NOTE(review): schema/table names are interpolated from config
                # (operator-controlled, not record data); quote/validate them if
                # config ever becomes untrusted. Values go through %s parameters.
                for values in insert_values:
                    placeholders = ','.join(['%s'] * len(values))
                    query = f"""
                        INSERT INTO {self.config['schema']}.{self.config['table']}
                        ({','.join(insert_columns)})
                        VALUES ({placeholders})
                    """
                    cursor.execute(query, values)

            conn.commit()
            results.append([(len(record_data),)])

        except Exception:
            conn.rollback()
            # Fix: bare `raise` re-raises the active exception idiomatically.
            raise
        finally:
            # Fix: always release the cursor, success or failure.
            cursor.close()

        return results

# Hand off to Mage's Singer-style CLI harness when executed directly.
if __name__ == '__main__':
    main(CustomDatabaseDestination)

Advanced Features

Incremental Syncing with Bookmarks

Track sync progress to avoid reprocessing data:
def load_data(self, stream, bookmarks: Dict = None, **kwargs):
    """Yield batches of records newer than the last saved bookmark.

    Args:
        stream: Stream descriptor (kept for interface parity with the base class).
        bookmarks: Mutable dict of bookmark values from the previous sync run;
            updated in place so the framework can persist sync progress.

    Yields:
        Lists of record dicts fetched via self.fetch_data.
    """
    # Fix: tolerate a missing bookmarks dict — the original crashed with
    # TypeError on the in-place update below when bookmarks was None.
    if bookmarks is None:
        bookmarks = {}

    # Get last synced value
    last_updated = bookmarks.get('updated_at')

    # Fetch only new records
    params = {}
    if last_updated:
        params['updated_since'] = last_updated

    for batch in self.fetch_data(params):
        # Update bookmark with the latest value seen in this batch
        if batch:
            latest = max(record['updated_at'] for record in batch)
            bookmarks['updated_at'] = latest

        yield batch

Schema Evolution

Handle schema changes automatically:
def build_alter_table_commands(self, schema, schema_name, stream, table_name, **kwargs):
    # Detect new columns
    existing_columns = self.get_existing_columns(schema_name, table_name)
    new_columns = [
        col for col in schema['properties'].keys()
        if col not in existing_columns
    ]

    # Detect type changes
    changed_columns = self.detect_type_changes(schema, schema_name, table_name)

    # Generate ALTER statements
    alter_commands = []
    if new_columns:
        alter_commands.append(self.add_columns_command(new_columns))
    if changed_columns:
        alter_commands.append(self.modify_columns_command(changed_columns))

    return alter_commands

Error Handling & Retries

import time
from functools import wraps

def retry_on_failure(max_retries=3, delay=1):
    """Decorator factory: retry the wrapped call on any exception.

    Sleeps delay * 2**attempt (exponential backoff) between attempts and
    re-raises the last exception once `max_retries` attempts are exhausted.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempt = 0
            while attempt < max_retries:
                try:
                    return func(*args, **kwargs)
                except Exception:
                    attempt += 1
                    # Out of attempts: propagate the current exception.
                    if attempt >= max_retries:
                        raise
                    time.sleep(delay * (2 ** (attempt - 1)))
        return wrapper
    return decorator

class RobustSource(Source):
    # Wrap the raw HTTP call in the retry decorator so transient network
    # failures are retried with exponential backoff.
    @retry_on_failure(max_retries=3)
    def _make_request(self, endpoint):
        # NOTE(review): no timeout is set here — confirm callers bound the request.
        return requests.get(endpoint)

Custom Type Conversions

def convert_column_type(column_type: str, column_format: str = None) -> str:
    """Map a JSON-schema column type to a destination SQL type.

    The `column_format` takes precedence: 'date-time' and 'date' map to
    TIMESTAMP/DATE regardless of the declared type. Unrecognized types
    fall back to VARCHAR(65535).
    """
    format_overrides = {
        'date-time': 'TIMESTAMP',
        'date': 'DATE',
    }
    if column_format in format_overrides:
        return format_overrides[column_format]

    return {
        'integer': 'BIGINT',
        'number': 'DECIMAL(18,6)',
        'string': 'VARCHAR(65535)',
        'boolean': 'BOOLEAN',
        'object': 'SUPER',  # Redshift SUPER type
        'array': 'SUPER',
    }.get(column_type, 'VARCHAR(65535)')

Configuration Templates

Create a configuration template for your integration:
{
  "api_key": "your-api-key",
  "base_url": "https://api.example.com",
  "timeout": 30,
  "retry_attempts": 3,
  "start_date": "2024-01-01T00:00:00Z"
}

Best Practices

Use Mage’s logger for consistent logging:
# Inside a Source/Destination subclass: structured logging via the framework
# logger — `tags` carries machine-readable context alongside the message.
self.logger.info(
    'Fetching data from API',
    tags=dict(
        endpoint='users',
        page=page,
        total_records=total,
    )
)
Respect API rate limits:
import time
from datetime import datetime

class RateLimitedSource(Source):
    """Source that throttles outbound requests to a fixed per-second rate."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Allow at most this many requests per second.
        self.requests_per_second = 10
        # Timestamp of the most recent request; None before the first call.
        self.last_request_time = None

    def _wait_if_needed(self):
        """Sleep just long enough to stay under the configured request rate."""
        previous = self.last_request_time
        if previous:
            since_last = (datetime.now() - previous).total_seconds()
            remaining = (1.0 / self.requests_per_second) - since_last
            if remaining > 0:
                time.sleep(remaining)
        self.last_request_time = datetime.now()
Write unit tests for your connector:
import unittest
from unittest.mock import Mock, patch

class TestCustomSource(unittest.TestCase):
    """Unit tests for CustomSource with the HTTP layer mocked out."""

    def setUp(self):
        # A minimal config is enough; construction performs no network I/O here.
        self.source = CustomSource(
            config={'api_key': 'test-key'}
        )

    def test_discover(self):
        # Discovery should surface at least one stream in the catalog.
        catalog = self.source.discover()
        self.assertGreater(len(catalog.streams), 0)

    @patch('requests.get')
    def test_load_data(self, mock_get):
        # Stub requests.get so load_data sees a single-record page.
        mock_get.return_value.json.return_value = {
            'data': [{'id': 1, 'name': 'Test'}]
        }
        # Test data loading

Packaging Your Integration

Create a proper Python package:
my_integration/
├── __init__.py
├── sources/
│   ├── __init__.py
│   └── my_source/
│       ├── __init__.py
│       └── templates/
│           └── config.json
├── destinations/
│   ├── __init__.py
│   └── my_destination/
│       └── __init__.py
├── setup.py
└── README.md
setup.py:
from setuptools import setup, find_packages

# Minimal packaging metadata so the integration can be pip-installed
# alongside mage-ai; list any extra runtime deps in install_requires.
setup(
    name='mage-my-integration',
    version='0.1.0',
    packages=find_packages(),
    install_requires=[
        'mage-ai',
        'requests',
    ],
)

Next Steps

Source Reference

See built-in sources for examples

Destination Reference

Explore destination implementations

Build docs developers (and LLMs) love