Handlers provide the business logic layer between workflows and external systems, coordinating client operations and data processing.
HandlerInterface
application_sdk.handlers.HandlerInterface
Abstract base class for workflow handlers.
Methods
load
Initialize and load the handler.
async def load(*args: Any, **kwargs: Any) -> None
Variable positional arguments
Variable keyword arguments, typically including credentials
Must be implemented by subclasses
test_auth
Test the authentication credentials.
async def test_auth(*args: Any, **kwargs: Any) -> bool
Variable positional arguments
Variable keyword arguments
True if authentication is successful, False otherwise
If not implemented by subclass
preflight_check
Perform preflight checks before workflow execution.
async def preflight_check(*args: Any, **kwargs: Any) -> Any
Variable positional arguments
Variable keyword arguments, typically including metadata
Dictionary with check results, typically containing success/failure information
If not implemented by subclass
Fetch metadata from the data source.
async def fetch_metadata(*args: Any, **kwargs: Any) -> Any
Variable positional arguments
Variable keyword arguments
If not implemented by subclass
get_configmap
Static method to get configuration map.
@staticmethod
async def get_configmap(config_map_id: str) -> Dict[str, Any]
The configuration map identifier
BaseHandler
application_sdk.handlers.base.BaseHandler
Base handler implementation for non-SQL based applications.
Constructor
BaseHandler(client: Optional[BaseClient] = None)
client
BaseClient
default:"BaseClient()"
The client instance to use for connections
Attributes
The client instance for connecting to the target system
Methods
load
Load and initialize the handler.
async def load(credentials: Dict[str, Any]) -> None
Credentials for the client
Example Usage
Basic Handler
from application_sdk.handlers.base import BaseHandler
from application_sdk.clients.base import BaseClient
from typing import Dict, Any
class MyClient(BaseClient):
async def load(self, **kwargs):
credentials = kwargs.get("credentials", {})
self.api_key = credentials.get("api_key")
self.http_headers = {
"Authorization": f"Bearer {self.api_key}"
}
async def fetch_data(self, endpoint: str):
response = await self.execute_http_get_request(
url=f"https://api.example.com/{endpoint}",
)
return response.json() if response else None
class MyHandler(BaseHandler):
def __init__(self):
super().__init__(client=MyClient())
async def test_auth(self, **kwargs) -> bool:
"""Test API authentication."""
try:
result = await self.client.fetch_data("auth/test")
return result is not None
except Exception as e:
logger.error(f"Authentication failed: {e}")
return False
async def preflight_check(self, **kwargs) -> Dict[str, Any]:
"""Perform preflight checks."""
metadata = kwargs.get("metadata", {})
checks = {
"authentication": {
"success": False,
"failureMessage": ""
},
"connectivity": {
"success": False,
"failureMessage": ""
}
}
# Test authentication
try:
checks["authentication"]["success"] = await self.test_auth()
if not checks["authentication"]["success"]:
checks["authentication"]["failureMessage"] = "Invalid credentials"
except Exception as e:
checks["authentication"]["failureMessage"] = str(e)
# Test connectivity
try:
result = await self.client.fetch_data("health")
checks["connectivity"]["success"] = result is not None
if not checks["connectivity"]["success"]:
checks["connectivity"]["failureMessage"] = "Service unavailable"
except Exception as e:
checks["connectivity"]["failureMessage"] = str(e)
return checks
async def fetch_metadata(self, **kwargs) -> Dict[str, Any]:
"""Fetch metadata from the API."""
metadata = kwargs.get("metadata", {})
dataset = metadata.get("dataset")
data = await self.client.fetch_data(f"datasets/{dataset}/metadata")
return {
"dataset": dataset,
"metadata": data,
"count": len(data) if data else 0
}
Database Handler
from application_sdk.handlers.base import BaseHandler
from application_sdk.clients.sql import SQLClient
from typing import Dict, Any, List
class DatabaseHandler(BaseHandler):
def __init__(self):
self.sql_client = SQLClient()
super().__init__(client=self.sql_client)
async def load(self, credentials: Dict[str, Any]) -> None:
"""Load handler with database credentials."""
await self.sql_client.load(credentials=credentials)
async def test_auth(self, **kwargs) -> bool:
"""Test database connection."""
try:
query = "SELECT 1"
result = await self.sql_client.execute_query(query)
return result is not None
except Exception as e:
logger.error(f"Database connection test failed: {e}")
return False
async def preflight_check(self, **kwargs) -> Dict[str, Any]:
"""Check database connection and permissions."""
checks = {
"connection": {"success": False, "failureMessage": ""},
"permissions": {"success": False, "failureMessage": ""}
}
# Test connection
try:
checks["connection"]["success"] = await self.test_auth()
if not checks["connection"]["success"]:
checks["connection"]["failureMessage"] = "Cannot connect to database"
except Exception as e:
checks["connection"]["failureMessage"] = str(e)
# Test permissions
if checks["connection"]["success"]:
try:
# Check if we can list tables
query = "SHOW TABLES"
result = await self.sql_client.execute_query(query)
checks["permissions"]["success"] = result is not None
if not checks["permissions"]["success"]:
checks["permissions"]["failureMessage"] = "Insufficient permissions"
except Exception as e:
checks["permissions"]["failureMessage"] = str(e)
return checks
async def fetch_metadata(self, **kwargs) -> Dict[str, Any]:
"""Fetch database schema metadata."""
metadata = kwargs.get("metadata", {})
schema = metadata.get("schema")
# Get tables
tables_query = f"SHOW TABLES FROM {schema}"
tables = await self.sql_client.execute_query(tables_query)
result = {
"schema": schema,
"tables": [],
"total_tables": 0
}
if tables:
for table in tables:
table_name = table[0]
# Get columns for each table
columns_query = f"DESCRIBE {schema}.{table_name}"
columns = await self.sql_client.execute_query(columns_query)
result["tables"].append({
"name": table_name,
"columns": columns
})
result["total_tables"] = len(result["tables"])
return result
Handler with Configmap
from application_sdk.handlers.base import BaseHandler
from application_sdk.services.statestore import StateStore, StateType
from typing import Dict, Any
class ConfigurableHandler(BaseHandler):
async def load(self, credentials: Dict[str, Any]) -> None:
"""Load handler with credentials and config."""
await super().load(credentials)
# Load configmap if available
config_map_id = credentials.get("config_map_id")
if config_map_id:
self.config = await self.get_configmap(config_map_id)
else:
self.config = {}
@staticmethod
async def get_configmap(config_map_id: str) -> Dict[str, Any]:
"""Get configuration map from state store."""
try:
config = await StateStore.get_state(
config_map_id,
StateType.WORKFLOWS
)
return config
except Exception as e:
logger.warning(f"Failed to load configmap: {e}")
return {}
async def preflight_check(self, **kwargs) -> Dict[str, Any]:
"""Perform checks using config."""
checks = {
"config": {"success": False, "failureMessage": ""}
}
# Validate config
required_keys = ["api_endpoint", "timeout"]
missing = [k for k in required_keys if k not in self.config]
if missing:
checks["config"]["failureMessage"] = f"Missing config keys: {missing}"
else:
checks["config"]["success"] = True
return checks
async def fetch_metadata(self, **kwargs) -> Dict[str, Any]:
"""Fetch metadata using configured endpoint."""
endpoint = self.config.get("api_endpoint")
timeout = self.config.get("timeout", 30)
response = await self.client.execute_http_get_request(
url=endpoint,
timeout=timeout
)
return response.json() if response else {}
Multi-Source Handler
from application_sdk.handlers.base import BaseHandler
from typing import Dict, Any, List
import asyncio
class MultiSourceHandler(BaseHandler):
async def fetch_metadata(self, **kwargs) -> Dict[str, Any]:
"""Fetch metadata from multiple sources in parallel."""
metadata = kwargs.get("metadata", {})
sources = metadata.get("sources", [])
# Fetch from all sources in parallel
tasks = [
self._fetch_from_source(source)
for source in sources
]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Aggregate results
aggregated = {
"sources": [],
"total_records": 0,
"errors": []
}
for source, result in zip(sources, results):
if isinstance(result, Exception):
aggregated["errors"].append({
"source": source,
"error": str(result)
})
else:
aggregated["sources"].append(result)
aggregated["total_records"] += result.get("count", 0)
return aggregated
async def _fetch_from_source(self, source: str) -> Dict[str, Any]:
"""Fetch data from a single source."""
response = await self.client.execute_http_get_request(
url=f"https://api.example.com/{source}"
)
data = response.json() if response else []
return {
"source": source,
"data": data,
"count": len(data)
}
Best Practices
Handler Design
- Keep handlers focused on coordination, not implementation details
- Use clients for actual API/database operations
- Implement all HandlerInterface methods
- Return structured results from preflight checks
Error Handling
- Always implement try/except blocks in handler methods
- Return structured error information in preflight checks
- Log errors with appropriate context
- Don’t let exceptions propagate unhandled
Client Management
- Initialize clients in the constructor
- Load clients with credentials in the load() method
- Close client connections when done
- Reuse clients across handler methods
Configuration
- Use configmaps for environment-specific settings
- Validate configuration in preflight_check
- Provide sensible defaults
- Document required configuration keys