Overview
Handlers are responsible for implementing the core business logic of your application. They provide the interface between workflows/activities and external systems, handling operations like authentication, data fetching, validation, and metadata extraction.
Handlers work in conjunction with Clients to interact with external systems. While clients manage connections, handlers implement the business logic.
HandlerInterface
The HandlerInterface is an abstract base class that defines the required methods all handlers must implement.
from abc import ABC , abstractmethod
from typing import Any, Dict
class HandlerInterface ( ABC ):
"""Abstract base class for workflow handlers"""
@abstractmethod
async def load ( self , * args : Any, ** kwargs : Any) -> None :
"""Initialize the handler"""
pass
@abstractmethod
async def test_auth ( self , * args : Any, ** kwargs : Any) -> bool :
"""Test authentication credentials"""
pass
@abstractmethod
async def preflight_check ( self , * args : Any, ** kwargs : Any) -> Any:
"""Perform preflight checks"""
pass
@abstractmethod
async def fetch_metadata ( self , * args : Any, ** kwargs : Any) -> Any:
"""Fetch metadata from source system"""
pass
@ staticmethod
async def get_configmap ( config_map_id : str ) -> Dict[ str , Any]:
"""Get configuration map"""
return {}
BaseHandler
The SDK provides a BaseHandler class that implements HandlerInterface for non-SQL based applications.
from application_sdk.handlers.base import BaseHandler
from application_sdk.clients.base import BaseClient
from typing import Dict, Any
class MyHandler ( BaseHandler ):
def __init__ ( self , client : BaseClient = None ):
super (). __init__ (client)
async def load ( self , credentials : Dict[ str , Any]) -> None :
"""Load handler with credentials."""
await super ().load(credentials)
# Custom initialization logic
async def test_auth ( self , ** kwargs : Any) -> bool :
"""Test authentication."""
# Implementation
pass
async def preflight_check ( self , ** kwargs : Any) -> Dict[ str , Any]:
"""Perform preflight checks."""
# Implementation
pass
async def fetch_metadata ( self , ** kwargs : Any) -> Dict[ str , Any]:
"""Fetch metadata."""
# Implementation
pass
Constructor
client
BaseClient
default: "BaseClient()"
The client instance to use for connections to external systems.
Required Methods
load()
Initialize the handler with credentials and load the underlying client.
async def load ( self , credentials : Dict[ str , Any]) -> None :
"""Load and initialize the handler."""
logger.info( "Loading handler" )
# Load the client with credentials
await self .client.load( credentials = credentials)
# Additional handler-specific initialization
self .api_key = credentials.get( "api_key" )
self .base_url = credentials.get( "base_url" , "https://api.example.com" )
logger.info( "Handler loaded successfully" )
Dictionary containing authentication credentials and configuration parameters. Common fields:
api_key: API authentication key
username: Username for authentication
password: Password for authentication
base_url: Base URL for API endpoints
tenant_id: Tenant identifier
The base BaseHandler.load() method automatically calls client.load() with the provided credentials. Always call await super().load(credentials) when overriding.
test_auth()
Test authentication credentials by making a simple request to verify connectivity and authorization.
async def test_auth ( self , ** kwargs : Any) -> bool :
"""Test authentication credentials."""
try :
# Make a lightweight API call to test auth
response = await self .client.execute_http_get_request(
url = f " { self .base_url } /api/auth/verify" ,
timeout = 10
)
if response and response.status_code == 200 :
logger.info( "Authentication successful" )
return True
else :
logger.error( f "Authentication failed: { response.status_code if response else 'No response' } " )
return False
except Exception as e:
logger.error( f "Authentication test failed: { e } " )
return False
Additional keyword arguments that may be passed from the API endpoint.
Returns True if authentication is successful, False otherwise.
This method is called by the FastAPI /workflows/v1/auth endpoint during credential validation.
preflight_check()
Perform validation checks before workflow execution to ensure all requirements are met.
async def preflight_check ( self , ** kwargs : Any) -> Dict[ str , Any]:
"""Perform preflight checks."""
metadata = kwargs.get( "metadata" , {})
results = {}
# Check 1: Connection test
try :
response = await self .client.execute_http_get_request(
url = f " { self .base_url } /api/health"
)
results[ "connection_check" ] = {
"success" : response is not None and response.status_code == 200 ,
"message" : "Connection successful" if response and response.status_code == 200 else "Connection failed"
}
except Exception as e:
results[ "connection_check" ] = {
"success" : False ,
"message" : f "Connection failed: { str (e) } " ,
"failureMessage" : str (e)
}
# Check 2: Validate required parameters
required_params = [ "source" , "destination" ]
missing_params = [p for p in required_params if p not in metadata]
results[ "parameters_check" ] = {
"success" : len (missing_params) == 0 ,
"message" : "All parameters present" if not missing_params else f "Missing: { ', ' .join(missing_params) } " ,
"failureMessage" : f "Missing required parameters: { ', ' .join(missing_params) } " if missing_params else None
}
# Check 3: Validate permissions
try :
has_permission = await self ._check_permissions(metadata)
results[ "permissions_check" ] = {
"success" : has_permission,
"message" : "Permissions validated" if has_permission else "Insufficient permissions" ,
"failureMessage" : "User does not have required permissions" if not has_permission else None
}
except Exception as e:
results[ "permissions_check" ] = {
"success" : False ,
"message" : "Permission check failed" ,
"failureMessage" : str (e)
}
return results
Keyword arguments containing:
metadata: Workflow metadata and configuration
credentials: Authentication credentials (optional)
Dictionary where each key is a check name and value contains:
success (bool): Whether the check passed
message (str): Status message
failureMessage (str, optional): Error message if check failed
Each check result must include a success field. The preflight activity will fail if any check returns success: False.
Fetch metadata from the source system based on the request parameters.
async def fetch_metadata ( self , ** kwargs : Any) -> Dict[ str , Any]:
"""Fetch metadata from the source system."""
metadata_type = kwargs.get( "metadata_type" , "all" )
database = kwargs.get( "database" , "" )
logger.info( f "Fetching metadata: type= { metadata_type } , database= { database } " )
if metadata_type == "databases" :
return await self ._fetch_databases()
elif metadata_type == "tables" :
return await self ._fetch_tables(database)
elif metadata_type == "columns" :
table = kwargs.get( "table" , "" )
return await self ._fetch_columns(database, table)
else :
# Fetch all metadata
return {
"databases" : await self ._fetch_databases(),
"tables" : await self ._fetch_tables(database) if database else []
}
async def _fetch_databases ( self ) -> List[Dict[ str , Any]]:
"""Fetch list of databases."""
response = await self .client.execute_http_get_request(
url = f " { self .base_url } /api/metadata/databases"
)
if response and response.status_code == 200 :
return response.json().get( "databases" , [])
return []
async def _fetch_tables ( self , database : str ) -> List[Dict[ str , Any]]:
"""Fetch tables for a database."""
response = await self .client.execute_http_get_request(
url = f " { self .base_url } /api/metadata/tables" ,
params = { "database" : database}
)
if response and response.status_code == 200 :
return response.json().get( "tables" , [])
return []
Keyword arguments containing:
metadata_type (str): Type of metadata to fetch (“all”, “databases”, “tables”, “columns”)
database (str): Database name for filtering
Additional type-specific parameters
Dictionary containing the requested metadata. Structure depends on metadata_type.
This method is called by the FastAPI /workflows/v1/metadata endpoint to populate UI dropdowns and forms.
get_configmap()
Retrieve configuration maps by ID. This is a static method that can be overridden to provide application-specific configuration.
@ staticmethod
async def get_configmap ( config_map_id : str ) -> Dict[ str , Any]:
"""Get configuration map by ID."""
# Example: Load from file or API
config_maps = {
"default" : {
"max_retries" : 3 ,
"timeout" : 30 ,
"batch_size" : 1000
},
"production" : {
"max_retries" : 5 ,
"timeout" : 60 ,
"batch_size" : 5000
}
}
return config_maps.get(config_map_id, {})
Identifier for the configuration map to retrieve.
Configuration map dictionary. Returns empty dict if not found.
Complete Handler Example
from application_sdk.handlers.base import BaseHandler
from application_sdk.clients.base import BaseClient
from application_sdk.observability.logger_adaptor import get_logger
from typing import Dict, Any, List
import httpx
logger = get_logger( __name__ )
class DataSourceHandler ( BaseHandler ):
"""Handler for interacting with a data source API."""
def __init__ ( self , client : BaseClient = None ):
super (). __init__ (client or BaseClient())
self .api_key = None
self .base_url = None
async def load ( self , credentials : Dict[ str , Any]) -> None :
"""Initialize handler with credentials."""
logger.info( "Loading DataSourceHandler" )
# Load base client
await super ().load(credentials)
# Extract handler-specific configuration
self .api_key = credentials.get( "api_key" )
self .base_url = credentials.get( "base_url" , "https://api.datasource.com" )
if not self .api_key:
raise ValueError ( "API key is required" )
logger.info( "DataSourceHandler loaded successfully" )
async def test_auth ( self , ** kwargs : Any) -> bool :
"""Test API authentication."""
try :
response = await self .client.execute_http_get_request(
url = f " { self .base_url } /v1/auth/verify" ,
headers = { "Authorization" : f "Bearer { self .api_key } " },
timeout = 10
)
if response and response.status_code == 200 :
logger.info( "Authentication test passed" )
return True
logger.error( f "Authentication failed: { response.status_code if response else 'No response' } " )
return False
except Exception as e:
logger.error( f "Authentication error: { e } " , exc_info = True )
return False
async def preflight_check ( self , ** kwargs : Any) -> Dict[ str , Any]:
"""Validate configuration before workflow execution."""
metadata = kwargs.get( "metadata" , {})
results = {}
# Connection check
try :
response = await self .client.execute_http_get_request(
url = f " { self .base_url } /v1/health" ,
headers = { "Authorization" : f "Bearer { self .api_key } " }
)
results[ "connection" ] = {
"success" : response is not None and response.status_code == 200 ,
"message" : "API connection successful" if response and response.status_code == 200 else "Connection failed" ,
"failureMessage" : "Unable to connect to API" if not (response and response.status_code == 200 ) else None
}
except Exception as e:
results[ "connection" ] = {
"success" : False ,
"message" : "Connection error" ,
"failureMessage" : str (e)
}
# Validate source exists
source_id = metadata.get( "source_id" )
if source_id:
try :
source_exists = await self ._verify_source(source_id)
results[ "source_verification" ] = {
"success" : source_exists,
"message" : f "Source { source_id } verified" if source_exists else "Source not found" ,
"failureMessage" : f "Source { source_id } does not exist" if not source_exists else None
}
except Exception as e:
results[ "source_verification" ] = {
"success" : False ,
"message" : "Source verification failed" ,
"failureMessage" : str (e)
}
return results
async def fetch_metadata ( self , ** kwargs : Any) -> Dict[ str , Any]:
"""Fetch metadata from the data source."""
metadata_type = kwargs.get( "metadata_type" , "all" )
database = kwargs.get( "database" , "" )
logger.info( f "Fetching metadata: type= { metadata_type } " )
if metadata_type == "sources" :
return { "sources" : await self ._fetch_sources()}
elif metadata_type == "schemas" :
return { "schemas" : await self ._fetch_schemas(database)}
else :
return {
"sources" : await self ._fetch_sources(),
"schemas" : await self ._fetch_schemas(database) if database else []
}
async def _verify_source ( self , source_id : str ) -> bool :
"""Verify that a source exists."""
response = await self .client.execute_http_get_request(
url = f " { self .base_url } /v1/sources/ { source_id } " ,
headers = { "Authorization" : f "Bearer { self .api_key } " }
)
return response is not None and response.status_code == 200
async def _fetch_sources ( self ) -> List[Dict[ str , Any]]:
"""Fetch available data sources."""
response = await self .client.execute_http_get_request(
url = f " { self .base_url } /v1/sources" ,
headers = { "Authorization" : f "Bearer { self .api_key } " }
)
if response and response.status_code == 200 :
return response.json().get( "sources" , [])
return []
async def _fetch_schemas ( self , database : str ) -> List[Dict[ str , Any]]:
"""Fetch schemas for a database."""
response = await self .client.execute_http_get_request(
url = f " { self .base_url } /v1/schemas" ,
params = { "database" : database},
headers = { "Authorization" : f "Bearer { self .api_key } " }
)
if response and response.status_code == 200 :
return response.json().get( "schemas" , [])
return []
# Custom business logic methods
async def extract_data ( self , source_id : str , query : str ) -> List[Dict[ str , Any]]:
"""Extract data from the source."""
response = await self .client.execute_http_post_request(
url = f " { self .base_url } /v1/query" ,
json_data = {
"source_id" : source_id,
"query" : query
},
headers = { "Authorization" : f "Bearer { self .api_key } " },
timeout = 300
)
if response and response.status_code == 200 :
return response.json().get( "data" , [])
raise Exception ( f "Data extraction failed: { response.status_code if response else 'No response' } " )
FastAPI Endpoint Integration
Handlers are automatically integrated with FastAPI endpoints:
Endpoint Handler Method Purpose POST /workflows/v1/authtest_auth()Validate credentials POST /workflows/v1/metadatafetch_metadata()Get metadata for UI POST /workflows/v1/checkpreflight_check()Pre-execution validation GET /workflows/v1/configmap/{id}get_configmap()Retrieve configuration
Best Practices
Separate Handler and Client Concerns
Keep connection management in the client and business logic in the handler. # ❌ Bad - Connection logic in handler
class BadHandler ( BaseHandler ):
async def fetch_data ( self ):
async with httpx.AsyncClient() as client:
response = await client.get(url)
# ✅ Good - Use client for connections
class GoodHandler ( BaseHandler ):
async def fetch_data ( self ):
response = await self .client.execute_http_get_request(url)
Provide Detailed Preflight Checks
Implement comprehensive preflight checks to catch issues early. async def preflight_check ( self , ** kwargs ):
return {
"connection" : { ... },
"permissions" : { ... },
"source_validation" : { ... },
"destination_validation" : { ... }
}
Always handle exceptions and return meaningful error messages. try :
result = await self ._risky_operation()
except httpx.HTTPError as e:
logger.error( f "HTTP error: { e } " )
return { "success" : False , "failureMessage" : f "API error: { str (e) } " }
except Exception as e:
logger.error( f "Unexpected error: { e } " , exc_info = True )
return { "success" : False , "failureMessage" : "Internal error occurred" }
Clients - Manage connections to external systems
Activities - Call handler methods from workflows
Server - Exposes handler methods via API endpoints