Skip to main content
Handlers provide the business logic layer between workflows and external systems, coordinating client operations and data processing.

HandlerInterface

application_sdk.handlers.HandlerInterface Abstract base class for workflow handlers.

Methods

load

Initialize and load the handler.
async def load(*args: Any, **kwargs: Any) -> None
*args
Any
Variable positional arguments
**kwargs
Any
Variable keyword arguments, typically including credentials
Must be implemented by subclasses

test_auth

Test the authentication credentials.
async def test_auth(*args: Any, **kwargs: Any) -> bool
*args
Any
Variable positional arguments
**kwargs
Any
Variable keyword arguments
return
bool
True if authentication is successful, False otherwise
raises
NotImplementedError
If not implemented by subclass

preflight_check

Perform preflight checks before workflow execution.
async def preflight_check(*args: Any, **kwargs: Any) -> Any
*args
Any
Variable positional arguments
**kwargs
Any
Variable keyword arguments, typically including metadata
return
Any
Dictionary with check results, typically containing success/failure information
raises
NotImplementedError
If not implemented by subclass

fetch_metadata

Fetch metadata from the data source.
async def fetch_metadata(*args: Any, **kwargs: Any) -> Any
*args
Any
Variable positional arguments
**kwargs
Any
Variable keyword arguments
return
Any
Fetched metadata
raises
NotImplementedError
If not implemented by subclass

get_configmap

Static method to get configuration map.
@staticmethod
async def get_configmap(config_map_id: str) -> Dict[str, Any]
config_map_id
str
required
The configuration map identifier
return
Dict[str, Any]
Configuration map data

BaseHandler

application_sdk.handlers.base.BaseHandler Base handler implementation for non-SQL based applications.

Constructor

BaseHandler(client: Optional[BaseClient] = None)
client
BaseClient
default:"BaseClient()"
The client instance to use for connections

Attributes

client
BaseClient
The client instance for connecting to the target system

Methods

load

Load and initialize the handler.
async def load(credentials: Dict[str, Any]) -> None
credentials
Dict[str, Any]
required
Credentials for the client

Example Usage

Basic Handler

from application_sdk.handlers.base import BaseHandler
from application_sdk.clients.base import BaseClient
from typing import Dict, Any

class MyClient(BaseClient):
    async def load(self, **kwargs):
        credentials = kwargs.get("credentials", {})
        self.api_key = credentials.get("api_key")
        self.http_headers = {
            "Authorization": f"Bearer {self.api_key}"
        }
    
    async def fetch_data(self, endpoint: str):
        response = await self.execute_http_get_request(
            url=f"https://api.example.com/{endpoint}",
        )
        return response.json() if response else None

class MyHandler(BaseHandler):
    def __init__(self):
        super().__init__(client=MyClient())
    
    async def test_auth(self, **kwargs) -> bool:
        """Test API authentication."""
        try:
            result = await self.client.fetch_data("auth/test")
            return result is not None
        except Exception as e:
            logger.error(f"Authentication failed: {e}")
            return False
    
    async def preflight_check(self, **kwargs) -> Dict[str, Any]:
        """Perform preflight checks."""
        metadata = kwargs.get("metadata", {})
        
        checks = {
            "authentication": {
                "success": False,
                "failureMessage": ""
            },
            "connectivity": {
                "success": False,
                "failureMessage": ""
            }
        }
        
        # Test authentication
        try:
            checks["authentication"]["success"] = await self.test_auth()
            if not checks["authentication"]["success"]:
                checks["authentication"]["failureMessage"] = "Invalid credentials"
        except Exception as e:
            checks["authentication"]["failureMessage"] = str(e)
        
        # Test connectivity
        try:
            result = await self.client.fetch_data("health")
            checks["connectivity"]["success"] = result is not None
            if not checks["connectivity"]["success"]:
                checks["connectivity"]["failureMessage"] = "Service unavailable"
        except Exception as e:
            checks["connectivity"]["failureMessage"] = str(e)
        
        return checks
    
    async def fetch_metadata(self, **kwargs) -> Dict[str, Any]:
        """Fetch metadata from the API."""
        metadata = kwargs.get("metadata", {})
        dataset = metadata.get("dataset")
        
        data = await self.client.fetch_data(f"datasets/{dataset}/metadata")
        
        return {
            "dataset": dataset,
            "metadata": data,
            "count": len(data) if data else 0
        }

Database Handler

from application_sdk.handlers.base import BaseHandler
from application_sdk.clients.sql import SQLClient
from typing import Dict, Any, List

