
Overview

This example demonstrates how to build a production-ready SQL metadata extraction workflow. It extracts metadata from a PostgreSQL database, including:
  • Databases
  • Schemas
  • Tables and views
  • Columns and their types
The workflow uses the Temporal workflow engine to orchestrate the extraction process, making it reliable, scalable, and resumable.

What You’ll Build

A metadata extraction application that:
  1. Connects to a PostgreSQL database
  2. Performs preflight checks to validate the connection
  3. Extracts metadata using custom SQL queries
  4. Transforms the raw metadata into Atlan entities
  5. Stores the results for ingestion into Atlan
While this example uses PostgreSQL, the same pattern can be adapted for any SQL database (MySQL, Snowflake, BigQuery, etc.) by customizing the SQL queries and connection configuration.

Complete Code

application_sql.py
"""
This example demonstrates how to create a SQL workflow for extracting metadata from a PostgreSQL database.
It uses the Temporal workflow engine to manage the extraction process.

Key components:
- SampleSQLWorkflowMetadata: Defines metadata extraction queries
- SampleSQLWorkflowPreflight: Performs preflight checks
- SampleSQLWorkflowWorker: Implements the main workflow logic (including extraction and transformation)
- SampleSQLWorkflowBuilder: Configures and builds the workflow

Workflow steps:
1. Perform preflight checks
2. Create an output directory
3. Fetch database information
4. Fetch schema information
5. Fetch table information
6. Fetch column information
7. Transform the metadata into Atlan entities
8. Clean up the output directory
9. Push results to object store

Usage:
1. Set the PostgreSQL connection credentials as environment variables
2. Run the script to start the Temporal worker and execute the workflow

Note: This example is specific to PostgreSQL but can be adapted for other SQL databases.
"""

import asyncio
import os
import time
from typing import Any, Dict

from application_sdk.activities.metadata_extraction.sql import (
    BaseSQLMetadataExtractionActivities,
)
from application_sdk.application.metadata_extraction.sql import (
    BaseSQLMetadataExtractionApplication,
)
from application_sdk.clients.models import DatabaseConfig
from application_sdk.clients.sql import BaseSQLClient
from application_sdk.handlers.sql import BaseSQLHandler
from application_sdk.observability.logger_adaptor import get_logger
from application_sdk.workflows.metadata_extraction.sql import (
    BaseSQLMetadataExtractionWorkflow,
)

APPLICATION_NAME = "postgres"

logger = get_logger(__name__)


class SQLClient(BaseSQLClient):
    DB_CONFIG = DatabaseConfig(
        template="postgresql+psycopg://{username}:{password}@{host}:{port}/{database}",
        required=["username", "password", "host", "port", "database"],
    )


class SampleSQLActivities(BaseSQLMetadataExtractionActivities):
    fetch_database_sql = """
    SELECT datname as database_name FROM pg_database WHERE datname = current_database();
    """

    fetch_schema_sql = """
    SELECT
        s.*
    FROM
        information_schema.schemata s
    WHERE
        s.schema_name NOT LIKE 'pg_%'
        AND s.schema_name != 'information_schema'
        AND concat(s.CATALOG_NAME, concat('.', s.SCHEMA_NAME)) !~ '{normalized_exclude_regex}'
        AND concat(s.CATALOG_NAME, concat('.', s.SCHEMA_NAME)) ~ '{normalized_include_regex}';
    """

    fetch_table_sql = """
    SELECT
        t.*
    FROM
        information_schema.tables t
    WHERE concat(current_database(), concat('.', t.table_schema)) !~ '{normalized_exclude_regex}'
        AND concat(current_database(), concat('.', t.table_schema)) ~ '{normalized_include_regex}'
        {temp_table_regex_sql};
    """

    extract_temp_table_regex_table_sql = "AND t.table_name !~ '{exclude_table_regex}'"
    extract_temp_table_regex_column_sql = "AND c.table_name !~ '{exclude_table_regex}'"

    fetch_column_sql = """
    SELECT
        c.*
    FROM
        information_schema.columns c
    WHERE
        concat(current_database(), concat('.', c.table_schema)) !~ '{normalized_exclude_regex}'
        AND concat(current_database(), concat('.', c.table_schema)) ~ '{normalized_include_regex}'
        {temp_table_regex_sql};
    """


class SampleSQLWorkflowHandler(BaseSQLHandler):
    tables_check_sql = """
    SELECT count(*)
        FROM INFORMATION_SCHEMA.TABLES t
        WHERE concat(TABLE_CATALOG, concat('.', TABLE_SCHEMA)) !~ '{normalized_exclude_regex}'
            AND concat(TABLE_CATALOG, concat('.', TABLE_SCHEMA)) ~ '{normalized_include_regex}'
            AND TABLE_SCHEMA NOT IN ('performance_schema', 'information_schema', 'pg_catalog', 'pg_internal')
            {temp_table_regex_sql};
    """

    temp_table_regex_sql = "AND t.table_name !~ '{exclude_table_regex}'"

    metadata_sql = """
    SELECT schema_name, catalog_name
        FROM INFORMATION_SCHEMA.SCHEMATA
        WHERE schema_name NOT LIKE 'pg_%' AND schema_name != 'information_schema'
    """


async def application_sql(daemon: bool = True) -> Dict[str, Any]:
    logger.info("Starting application_sql")

    app = BaseSQLMetadataExtractionApplication(
        name=APPLICATION_NAME,
        client_class=SQLClient,
        handler_class=SampleSQLWorkflowHandler,
    )

    await app.setup_workflow(
        workflow_and_activities_classes=[
            (BaseSQLMetadataExtractionWorkflow, SampleSQLActivities)
        ]
    )

    await asyncio.sleep(3)  # give workflow setup a moment to settle without blocking the event loop

    workflow_args = {
        "credentials": {
            "authType": "basic",
            "host": os.getenv("POSTGRES_HOST", "localhost"),
            "port": os.getenv("POSTGRES_PORT", "5432"),
            "username": os.getenv("POSTGRES_USER", "postgres"),
            "password": os.getenv("POSTGRES_PASSWORD", "password"),
            "database": os.getenv("POSTGRES_DATABASE", "postgres"),
        },
        "connection": {
            "connection_name": "test-connection",
            "connection_qualified_name": "default/postgres/1728518400",
        },
        "metadata": {
            "exclude-filter": "{}",
            "include-filter": "{}",
            "temp-table-regex": "",
            "extraction-method": "direct",
            "exclude_views": "true",
            "exclude_empty_tables": "false",
        },
        "tenant_id": "123",
        # "workflow_id": "27498f69-13ae-44ec-a2dc-13ff81c517de",  # if you want to rerun an existing workflow, just keep this field.
        # "cron_schedule": "0/30 * * * *", # uncomment to run the workflow on a cron schedule, every 30 minutes
    }

    workflow_response = await app.start_workflow(workflow_args=workflow_args)

    await app.start_worker(daemon=daemon)

    return workflow_response


if __name__ == "__main__":
    asyncio.run(application_sql(daemon=False))

How to Run

Step 1: Start Dependencies

Start the Dapr runtime and Temporal server:
uv run poe start-deps

Step 2: Configure PostgreSQL Connection

Set the PostgreSQL connection credentials as environment variables:
export POSTGRES_HOST=localhost
export POSTGRES_PORT=5432
export POSTGRES_USER=postgres
export POSTGRES_PASSWORD=password
export POSTGRES_DATABASE=postgres
The example uses default credentials for local development. Never use these in production. Always use secure environment variables and secrets management.

Step 3: Run the Example

Execute the SQL application:
uv run examples/application_sql.py

Step 4: Monitor Progress

The workflow will:
  • Connect to PostgreSQL
  • Validate the connection
  • Extract metadata from all accessible schemas
  • Transform the data into Atlan entities
  • Store the results

Key Components Explained

SQL Client Configuration

class SQLClient(BaseSQLClient):
    DB_CONFIG = DatabaseConfig(
        template="postgresql+psycopg://{username}:{password}@{host}:{port}/{database}",
        required=["username", "password", "host", "port", "database"],
    )
The SQLClient defines the connection string template and required credentials. The SDK uses SQLAlchemy under the hood, so you can use any SQLAlchemy-compatible connection string.
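Because the template is a plain Python format string, you can preview the SQLAlchemy URL it produces. A minimal sketch (the credential values below are placeholders, and the URL-encoding step is a general SQLAlchemy precaution for passwords with special characters, not something this example's code performs):

```python
from urllib.parse import quote_plus

# The connection template from DB_CONFIG above; values here are placeholders.
template = "postgresql+psycopg://{username}:{password}@{host}:{port}/{database}"

creds = {
    "username": "postgres",
    # Passwords can contain characters like '@' or '/';
    # URL-encode them so the rendered URL stays parseable.
    "password": quote_plus("p@ss/word"),
    "host": "localhost",
    "port": "5432",
    "database": "postgres",
}

url = template.format(**creds)
print(url)  # postgresql+psycopg://postgres:p%40ss%2Fword@localhost:5432/postgres
```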

Custom Extraction Queries

class SampleSQLActivities(BaseSQLMetadataExtractionActivities):
    fetch_database_sql = """
    SELECT datname as database_name FROM pg_database WHERE datname = current_database();
    """
    # ... more queries
Define custom SQL queries for each metadata type. The queries support:
  • Include/exclude filters: {normalized_include_regex} and {normalized_exclude_regex} placeholders
  • Temporary table filtering: {temp_table_regex_sql} placeholder
  • Database-specific schema: Customize queries for your database dialect
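The SDK performs the substitution internally; as a rough sketch of how the placeholders could expand (the regex values below are hypothetical), the temp-table snippet is rendered first and then spliced into the main query:

```python
# Simplified copy of the fetch_table_sql template from the example above.
fetch_table_sql = """
SELECT t.* FROM information_schema.tables t
WHERE concat(current_database(), concat('.', t.table_schema)) !~ '{normalized_exclude_regex}'
    AND concat(current_database(), concat('.', t.table_schema)) ~ '{normalized_include_regex}'
    {temp_table_regex_sql};
"""
temp_snippet = "AND t.table_name !~ '{exclude_table_regex}'"

# Hypothetical filter values: include everything, exclude nothing
# ('$^' matches no string), and drop tables whose names start with tmp_.
rendered = fetch_table_sql.format(
    normalized_include_regex=".*",
    normalized_exclude_regex="$^",
    temp_table_regex_sql=temp_snippet.format(exclude_table_regex="^tmp_.*"),
)
print(rendered)
```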

Preflight Checks

class SampleSQLWorkflowHandler(BaseSQLHandler):
    tables_check_sql = """
    SELECT count(*) FROM INFORMATION_SCHEMA.TABLES WHERE ...
    """
The handler implements preflight checks that validate:
  • Connection credentials work
  • Required permissions are available
  • Tables exist in the specified schemas
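The PostgreSQL check above needs a live server, so it can't be demonstrated inline; as a stand-in, the same "count the tables that survive the filter" idea can be sketched against an in-memory SQLite database using only the standard library:

```python
import sqlite3

# Stand-in for the tables preflight check: count tables while excluding
# names matching a temp-table pattern, using SQLite's own catalog.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (id INTEGER);
    CREATE TABLE tmp_scratch (id INTEGER);
""")

(count,) = conn.execute(
    "SELECT count(*) FROM sqlite_master "
    "WHERE type = 'table' AND name NOT GLOB 'tmp_*'"
).fetchone()
print(count)  # 1 -- only 'orders' survives the filter
```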

Application Setup

app = BaseSQLMetadataExtractionApplication(
    name=APPLICATION_NAME,
    client_class=SQLClient,
    handler_class=SampleSQLWorkflowHandler,
)
The BaseSQLMetadataExtractionApplication provides:
  • Pre-built workflow for SQL metadata extraction
  • Built-in transformation to Atlan entities
  • Error handling and retry logic
  • Progress tracking and heartbeating

Workflow Arguments

workflow_args = {
    "credentials": { ... },      # Database connection details
    "connection": { ... },       # Atlan connection metadata
    "metadata": { ... },         # Extraction configuration
    "tenant_id": "123",         # Atlan tenant ID
}
The workflow accepts:
  • credentials: Database authentication details
  • connection: Atlan connection information
  • metadata: Filters and extraction options
  • tenant_id: Your Atlan tenant identifier
You can rerun an existing workflow by including the workflow_id field in the workflow arguments. This is useful for debugging or resuming failed workflows.
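Concretely, rerunning means passing the earlier run's id back in with the rest of the arguments (the id below is the placeholder from the example code):

```python
base_args = {"tenant_id": "123"}  # plus credentials/connection/metadata as shown above

# Pin a previous run's id to rerun that workflow.
rerun_args = {**base_args, "workflow_id": "27498f69-13ae-44ec-a2dc-13ff81c517de"}
print("workflow_id" in rerun_args)  # True
```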

Advanced Features

Scheduled Execution

Run the workflow on a schedule using cron syntax:
workflow_args = {
    # ... other args
    "cron_schedule": "0/30 * * * *",  # Every 30 minutes
}

Include/Exclude Filters

Control which schemas and tables to extract:
"metadata": {
    "include-filter": '{"database1": ["schema1", "schema2"]}',
    "exclude-filter": '{"database1": ["temp_schema"]}',
    "temp-table-regex": "^tmp_.*",  # Exclude tables starting with tmp_
}
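The SDK turns these JSON filters into the `{normalized_include_regex}` and `{normalized_exclude_regex}` placeholders seen in the queries. The exact normalization is SDK-internal; a hypothetical sketch of how a filter could become a regex over `database.schema` strings:

```python
import json
import re

include_filter = '{"database1": ["schema1", "schema2"]}'

# Hypothetical normalization: build a regex over 'database.schema' strings,
# mirroring the concat(catalog, '.', schema) expressions in the queries.
pairs = [
    f"{re.escape(db)}\\.{re.escape(schema)}"
    for db, schemas in json.loads(include_filter).items()
    for schema in schemas
]
include_regex = "^(" + "|".join(pairs) + ")$"

print(bool(re.search(include_regex, "database1.schema1")))      # True
print(bool(re.search(include_regex, "database1.temp_schema")))  # False
```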

Extraction Options

"metadata": {
    "extraction-method": "direct",        # or "query-based"
    "exclude_views": "true",             # Skip views
    "exclude_empty_tables": "false",     # Include empty tables
}

Adapting for Other Databases

To adapt this example for MySQL, Snowflake, or other databases:
  1. Update the connection template:
    DB_CONFIG = DatabaseConfig(
        template="mysql+pymysql://{username}:{password}@{host}:{port}/{database}",
        required=["username", "password", "host", "port", "database"],
    )
    
  2. Customize the SQL queries to use the appropriate system tables/views for your database
  3. Update the handler queries for preflight checks specific to your database
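For step 1, a few common SQLAlchemy URL templates are sketched below. These formats are illustrative; verify the exact URL shape against your driver's documentation before use:

```python
# Illustrative SQLAlchemy URL templates for other dialects (verify against
# the documentation of the driver you actually install).
TEMPLATES = {
    "postgresql": "postgresql+psycopg://{username}:{password}@{host}:{port}/{database}",
    "mysql": "mysql+pymysql://{username}:{password}@{host}:{port}/{database}",
    "snowflake": "snowflake://{username}:{password}@{account}/{database}",
}

url = TEMPLATES["mysql"].format(
    username="app", password="secret", host="db.internal", port="3306", database="sales"
)
print(url)  # mysql+pymysql://app:secret@db.internal:3306/sales
```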
