Skip to main content

Overview

Handlers are responsible for implementing the core business logic of your application. They provide the interface between workflows/activities and external systems, handling operations like authentication, data fetching, validation, and metadata extraction.
Handlers work in conjunction with Clients to interact with external systems. While clients manage connections, handlers implement the business logic.

HandlerInterface

The HandlerInterface is an abstract base class that defines the required methods all handlers must implement.
from abc import ABC, abstractmethod
from typing import Any, Dict

class HandlerInterface(ABC):
    """Contract that every workflow handler has to satisfy.

    The four abstract coroutines must be supplied by subclasses;
    ``get_configmap`` comes with a default that returns an empty mapping.
    """

    @abstractmethod
    async def load(self, *args: Any, **kwargs: Any) -> None:
        """Prepare the handler for use."""

    @abstractmethod
    async def test_auth(self, *args: Any, **kwargs: Any) -> bool:
        """Check whether the supplied credentials authenticate."""

    @abstractmethod
    async def preflight_check(self, *args: Any, **kwargs: Any) -> Any:
        """Run the checks that precede a workflow execution."""

    @abstractmethod
    async def fetch_metadata(self, *args: Any, **kwargs: Any) -> Any:
        """Retrieve metadata from the source system."""

    @staticmethod
    async def get_configmap(config_map_id: str) -> Dict[str, Any]:
        """Return the configuration map for ``config_map_id`` (empty by default)."""
        return dict()

BaseHandler

The SDK provides a BaseHandler class that implements HandlerInterface for non-SQL-based applications.
from application_sdk.handlers.base import BaseHandler
from application_sdk.clients.base import BaseClient
from typing import Dict, Any

class MyHandler(BaseHandler):
    """Skeleton handler built on ``BaseHandler``; the bodies below are placeholders."""

    def __init__(self, client: BaseClient = None):
        super().__init__(client)

    async def load(self, credentials: Dict[str, Any]) -> None:
        """Delegate loading to ``BaseHandler``, then run any custom setup."""
        await super().load(credentials)
        # Handler-specific initialization goes here

    async def test_auth(self, **kwargs: Any) -> bool:
        """Verify the credentials."""
        # Implementation
        ...

    async def preflight_check(self, **kwargs: Any) -> Dict[str, Any]:
        """Run the pre-execution checks."""
        # Implementation
        ...

    async def fetch_metadata(self, **kwargs: Any) -> Dict[str, Any]:
        """Retrieve metadata from the source."""
        # Implementation
        ...

Constructor

client
BaseClient
default:"BaseClient()"
The client instance to use for connections to external systems.

Required Methods

load()

Initialize the handler with credentials and load the underlying client.
async def load(self, credentials: Dict[str, Any]) -> None:
    """Bring up the client, then cache handler-level settings from ``credentials``."""
    logger.info("Loading handler")

    # The client is loaded first, with the same credentials
    await self.client.load(credentials=credentials)

    # Handler-specific settings; base_url falls back to the example endpoint
    fallback_url = "https://api.example.com"
    self.api_key = credentials.get("api_key")
    self.base_url = credentials.get("base_url", fallback_url)

    logger.info("Handler loaded successfully")
credentials
Dict[str, Any]
required
Dictionary containing authentication credentials and configuration parameters. Common fields:
  • api_key: API authentication key
  • username: Username for authentication
  • password: Password for authentication
  • base_url: Base URL for API endpoints
  • tenant_id: Tenant identifier
The base BaseHandler.load() method automatically calls client.load() with the provided credentials. Always call await super().load(credentials) when overriding.

test_auth()

Test authentication credentials by making a simple request to verify connectivity and authorization.
async def test_auth(self, **kwargs: Any) -> bool:
    """Probe the auth-verify endpoint; True only for an HTTP 200 reply."""
    try:
        # A cheap GET is enough to validate the credentials
        reply = await self.client.execute_http_get_request(
            url=f"{self.base_url}/api/auth/verify",
            timeout=10,
        )

        # Guard clause: anything other than a 200 reply is a failure
        if not reply or reply.status_code != 200:
            logger.error(f"Authentication failed: {reply.status_code if reply else 'No response'}")
            return False

        logger.info("Authentication successful")
        return True

    except Exception as err:
        # Any error raised while probing is reported as a failed authentication
        logger.error(f"Authentication test failed: {err}")
        return False