class DatabaseHandler(BaseHandler):
    def __init__(self):
        self.sql_client = SQLClient()
        super().__init__(client=self.sql_client)
    
    async def load(self, credentials: Dict[str, Any]) -> None:
        """Load handler with database credentials."""
        await self.sql_client.load(credentials=credentials)
    
    async def test_auth(self, **kwargs) -> bool:
        """Test database connection."""
        try:
            query = "SELECT 1"
            result = await self.sql_client.execute_query(query)
            return result is not None
        except Exception as e:
            logger.error(f"Database connection test failed: {e}")
            return False
    
    async def preflight_check(self, **kwargs) -> Dict[str, Any]:
        """Check database connection and permissions."""
        checks = {
            "connection": {"success": False, "failureMessage": ""},
            "permissions": {"success": False, "failureMessage": ""}
        }
        
        # Test connection
        try:
            checks["connection"]["success"] = await self.test_auth()
            if not checks["connection"]["success"]:
                checks["connection"]["failureMessage"] = "Cannot connect to database"
        except Exception as e:
            checks["connection"]["failureMessage"] = str(e)
        
        # Test permissions
        if checks["connection"]["success"]:
            try:
                # Check if we can list tables
                query = "SHOW TABLES"
                result = await self.sql_client.execute_query(query)
                checks["permissions"]["success"] = result is not None
                if not checks["permissions"]["success"]:
                    checks["permissions"]["failureMessage"] = "Insufficient permissions"
            except Exception as e:
                checks["permissions"]["failureMessage"] = str(e)
        
        return checks
    
    async def fetch_metadata(self, **kwargs) -> Dict[str, Any]:
        """Fetch database schema metadata."""
        metadata = kwargs.get("metadata", {})
        schema = metadata.get("schema")
        
        # Get tables
        tables_query = f"SHOW TABLES FROM {schema}"
        tables = await self.sql_client.execute_query(tables_query)
        
        result = {
            "schema": schema,
            "tables": [],
            "total_tables": 0
        }
        
        if tables:
            for table in tables:
                table_name = table[0]
                
                # Get columns for each table
                columns_query = f"DESCRIBE {schema}.{table_name}"
                columns = await self.sql_client.execute_query(columns_query)
                
                result["tables"].append({
                    "name": table_name,
                    "columns": columns
                })
            
            result["total_tables"] = len(result["tables"])
        
        return result

Handler with Configmap

from application_sdk.handlers.base import BaseHandler
from application_sdk.services.statestore import StateStore, StateType
from typing import Dict, Any

class ConfigurableHandler(BaseHandler):
    
    async def load(self, credentials: Dict[str, Any]) -> None:
        """Load handler with credentials and config."""
        await super().load(credentials)
        
        # Load configmap if available
        config_map_id = credentials.get("config_map_id")
        if config_map_id:
            self.config = await self.get_configmap(config_map_id)
        else:
            self.config = {}
    
    @staticmethod
    async def get_configmap(config_map_id: str) -> Dict[str, Any]:
        """Get configuration map from state store."""
        try:
            config = await StateStore.get_state(
                config_map_id,
                StateType.WORKFLOWS
            )
            return config
        except Exception as e:
            logger.warning(f"Failed to load configmap: {e}")
            return {}
    
    async def preflight_check(self, **kwargs) -> Dict[str, Any]:
        """Perform checks using config."""
        checks = {
            "config": {"success": False, "failureMessage": ""}
        }
        
        # Validate config
        required_keys = ["api_endpoint", "timeout"]
        missing = [k for k in required_keys if k not in self.config]
        
        if missing:
            checks["config"]["failureMessage"] = f"Missing config keys: {missing}"
        else:
            checks["config"]["success"] = True
        
        return checks
    
    async def fetch_metadata(self, **kwargs) -> Dict[str, Any]:
        """Fetch metadata using configured endpoint."""
        endpoint = self.config.get("api_endpoint")
        timeout = self.config.get("timeout", 30)
        
        response = await self.client.execute_http_get_request(
            url=endpoint,
            timeout=timeout
        )
        
        return response.json() if response else {}

Multi-Source Handler

from application_sdk.handlers.base import BaseHandler
from typing import Dict, Any, List
import asyncio

class MultiSourceHandler(BaseHandler):
    
    async def fetch_metadata(self, **kwargs) -> Dict[str, Any]:
        """Fetch metadata from multiple sources in parallel."""
        metadata = kwargs.get("metadata", {})
        sources = metadata.get("sources", [])
        
        # Fetch from all sources in parallel
        tasks = [
            self._fetch_from_source(source)
            for source in sources
        ]
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Aggregate results
        aggregated = {
            "sources": [],
            "total_records": 0,
            "errors": []
        }
        
        for source, result in zip(sources, results):
            if isinstance(result, Exception):
                aggregated["errors"].append({
                    "source": source,
                    "error": str(result)
                })
            else:
                aggregated["sources"].append(result)
                aggregated["total_records"] += result.get("count", 0)
        
        return aggregated
    
    async def _fetch_from_source(self, source: str) -> Dict[str, Any]:
        """Fetch data from a single source."""
        response = await self.client.execute_http_get_request(
            url=f"https://api.example.com/{source}"
        )
        
        data = response.json() if response else []
        
        return {
            "source": source,
            "data": data,
            "count": len(data)
        }

Best Practices

Handler Design

  • Keep handlers focused on coordination, not implementation details
  • Use clients for actual API/database operations
  • Implement all HandlerInterface methods
  • Return structured results from preflight checks

Error Handling

  • Always implement try/except blocks in handler methods
  • Return structured error information in preflight checks
  • Log errors with appropriate context
  • Don’t let exceptions propagate unhandled

Client Management

  • Initialize clients in the constructor
  • Load clients with credentials in the load() method
  • Close client connections when done
  • Reuse clients across handler methods

Configuration

  • Use configmaps for environment-specific settings
  • Validate configuration in preflight_check
  • Provide sensible defaults
  • Document required configuration keys

Build docs developers (and LLMs) love