The Application SDK provides a robust framework for building SQL applications that interact with databases to extract, process, and store metadata. This guide demonstrates how to leverage the SDK’s default implementations while maintaining the flexibility to customize behavior for your specific needs.

Overview

An SQL application built with the SDK follows a “batteries included but swappable” philosophy - it provides sensible defaults for common tasks while allowing customization at every level.
1. Connect: Securely connects to SQL databases using various authentication methods (basic, IAM user, IAM role).
2. Extract: Fetches metadata (schemas, tables, columns, procedures, etc.) using configurable SQL queries.
3. Validate: Performs preflight checks and validates extracted metadata to ensure data quality.
4. Transform: Converts the extracted metadata into a standardized format (e.g., Atlas entities).
5. Store: Saves the processed metadata to a configured object store for further processing.
6. Orchestrate: Manages the entire process using Temporal workflows for reliability and scalability.
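The six steps can be sketched as a simple linear pipeline. This is an illustration only: the stub functions below stand in for the SDK’s client, activities, handler, and object store, and none of the names are SDK APIs; in the real workflow, Temporal orchestrates these stages as retryable activities.

```python
# Illustrative pipeline only: each stub stands in for an SDK component
# (SQL client, activities, handler, transformer, object store).
def connect(credentials):            # 1. Connect: open a database session
    return {"connected": True, **credentials}

def extract(conn):                   # 2. Extract: run metadata queries
    return [{"schema": "public", "table": "orders"}]

def validate(metadata):              # 3. Validate: preflight checks
    assert metadata, "no metadata extracted"
    return metadata

def transform(metadata):             # 4. Transform: map to standardized entities
    return [{"typeName": "Table", "name": m["table"]} for m in metadata]

def store(entities):                 # 5. Store: write to the object store
    return {"stored": len(entities)}

# 6. Orchestrate: Temporal drives this sequence with retries in the real SDK
result = store(transform(validate(extract(connect({"host": "localhost"})))))
print(result)  # {'stored': 1}
```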

Core Components

The SDK’s SQL metadata extraction workflow relies on three main customizable components:

SQL Client

BaseSQLClient handles database connectivity and query execution. Extend it to support different SQL dialects, connection string formats, or custom authentication logic.

Activities

BaseSQLMetadataExtractionActivities defines the SQL queries for extracting metadata (databases, schemas, tables, columns, etc.). Override specific query attributes to tailor metadata extraction for your database.

Handler

BaseSQLHandler manages database-specific validation logic and helper methods. Extend it to implement custom validation checks or database-specific logic.
These components work together within the BaseSQLMetadataExtractionWorkflow, orchestrated by a Worker.

Quick Start: Using Defaults

You can quickly get started by using the default implementations provided by the SDK:
1. Get Workflow Client: Initialize the workflow client that connects to your Temporal instance.
2. Create Activities: Instantiate the default activities, optionally providing custom SQL client or handler classes.
3. Set Up Worker: Create a Worker instance with the workflow class and activities.
4. Configure Workflow: Define workflow arguments with credentials and configuration.
5. Start Execution: Start the workflow and the worker.

Customization Examples

While the defaults are powerful, you’ll often need to customize components for specific databases or requirements.

Custom SQL Client

Extend BaseSQLClient to define connection parameters specific to your database:
from application_sdk.clients.sql import BaseSQLClient
from application_sdk.clients.models import DatabaseConfig

class SQLClient(BaseSQLClient):
    # Define connection string template and required parameters for PostgreSQL
    DB_CONFIG = DatabaseConfig(
        template="postgresql+psycopg://{username}:{password}@{host}:{port}/{database}",
        required=["username", "password", "host", "port", "database"],
    )
The connection string template uses placeholders that will be filled from the credentials provided in workflow_args.
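To make the substitution concrete, here is the template filled with plain str.format. This is an illustration of the placeholder-to-credential mapping only; the SDK performs the substitution internally when it builds the connection URL.

```python
# Illustration only: how the DB_CONFIG template's placeholders map to
# credential values supplied in workflow_args["credentials"].
template = "postgresql+psycopg://{username}:{password}@{host}:{port}/{database}"

credentials = {
    "username": "postgres",
    "password": "secret",   # never hardcode real credentials
    "host": "localhost",
    "port": "5432",
    "database": "postgres",
}

url = template.format(**credentials)
print(url)  # postgresql+psycopg://postgres:secret@localhost:5432/postgres
```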

Custom Activities

Extend BaseSQLMetadataExtractionActivities to provide custom SQL queries for metadata extraction:
from application_sdk.activities.metadata_extraction.sql import (
    BaseSQLMetadataExtractionActivities,
)

class SampleSQLActivities(BaseSQLMetadataExtractionActivities):
    # Customize database metadata query for PostgreSQL
    fetch_database_sql = """
    SELECT datname as database_name 
    FROM pg_database 
    WHERE datname = current_database();
    """
The queries use template variables like {normalized_exclude_regex} and {normalized_include_regex} which are filled at runtime from the workflow configuration.
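As an illustration of how those placeholders resolve at runtime (using str.format and made-up regex values, not the SDK’s internal rendering):

```python
# Illustration only: the SDK derives the include/exclude regexes from the
# filters in workflow_args; fixed example values are substituted here.
fetch_schema_sql = (
    "SELECT s.* FROM information_schema.schemata s "
    "WHERE concat(s.catalog_name, concat('.', s.schema_name)) !~ '{normalized_exclude_regex}' "
    "AND concat(s.catalog_name, concat('.', s.schema_name)) ~ '{normalized_include_regex}';"
)

rendered = fetch_schema_sql.format(
    normalized_exclude_regex="^$",                   # exclude nothing
    normalized_include_regex="^postgres\\.public$",  # include only postgres.public
)
print(rendered)
```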

Custom Handler

Extend BaseSQLHandler to implement custom validation logic:
from application_sdk.handlers.sql import BaseSQLHandler

class SampleSQLWorkflowHandler(BaseSQLHandler):
    # Customize query to check table count (used in preflight checks)
    tables_check_sql = """
    SELECT count(*)
    FROM INFORMATION_SCHEMA.TABLES t
    WHERE concat(TABLE_CATALOG, concat('.', TABLE_SCHEMA)) !~ '{normalized_exclude_regex}'
        AND concat(TABLE_CATALOG, concat('.', TABLE_SCHEMA)) ~ '{normalized_include_regex}'
        AND TABLE_SCHEMA NOT IN ('performance_schema', 'information_schema', 'pg_catalog', 'pg_internal')
        {temp_table_regex_sql};
    """
    
    temp_table_regex_sql = "AND t.table_name !~ '{exclude_table_regex}'"
    
    # Customize query to fetch basic schema/catalog info
    metadata_sql = """
    SELECT schema_name, catalog_name
    FROM INFORMATION_SCHEMA.SCHEMATA
    WHERE schema_name NOT LIKE 'pg_%' 
        AND schema_name != 'information_schema'
    """

Complete Application Example

Here’s how to integrate custom components into a complete application:
import asyncio
import os
import time
from typing import Any, Dict

from application_sdk.activities.metadata_extraction.sql import (
    BaseSQLMetadataExtractionActivities,
)
from application_sdk.application.metadata_extraction.sql import (
    BaseSQLMetadataExtractionApplication,
)
from application_sdk.clients.models import DatabaseConfig
from application_sdk.clients.sql import BaseSQLClient
from application_sdk.handlers.sql import BaseSQLHandler
from application_sdk.observability.logger_adaptor import get_logger
from application_sdk.workflows.metadata_extraction.sql import (
    BaseSQLMetadataExtractionWorkflow,
)

APPLICATION_NAME = "postgres-app-example"
logger = get_logger(__name__)

# Define custom components
class SQLClient(BaseSQLClient):
    DB_CONFIG = DatabaseConfig(
        template="postgresql+psycopg://{username}:{password}@{host}:{port}/{database}",
        required=["username", "password", "host", "port", "database"],
    )

class SampleSQLActivities(BaseSQLMetadataExtractionActivities):
    fetch_database_sql = "SELECT datname as database_name FROM pg_database WHERE datname = current_database();"
    fetch_schema_sql = "SELECT s.* FROM information_schema.schemata s WHERE s.schema_name NOT LIKE 'pg_%' AND s.schema_name != 'information_schema' AND concat(s.CATALOG_NAME, concat('.', s.SCHEMA_NAME)) !~ '{normalized_exclude_regex}' AND concat(s.CATALOG_NAME, concat('.', s.SCHEMA_NAME)) ~ '{normalized_include_regex}';"
    fetch_table_sql = "SELECT t.* FROM information_schema.tables t WHERE concat(current_database(), concat('.', t.table_schema)) !~ '{normalized_exclude_regex}' AND concat(current_database(), concat('.', t.table_schema)) ~ '{normalized_include_regex}' {temp_table_regex_sql};"
    extract_temp_table_regex_table_sql = "AND t.table_name !~ '{exclude_table_regex}'"
    fetch_column_sql = "SELECT c.* FROM information_schema.columns c WHERE concat(current_database(), concat('.', c.table_schema)) !~ '{normalized_exclude_regex}' AND concat(current_database(), concat('.', c.table_schema)) ~ '{normalized_include_regex}' {temp_table_regex_sql};"
    extract_temp_table_regex_column_sql = "AND c.table_name !~ '{exclude_table_regex}'"

class SampleSQLWorkflowHandler(BaseSQLHandler):
    tables_check_sql = "SELECT count(*) FROM INFORMATION_SCHEMA.TABLES t WHERE concat(TABLE_CATALOG, concat('.', TABLE_SCHEMA)) !~ '{normalized_exclude_regex}' AND concat(TABLE_CATALOG, concat('.', TABLE_SCHEMA)) ~ '{normalized_include_regex}' AND TABLE_SCHEMA NOT IN ('performance_schema', 'information_schema', 'pg_catalog', 'pg_internal') {temp_table_regex_sql};"
    temp_table_regex_sql = "AND t.table_name !~ '{exclude_table_regex}'"
    metadata_sql = "SELECT schema_name, catalog_name FROM INFORMATION_SCHEMA.SCHEMATA WHERE schema_name NOT LIKE 'pg_%' AND schema_name != 'information_schema'"

async def run_sql_application(daemon: bool = True) -> Dict[str, Any]:
    """Sets up and runs the SQL metadata extraction workflow."""
    logger.info(f"Starting SQL application: {APPLICATION_NAME}")
    
    # Initialize application with custom components
    app = BaseSQLMetadataExtractionApplication(
        name=APPLICATION_NAME,
        client_class=SQLClient,
        handler_class=SampleSQLWorkflowHandler,
    )
    
    # Set up workflow and activities
    await app.setup_workflow(
        workflow_and_activities_classes=[
            (BaseSQLMetadataExtractionWorkflow, SampleSQLActivities)
        ]
    )
    
    await asyncio.sleep(3)  # Give the worker a moment to initialize without blocking the event loop
    
    # Configure workflow arguments
    workflow_args = {
        "credentials": {
            "authType": "basic",
            "host": os.getenv("POSTGRES_HOST", "localhost"),
            "port": os.getenv("POSTGRES_PORT", "5432"),
            "username": os.getenv("POSTGRES_USER", "postgres"),
            "password": os.getenv("POSTGRES_PASSWORD", "password"),
            "database": os.getenv("POSTGRES_DATABASE", "postgres"),
        },
        "connection": {
            "connection_name": "test-postgres-connection",
            "connection_qualified_name": f"default/postgres/{int(time.time())}",
        },
        "metadata": {
            "exclude-filter": "{}",
            "include-filter": "{}",
            "temp-table-regex": "^temp_",
            "extraction-method": "direct",
            "exclude_views": "false",
            "exclude_empty_tables": "false",
        },
        "tenant_id": os.getenv("ATLAN_TENANT_ID", "default"),
    }
    
    # Start workflow execution
    logger.info("Starting workflow for connection: %s", workflow_args["connection"]["connection_name"])  # avoid logging credentials
    workflow_response = await app.start_workflow(workflow_args=workflow_args)
    logger.info(f"Workflow started: {workflow_response}")
    
    # Start the worker
    logger.info(f"Starting worker (daemon={daemon})...")
    await app.start_worker(daemon=daemon)
    
    return workflow_response

if __name__ == "__main__":
    asyncio.run(run_sql_application(daemon=False))
Never commit credentials directly in code. Always load sensitive values from environment variables or secret stores.

Configuration Details

The workflow_args dictionary controls the workflow’s behavior:

Credentials Section

- credentials.authType (string, required): Authentication type: basic, iam_user, or iam_role
- credentials.host (string, required): Database host address
- credentials.port (string, required): Database port number
- credentials.username (string, required): Database username
- credentials.password (string, required): Database password (for basic auth)
- credentials.database (string, required): Database name to connect to

Connection Section

- connection.connection_name (string, required): Human-readable connection name in Atlan
- connection.connection_qualified_name (string, required): Unique qualified name for the connection (format: tenant/connector/timestamp)

Metadata Section

- metadata.include-filter (string, default "{}"): JSON string of regex patterns for schemas/tables to include
- metadata.exclude-filter (string, default "{}"): JSON string of regex patterns for schemas/tables to exclude
- metadata.temp-table-regex (string): Regex pattern identifying temporary tables to exclude
- metadata.extraction-method (string, default "direct"): Extraction method (typically "direct")
- metadata.exclude_views (string, default "false"): Boolean string ("true"/"false") to skip views
- metadata.exclude_empty_tables (string, default "false"): Boolean string to skip tables with zero rows
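For instance, a temp-table-regex of ^temp_ matches any table whose name starts with temp_. A small illustration of the matching (the table names are examples, not SDK internals):

```python
import re

# Illustration only: how a temp-table-regex identifies temporary tables
# so they can be excluded from extraction.
temp_table_regex = "^temp_"
tables = ["orders", "temp_orders_stage", "customers", "temp_scratch"]

kept = [t for t in tables if not re.search(temp_table_regex, t)]
print(kept)  # ['orders', 'customers']
```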

Optional Parameters

- workflow_id (string): Existing workflow ID for reruns
- cron_schedule (string): Cron expression for scheduled execution (e.g., "0 1 * * *" for daily at 1 AM)
- tenant_id (string, required): Atlan tenant ID

Advanced Features

Scheduled Execution

Run workflows automatically by adding a cron schedule:
workflow_args["cron_schedule"] = "0 1 * * *"  # Run daily at 1 AM
Use crontab.guru to build and validate cron expressions.

Workflow Rerun

Resume or rerun a specific workflow execution:
workflow_args["workflow_id"] = "your-previous-workflow_id"

Custom Authentication

The BaseSQLClient supports different authentication types:
"credentials": {
    "authType": "basic",
    "username": "db_user",
    "password": "db_pass",
    # ... other connection details
}
You can extend BaseSQLClient to add support for other authentication mechanisms.

Best Practices

- Load sensitive credentials from environment variables or secure stores, never directly in code. Use the constants defined in application_sdk.constants.
- Wrap custom code (especially within Activities or Handlers) in try...except blocks to handle database errors or unexpected data.
- Use the SDK’s logger (application_sdk.observability.logger_adaptor.get_logger) for consistent, structured logging integrated with Temporal.
- Design activities to be idempotent where possible, so they can run multiple times with the same result. Temporal handles retries, but idempotent activities simplify recovery.
- Write unit tests for your custom Client, Activities, and Handler classes. The SDK provides testing utilities in application_sdk.test_utils.
- Ensure database connections are properly closed. The SDK’s client and workflow management generally handle this, but be mindful in custom extensions.
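As a sketch of the error-handling practice above (standard logging is used so the snippet is self-contained; in an application you would use the SDK’s get_logger, and safe_fetch/flaky_fetch are hypothetical names, not SDK APIs):

```python
import logging

# Hedged sketch: wrap database calls in try/except and log failures
# instead of letting them crash an activity.
logger = logging.getLogger(__name__)

def safe_fetch(fetch_rows, fallback=None):
    """Run a fetch callable, logging and absorbing database errors."""
    try:
        return fetch_rows()
    except Exception as exc:  # narrow this to your driver's error types
        logger.error("metadata fetch failed: %s", exc)
        return fallback if fallback is not None else []

def flaky_fetch():
    raise RuntimeError("connection lost")

rows = safe_fetch(flaky_fetch)
print(rows)  # []
```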

Next Steps

PostgreSQL Example

Explore a complete, production-ready PostgreSQL application.

Testing Guide

Learn how to test your SQL applications comprehensively.

Best Practices

Follow best practices for scalability and reliability.

Secret Stores

Secure credential management with secret stores.
