Follow these best practices to build production-ready applications that are scalable, reliable, and maintainable.

Scaling in Python

Python’s Global Interpreter Lock (GIL) prevents threads from executing Python bytecode in parallel, but multi-threading remains effective for I/O-bound tasks because the GIL is released during I/O operations.
The SDK combines Python’s I/O capabilities with Temporal’s workflow orchestration to enable scalable execution across multiple workers.
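This behavior is easy to verify with the standard library alone. In the sketch below, fetch is a stand-in for a real I/O call; four simulated waits run on a thread pool and overlap almost completely:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fetch(source: str) -> str:
    """Simulated I/O call; the GIL is released while the thread waits."""
    time.sleep(0.1)
    return f"{source}: done"

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(fetch, ["a", "b", "c", "d"]))
elapsed = time.perf_counter() - start

# The four 0.1 s waits overlap, so elapsed stays close to 0.1 s rather than 0.4 s
```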

Async Functions

Python 3.5+ supports asynchronous programming through the async and await keywords, enabling non-blocking, concurrent code execution:
import asyncio
from application_sdk.clients.sql import BaseSQLClient

class SQLClient(BaseSQLClient):
    async def fetch_metadata(self, query: str):
        # Non-blocking database query
        result = await self.execute_query(query)
        return result

async def extract_multiple_schemas(schemas: list):
    # Execute multiple queries concurrently
    # (fetch_schema_metadata is assumed to wrap SQLClient.fetch_metadata for one schema)
    tasks = [fetch_schema_metadata(schema) for schema in schemas]
    results = await asyncio.gather(*tasks)
    return results
Use asyncio.gather() to execute multiple async operations concurrently and improve performance.

Temporal Parallelism

Temporal operates on a worker-pool model that enables true parallelism:
1. Worker Registration

Workers register to handle specific workflows and activities based on task queues.
2. Process Isolation

Each worker runs in a separate process, bypassing Python’s GIL limitations.
3. Concurrent Execution

Workers can run multiple workflows and activities concurrently while listening on activity queues.
4. High Availability

Production deployments require at least 3 workers for high availability and fault tolerance.
from application_sdk.worker import Worker
from application_sdk.workflows.metadata_extraction.sql import (
    BaseSQLMetadataExtractionWorkflow,
)

# Create worker with multiple workflow classes
worker = Worker(
    workflow_client=workflow_client,
    workflow_classes=[
        BaseSQLMetadataExtractionWorkflow,
        CustomWorkflow,
    ],
    workflow_activities=activities,
    max_concurrent_activities=10,  # Control concurrency
)

Temporal Architecture

This architecture enables concurrency at both workflow and activity levels based on worker availability.

Python Multiprocessing

For additional scaling beyond Temporal activity parallelism, use Python’s multiprocessing library:
import multiprocessing as mp
from typing import List, Dict, Any

def process_table_chunk(tables: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Process a chunk of tables."""
    results = []
    for table in tables:
        # Perform CPU-intensive transformation
        transformed = transform_table(table)
        results.append(transformed)
    return results

def parallel_table_processing(all_tables: List[Dict[str, Any]], num_processes: int = 4):
    """Process tables in parallel across multiple CPU cores."""
    # Split tables into chunks
    # Guard against a chunk size of 0 when there are fewer tables than processes
    chunk_size = max(1, len(all_tables) // num_processes)
    chunks = [all_tables[i:i + chunk_size] for i in range(0, len(all_tables), chunk_size)]
    
    # Process chunks in parallel
    with mp.Pool(processes=num_processes) as pool:
        results = pool.map(process_table_chunk, chunks)
    
    # Flatten results
    return [item for sublist in results for item in sublist]
Be mindful of memory usage when using multiprocessing with large datasets. Consider processing data in batches.
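One way to keep memory bounded is to feed the pool lazily instead of slicing the full list up front. The chunked helper below is a hypothetical utility, not part of the SDK:

```python
from itertools import islice
from typing import Any, Iterable, Iterator, List

def chunked(items: Iterable[Any], size: int) -> Iterator[List[Any]]:
    """Yield lists of up to `size` items without materializing the whole input."""
    it = iter(items)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk

# With mp.Pool, passing the generator to imap keeps only a few chunks in flight:
#     for partial in pool.imap(process_table_chunk, chunked(all_tables, 1000)):
#         handle(partial)
```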

Memoization

Implement memoization to cache expensive function results and improve performance:

Function-Level Memoization

from functools import lru_cache
from typing import Dict, Any

class MetadataExtractor:
    # Note: lru_cache on methods includes `self` in the cache key, so results
    # are cached per instance and cached instances stay alive with the cache.
    @lru_cache(maxsize=128)
    def get_schema_info(self, schema_name: str) -> Dict[str, Any]:
        """Cache schema information to avoid repeated queries."""
        # Expensive database query
        return self.fetch_schema_from_db(schema_name)
    
    @lru_cache(maxsize=512)
    def transform_data_type(self, db_type: str, dialect: str) -> str:
        """Cache data type transformations."""
        # Complex transformation logic
        return self._perform_type_mapping(db_type, dialect)
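Note that lru_cache only suits synchronous functions; applied to an async def it would cache the coroutine object rather than its result. A minimal manual cache for async methods (async_memoize is a hypothetical decorator, and its cache is shared across instances):

```python
import asyncio
import functools

def async_memoize(func):
    """Cache awaited results, keyed by positional arguments (excluding self)."""
    cache = {}

    @functools.wraps(func)
    async def wrapper(self, *args):
        if args not in cache:
            cache[args] = await func(self, *args)
        return cache[args]

    return wrapper

class SchemaCache:
    db_calls = 0

    @async_memoize
    async def get_schema_info(self, schema_name: str) -> dict:
        SchemaCache.db_calls += 1  # Stands in for an expensive query
        return {"schema": schema_name}

async def demo():
    extractor = SchemaCache()
    first = await extractor.get_schema_info("public")
    second = await extractor.get_schema_info("public")  # Served from cache
    return first, second

first, second = asyncio.run(demo())
```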

State Store for Intermediate Results

Use the state store system to save intermediate results at regular intervals:
import time

from application_sdk.clients.state_store import StateStoreClient

class WorkflowActivity:
    def __init__(self):
        self.state_store = StateStoreClient()

    async def extract_large_dataset(self, total_records: int, checkpoint_interval: int = 1000):
        """Extract data with checkpointing."""
        processed_count = 0
        checkpoint_key = "extraction_checkpoint"

        # Try to resume from checkpoint
        checkpoint = await self.state_store.get(checkpoint_key)
        start_index = checkpoint.get("last_processed", 0) if checkpoint else 0

        for i in range(start_index, total_records):
            # Process record
            process_record(i)
            processed_count += 1

            # Save checkpoint periodically
            if processed_count % checkpoint_interval == 0:
                await self.state_store.set(checkpoint_key, {
                    "last_processed": i,
                    "timestamp": time.time()
                })

Recovery Benefits

Checkpointing enables activity recovery from the state store if pods are deleted and activities are re-run.
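The resume path can be exercised without Temporal by substituting an in-memory stand-in for the state store (InMemoryStateStore below is purely illustrative):

```python
import asyncio

class InMemoryStateStore:
    """Illustrative drop-in for the state store's async get/set interface."""
    def __init__(self):
        self._data = {}

    async def get(self, key):
        return self._data.get(key)

    async def set(self, key, value):
        self._data[key] = value

async def extract(store, total, fail_at=None):
    processed = []
    checkpoint = await store.get("extraction_checkpoint")
    start = checkpoint["last_processed"] + 1 if checkpoint else 0
    for i in range(start, total):
        if fail_at is not None and i == fail_at:
            raise RuntimeError("simulated pod restart")
        processed.append(i)
        await store.set("extraction_checkpoint", {"last_processed": i})
    return processed

async def demo():
    store = InMemoryStateStore()
    try:
        await extract(store, total=10, fail_at=6)  # First attempt dies mid-run
    except RuntimeError:
        pass
    # The retried activity resumes after the last checkpointed record
    return await extract(store, total=10)

resumed = asyncio.run(demo())
```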

Reliability

Activity Heartbeats

Temporal monitors activity health through heartbeats. Activities that fail to heartbeat are automatically retried on different workers.
1. Configure Heartbeat Timeout

Set the heartbeat_timeout parameter when executing activities to define the maximum time between heartbeats.
2. Send Heartbeats

Use the auto_heartbeater decorator to automatically send heartbeats, or manually call heartbeat functions.
3. Report Progress

Include progress information in heartbeats to aid debugging and monitoring.

Automatic Heartbeating

from typing import Dict, List

from application_sdk.activities.decorators import auto_heartbeater
from temporalio import activity

class MetadataActivities:
    @activity.defn
    @auto_heartbeater(heartbeat_interval=30)  # Heartbeat every 30 seconds
    async def extract_tables(self, schema_name: str) -> List[Dict]:
        """Extract tables with automatic heartbeating."""
        tables = []
        for i in range(1000):  # Long-running operation
            table = await self.fetch_table(schema_name, i)
            tables.append(table)
        return tables

Manual Heartbeating with Progress

from typing import Dict, List

from temporalio import activity

class MetadataActivities:
    @activity.defn
    async def extract_tables_with_progress(self, schema_name: str) -> List[Dict]:
        """Extract tables with manual heartbeating and progress reporting."""
        tables = []
        total = 1000
        
        for i in range(total):
            # Send heartbeat with progress information
            activity.heartbeat({"processed": i, "total": total})
            
            table = await self.fetch_table(schema_name, i)
            tables.append(table)
        
        return tables
The auto_heartbeater decorator sends heartbeats 3x per timeout interval by default.

Activity Timeouts

Properly configure activity timeouts to ensure reliable execution:
start_to_close_timeout (duration, required)
Maximum time an activity can run from start to completion. Always set this value.

heartbeat_timeout (duration)
Maximum time between heartbeats. Essential for long-running activities.

schedule_to_start_timeout (duration)
Maximum time an activity can wait in the queue before starting.

schedule_to_close_timeout (duration)
Maximum total time including queuing and execution.
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy

@workflow.defn
class MetadataWorkflow:
    @workflow.run
    async def run(self):
        result = await workflow.execute_activity(
            MetadataActivities.extract_tables,
            args=["public"],
            start_to_close_timeout=timedelta(minutes=30),
            heartbeat_timeout=timedelta(minutes=2),
            retry_policy=RetryPolicy(
                maximum_attempts=3,
                initial_interval=timedelta(seconds=1),
                maximum_interval=timedelta(minutes=1),
                backoff_coefficient=2.0,
            ),
        )
        return result

Learn More

Read the comprehensive guide on Temporal activity timeouts and best practices.

Configuration Management

Load configuration from environment variables using os.getenv() with sensible defaults:
import os

config = {
    "host": os.getenv("DB_HOST", "localhost"),
    "port": int(os.getenv("DB_PORT", "5432")),
    "max_connections": int(os.getenv("MAX_CONNECTIONS", "10")),
}
Use secret stores for sensitive credentials instead of environment variables:
from application_sdk.clients.secrets import get_secret

credentials = {
    "username": await get_secret("db_username"),
    "password": await get_secret("db_password"),
}
See the Secret Stores guide for more information.
Define application constants in a dedicated module:
# constants.py
from application_sdk.constants import (
    TEMPORAL_NAMESPACE,
    TEMPORAL_HOST,
    OBJECT_STORE_BUCKET,
)

APPLICATION_NAME = "my-app"
DEFAULT_BATCH_SIZE = 1000
MAX_RETRIES = 3

Logging

Use the SDK’s integrated logger for consistent, structured logging:
from application_sdk.observability.logger_adaptor import get_logger

logger = get_logger(__name__)

class MetadataExtractor:
    def extract_tables(self, schema: str):
        logger.info(f"Extracting tables from schema: {schema}")
        
        try:
            tables = self.fetch_tables(schema)
            logger.info(f"Successfully extracted {len(tables)} tables", extra={
                "schema": schema,
                "table_count": len(tables)
            })
            return tables
        except Exception as e:
            logger.error(f"Failed to extract tables from {schema}", extra={
                "schema": schema,
                "error": str(e)
            }, exc_info=True)
            raise
Include contextual information in the extra parameter for better debugging and monitoring.

Error Handling

Catch specific exceptions and handle them appropriately:
from sqlalchemy.exc import DatabaseError, OperationalError

try:
    result = await client.execute_query(query)
except OperationalError as e:
    logger.warning(f"Database connection error: {e}")
    # Retry logic
except DatabaseError as e:
    logger.error(f"Database error: {e}")
    # Handle or re-raise
Let Temporal handle retries for transient errors:
from temporalio import activity
from temporalio.exceptions import ApplicationError

# TemporaryError and PermanentError stand in for your application's own exception types
@activity.defn
async def extract_metadata(schema: str) -> dict:
    try:
        return await fetch_metadata(schema)
    except TemporaryError:
        # Let Temporal retry
        raise
    except PermanentError as e:
        # Don't retry - raise non-retryable error
        raise ApplicationError(
            f"Permanent error: {e}",
            non_retryable=True
        )
Handle workflow-level errors gracefully:
from temporalio import workflow

@workflow.defn
class MetadataWorkflow:
    @workflow.run
    async def run(self, config: dict) -> dict:
        try:
            # Execute activities
            result = await workflow.execute_activity(...)
            return result
        except Exception as e:
            # Log and handle workflow failure
            workflow.logger.error(f"Workflow failed: {e}")
            # Potentially execute compensation logic
            await self.cleanup()
            raise

Testing

Unit Tests

Write focused unit tests for individual components:
def test_sql_client_connection():
    client = SQLClient(config)
    assert client.test_connection() is True

Integration Tests

Test component interactions:
async def test_metadata_extraction():
    activities = MetadataActivities()
    result = await activities.extract_tables("public")
    assert len(result) > 0

End-to-End Tests

Use the SDK’s test framework:
from application_sdk.test_utils import BaseTest

class TestWorkflow(BaseTest):
    # Inherits all test methods
    pass

Testing Guide

See the complete testing guide for comprehensive testing strategies.

Performance Optimization

1. Batch Processing

Process data in batches to optimize memory usage:
BATCH_SIZE = 1000

for i in range(0, len(tables), BATCH_SIZE):
    batch = tables[i:i + BATCH_SIZE]
    await process_batch(batch)
2. Connection Pooling

Use connection pooling for database clients:
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    connection_string,
    poolclass=QueuePool,
    pool_size=10,
    max_overflow=20,
)
3. Lazy Loading

Load data only when needed:
def get_table_details(table_name: str, include_details: bool = False):
    # Fetch basic info first
    table = get_table_info(table_name)

    # Load detailed info only if needed
    if include_details:
        table.columns = get_column_details(table_name)

    return table
4. Parallel Execution

Use asyncio.gather() for concurrent operations:
schemas = ["public", "sales", "analytics"]
results = await asyncio.gather(*[
    extract_schema(schema) for schema in schemas
])

Next Steps

SQL Applications

Build comprehensive SQL metadata extraction applications.

Testing Guide

Write comprehensive tests for your applications.

Secret Stores

Secure credential management with secret stores.

Architecture

Understand the SDK’s architecture and design principles.