**kwargs
Any
Additional keyword arguments that may be passed from the API endpoint.
return
bool
Returns True if authentication is successful, False otherwise.
This method is called by the FastAPI /workflows/v1/auth endpoint during credential validation.

preflight_check()

Perform validation checks before workflow execution to ensure all requirements are met.
async def preflight_check(self, **kwargs: Any) -> Dict[str, Any]:
    """Run the pre-execution checks and report one result per check.

    Args:
        **kwargs: May carry ``metadata`` (the workflow configuration). A
            missing or ``None`` value is treated as an empty mapping.

    Returns:
        Mapping of check name (``connection_check``, ``parameters_check``,
        ``permissions_check``) to a dict with ``success``, ``message`` and
        ``failureMessage``; ``failureMessage`` is ``None`` when the check
        passed. An exception raised inside a check is recorded as a failed
        result rather than propagated.
    """
    # ``or {}`` also covers an explicit ``metadata=None``, which would
    # otherwise make the membership test below raise TypeError.
    metadata = kwargs.get("metadata") or {}

    results: Dict[str, Any] = {}

    # Check 1: Connection test
    try:
        response = await self.client.execute_http_get_request(
            url=f"{self.base_url}/api/health"
        )
        # Evaluate the outcome once so every field agrees with the others
        connected = response is not None and response.status_code == 200
        results["connection_check"] = {
            "success": connected,
            "message": "Connection successful" if connected else "Connection failed",
            "failureMessage": None if connected else "Health check did not return HTTP 200",
        }
    except Exception as e:
        results["connection_check"] = {
            "success": False,
            "message": f"Connection failed: {str(e)}",
            "failureMessage": str(e),
        }

    # Check 2: Validate required parameters
    required_params = ["source", "destination"]
    missing_params = [p for p in required_params if p not in metadata]

    results["parameters_check"] = {
        "success": not missing_params,
        "message": "All parameters present" if not missing_params else f"Missing: {', '.join(missing_params)}",
        "failureMessage": f"Missing required parameters: {', '.join(missing_params)}" if missing_params else None,
    }

    # Check 3: Validate permissions
    try:
        has_permission = await self._check_permissions(metadata)
        results["permissions_check"] = {
            "success": has_permission,
            "message": "Permissions validated" if has_permission else "Insufficient permissions",
            "failureMessage": "User does not have required permissions" if not has_permission else None,
        }
    except Exception as e:
        results["permissions_check"] = {
            "success": False,
            "message": "Permission check failed",
            "failureMessage": str(e),
        }

    return results
**kwargs
Any
Keyword arguments containing:
  • metadata: Workflow metadata and configuration
  • credentials: Authentication credentials (optional)
return
Dict[str, Any]
Dictionary where each key is a check name and value contains:
  • success (bool): Whether the check passed
  • message (str): Status message
  • failureMessage (str, optional): Error message if check failed
Each check result must include a success field. The preflight activity will fail if any check returns success: False.

fetch_metadata()

Fetch metadata from the source system based on the request parameters.
async def fetch_metadata(self, **kwargs: Any) -> Dict[str, Any]:
    """Fetch metadata from the source system.

    Args:
        **kwargs: ``metadata_type`` selects what to fetch ("databases",
            "tables" or "columns"; any other value, including the default
            "all", fetches everything). ``database`` and ``table`` narrow
            the lookup and default to "".

    Returns:
        A dict keyed by the kind of metadata fetched, e.g.
        ``{"tables": [...]}``. Every branch returns a dict so the result
        matches the declared ``Dict[str, Any]`` return type.
    """
    metadata_type = kwargs.get("metadata_type", "all")
    database = kwargs.get("database", "")

    logger.info(f"Fetching metadata: type={metadata_type}, database={database}")

    if metadata_type == "databases":
        return {"databases": await self._fetch_databases()}
    if metadata_type == "tables":
        return {"tables": await self._fetch_tables(database)}
    if metadata_type == "columns":
        table = kwargs.get("table", "")
        # NOTE(review): _fetch_columns is not shown in this example and must
        # be provided by the handler.
        return {"columns": await self._fetch_columns(database, table)}

    # Fetch all metadata; tables are only fetched when a database was given
    return {
        "databases": await self._fetch_databases(),
        "tables": await self._fetch_tables(database) if database else [],
    }

async def _fetch_databases(self) -> List[Dict[str, Any]]:
    """Return the ``databases`` list from the metadata API, or ``[]`` on any non-200 reply."""
    endpoint = f"{self.base_url}/api/metadata/databases"
    reply = await self.client.execute_http_get_request(url=endpoint)

    # No reply or a non-200 status both yield an empty result
    if not reply or reply.status_code != 200:
        return []

    payload = reply.json()
    return payload.get("databases", [])

async def _fetch_tables(self, database: str) -> List[Dict[str, Any]]:
    """Return the ``tables`` list for ``database``, or ``[]`` on any non-200 reply."""
    endpoint = f"{self.base_url}/api/metadata/tables"
    query = {"database": database}
    reply = await self.client.execute_http_get_request(url=endpoint, params=query)

    # No reply or a non-200 status both yield an empty result
    if not reply or reply.status_code != 200:
        return []

    payload = reply.json()
    return payload.get("tables", [])
**kwargs
Any
Keyword arguments containing:
  • metadata_type (str): Type of metadata to fetch (“all”, “databases”, “tables”, “columns”)
  • database (str): Database name for filtering
  • Additional type-specific parameters
return
Dict[str, Any]
Dictionary containing the requested metadata. Structure depends on metadata_type.
This method is called by the FastAPI /workflows/v1/metadata endpoint to populate UI dropdowns and forms.

get_configmap()

Retrieve configuration maps by ID. This is a static method that can be overridden to provide application-specific configuration.
@staticmethod
async def get_configmap(config_map_id: str) -> Dict[str, Any]:
    """Look up a configuration map by ID; unknown IDs yield an empty dict."""
    # Example data only: a real handler might load this from a file or an API
    known_maps = dict(
        default=dict(max_retries=3, timeout=30, batch_size=1000),
        production=dict(max_retries=5, timeout=60, batch_size=5000),
    )

    try:
        return known_maps[config_map_id]
    except KeyError:
        return {}
config_map_id
str
required
Identifier for the configuration map to retrieve.
return
Dict[str, Any]
Configuration map dictionary. Returns empty dict if not found.

Complete Handler Example

from application_sdk.handlers.base import BaseHandler
from application_sdk.clients.base import BaseClient
from application_sdk.observability.logger_adaptor import get_logger
from typing import Dict, Any, List
import httpx

logger = get_logger(__name__)

class DataSourceHandler(BaseHandler):
    """Handler for interacting with a data source API.

    Every request carries a bearer token built from ``api_key``. Both
    ``api_key`` and ``base_url`` are ``None`` until :meth:`load` has run.
    """

    # Used when the credentials carry no (or an empty) ``base_url``
    DEFAULT_BASE_URL = "https://api.datasource.com"

    def __init__(self, client: BaseClient = None):
        super().__init__(client or BaseClient())
        self.api_key = None
        self.base_url = None

    def _auth_headers(self) -> Dict[str, str]:
        """Build the bearer-token header sent with every request."""
        return {"Authorization": f"Bearer {self.api_key}"}

    async def load(self, credentials: Dict[str, Any]) -> None:
        """Initialize the handler with credentials.

        Args:
            credentials: Must contain ``api_key``; may contain ``base_url``.

        Raises:
            ValueError: If ``credentials`` has no (or an empty) ``api_key``.
        """
        logger.info("Loading DataSourceHandler")

        # Load base client
        await super().load(credentials)

        # Extract handler-specific configuration
        self.api_key = credentials.get("api_key")
        # Fall back to the default for a missing or empty base_url, and drop
        # a trailing slash so the joined paths never contain "//".
        self.base_url = (credentials.get("base_url") or self.DEFAULT_BASE_URL).rstrip("/")

        if not self.api_key:
            raise ValueError("API key is required")

        logger.info("DataSourceHandler loaded successfully")

    async def test_auth(self, **kwargs: Any) -> bool:
        """Test API authentication.

        Returns:
            True when the verify endpoint answers HTTP 200; False for any
            other reply or when the request raises.
        """
        try:
            response = await self.client.execute_http_get_request(
                url=f"{self.base_url}/v1/auth/verify",
                headers=self._auth_headers(),
                timeout=10,
            )

            if response and response.status_code == 200:
                logger.info("Authentication test passed")
                return True

            logger.error(f"Authentication failed: {response.status_code if response else 'No response'}")
            return False

        except Exception as e:
            # Any error raised while probing is reported as a failed authentication
            logger.error(f"Authentication error: {e}", exc_info=True)
            return False

    async def preflight_check(self, **kwargs: Any) -> Dict[str, Any]:
        """Validate configuration before workflow execution.

        Args:
            **kwargs: May carry ``metadata``; a missing or ``None`` value is
                treated as an empty mapping.

        Returns:
            Mapping of check name to a dict with ``success``, ``message`` and
            ``failureMessage`` (``None`` when the check passed). The
            ``source_verification`` entry is only present when ``metadata``
            has a ``source_id``.
        """
        # ``or {}`` also covers an explicit ``metadata=None``, which would
        # otherwise make ``metadata.get`` below raise AttributeError.
        metadata = kwargs.get("metadata") or {}
        results: Dict[str, Any] = {}

        # Connection check
        try:
            response = await self.client.execute_http_get_request(
                url=f"{self.base_url}/v1/health",
                headers=self._auth_headers(),
            )

            # Evaluate the outcome once so all three fields agree
            connected = response is not None and response.status_code == 200
            results["connection"] = {
                "success": connected,
                "message": "API connection successful" if connected else "Connection failed",
                "failureMessage": None if connected else "Unable to connect to API",
            }
        except Exception as e:
            results["connection"] = {
                "success": False,
                "message": "Connection error",
                "failureMessage": str(e),
            }

        # Validate source exists
        source_id = metadata.get("source_id")
        if source_id:
            try:
                source_exists = await self._verify_source(source_id)
                results["source_verification"] = {
                    "success": source_exists,
                    "message": f"Source {source_id} verified" if source_exists else "Source not found",
                    "failureMessage": f"Source {source_id} does not exist" if not source_exists else None,
                }
            except Exception as e:
                results["source_verification"] = {
                    "success": False,
                    "message": "Source verification failed",
                    "failureMessage": str(e),
                }

        return results

    async def fetch_metadata(self, **kwargs: Any) -> Dict[str, Any]:
        """Fetch metadata from the data source.

        Args:
            **kwargs: ``metadata_type`` ("sources", "schemas"; any other
                value, including the default "all", fetches both) and
                ``database`` (default "").

        Returns:
            A dict with a ``sources`` and/or ``schemas`` list.
        """
        metadata_type = kwargs.get("metadata_type", "all")
        database = kwargs.get("database", "")

        logger.info(f"Fetching metadata: type={metadata_type}")

        if metadata_type == "sources":
            return {"sources": await self._fetch_sources()}
        if metadata_type == "schemas":
            return {"schemas": await self._fetch_schemas(database)}

        # Everything; schemas are only fetched when a database was given
        return {
            "sources": await self._fetch_sources(),
            "schemas": await self._fetch_schemas(database) if database else [],
        }

    async def _verify_source(self, source_id: str) -> bool:
        """Return True when the source endpoint answers HTTP 200 for ``source_id``."""
        response = await self.client.execute_http_get_request(
            url=f"{self.base_url}/v1/sources/{source_id}",
            headers=self._auth_headers(),
        )
        return response is not None and response.status_code == 200

    async def _fetch_sources(self) -> List[Dict[str, Any]]:
        """Fetch available data sources; ``[]`` on any non-200 reply."""
        response = await self.client.execute_http_get_request(
            url=f"{self.base_url}/v1/sources",
            headers=self._auth_headers(),
        )

        if response and response.status_code == 200:
            return response.json().get("sources", [])
        return []

    async def _fetch_schemas(self, database: str) -> List[Dict[str, Any]]:
        """Fetch schemas for ``database``; ``[]`` on any non-200 reply."""
        response = await self.client.execute_http_get_request(
            url=f"{self.base_url}/v1/schemas",
            params={"database": database},
            headers=self._auth_headers(),
        )

        if response and response.status_code == 200:
            return response.json().get("schemas", [])
        return []

    # Custom business logic methods
    async def extract_data(self, source_id: str, query: str) -> List[Dict[str, Any]]:
        """Extract data from the source by posting ``query`` to the query endpoint.

        Returns:
            The ``data`` list from the reply (``[]`` when the key is absent).

        Raises:
            RuntimeError: If there is no reply or its status is not HTTP 200.
        """
        response = await self.client.execute_http_post_request(
            url=f"{self.base_url}/v1/query",
            json_data={
                "source_id": source_id,
                "query": query,
            },
            headers=self._auth_headers(),
            timeout=300,
        )

        if response and response.status_code == 200:
            return response.json().get("data", [])

        # RuntimeError is still caught by ``except Exception`` callers, but it
        # no longer forces them to catch every exception type.
        raise RuntimeError(f"Data extraction failed: {response.status_code if response else 'No response'}")

FastAPI Endpoint Integration

Handlers are automatically integrated with FastAPI endpoints:
| Endpoint | Handler Method | Purpose |
| --- | --- | --- |
| POST /workflows/v1/auth | test_auth() | Validate credentials |
| POST /workflows/v1/metadata | fetch_metadata() | Get metadata for UI |
| POST /workflows/v1/check | preflight_check() | Pre-execution validation |
| GET /workflows/v1/configmap/{id} | get_configmap() | Retrieve configuration |

Best Practices

Keep connection management in the client and business logic in the handler.
# ❌ Bad - Connection logic in handler
class BadHandler(BaseHandler):
    """Anti-pattern: the handler opens its own HTTP connection."""

    async def fetch_data(self):
        # Connection handling done here instead of in the client
        async with httpx.AsyncClient() as session:
            reply = await session.get(url)

# ✅ Good - Use client for connections
class GoodHandler(BaseHandler):
    """Preferred pattern: requests go through the handler's client."""

    async def fetch_data(self):
        # The client owns the connection; the handler only issues the request
        reply = await self.client.execute_http_get_request(url)
Implement comprehensive preflight checks to catch issues early.
async def preflight_check(self, **kwargs):
    """Sketch of a preflight result covering four separate checks."""
    # Each ``{...}`` is a placeholder for that check's result
    checks = {
        "connection": {...},
        "permissions": {...},
        "source_validation": {...},
        "destination_validation": {...},
    }
    return checks
Always handle exceptions and return meaningful error messages.
# Illustrative fragment: meant to sit inside an async handler method.
try:
    result = await self._risky_operation()
except httpx.HTTPError as e:
    # HTTP failures: log the error and include its text in the result
    logger.error(f"HTTP error: {e}")
    return {"success": False, "failureMessage": f"API error: {str(e)}"}
except Exception as e:
    # Anything else: log with the traceback, but return only a generic message
    logger.error(f"Unexpected error: {e}", exc_info=True)
    return {"success": False, "failureMessage": "Internal error occurred"}
  • Clients - Manage connections to external systems
  • Activities - Call handler methods from workflows
  • Server - Exposes handler methods via API endpoints

Build docs developers (and LLMs) love