Overview
While the SDK provides built-in workflow execution via Temporal, you often need to expose your workflows through HTTP APIs. This example demonstrates how to:
- Create a FastAPI server that wraps your application
- Register workflows with HTTP triggers
- Add custom routes and endpoints
- Integrate with handlers for validation and metadata fetching
This pattern is essential for building applications that need to be triggered via REST APIs or webhooks.
What You’ll Build
A FastAPI application that:
- Exposes a custom /custom/test endpoint
- Registers a workflow that can be triggered via HTTP POST
- Integrates with a handler for authentication and metadata operations
- Provides a complete REST API for your workflow operations
The FastAPI integration is ideal for building webhook receivers, admin dashboards, or any application that needs HTTP-based workflow triggering.
Complete Code
application_custom_fastapi.py
import asyncio
import uuid
from typing import Any, Dict

from fastapi import APIRouter

from application_sdk.handlers import HandlerInterface
from application_sdk.server.fastapi import APIServer, HttpWorkflowTrigger
from application_sdk.workflows import WorkflowInterface


class CustomHandler(HandlerInterface):
    async def load(self, **kwargs: Any) -> None:
        pass

    async def test_auth(self, **kwargs: Any) -> bool:
        return True

    async def fetch_metadata(self, **kwargs: Any) -> Any:
        return [{"database": "test", "schema": "test"}]

    async def preflight_check(self, **kwargs: Any) -> Any:
        return {"databaseSchemaCheck": ["test"], "tablesCheck": ["test"]}


class SampleWorkflow(WorkflowInterface):
    async def start(
        self, workflow_args: Dict[str, Any], workflow_class: Any
    ) -> Dict[str, str]:
        return {
            "workflow_id": str(uuid.uuid4()),
            "run_id": str(uuid.uuid4()),
        }

    async def run(self, workflow_config: Dict[str, Any]) -> None:
        pass


class MyCustomApplication(APIServer):
    custom_router: APIRouter = APIRouter()

    def register_routers(self):
        self.app.include_router(self.custom_router, prefix="/custom")
        super().register_routers()

    def register_routes(self):
        self.custom_router.add_api_route(
            "/test",
            self.test,
            methods=["GET"],
        )
        super().register_routes()

    async def test(self, **kwargs: Dict[str, Any]) -> Dict[str, str]:
        return {"message": "Hello, World!"}


async def application_custom_fastapi():
    fast_api_app = MyCustomApplication(handler=CustomHandler())
    fast_api_app.register_workflow(
        SampleWorkflow,
        [
            HttpWorkflowTrigger(
                endpoint="/sample",
                methods=["POST"],
                workflow_class=SampleWorkflow,
            )
        ],
    )
    await fast_api_app.start()


if __name__ == "__main__":
    asyncio.run(application_custom_fastapi())
How to Run
Start Dependencies
Start the Dapr runtime and Temporal server.
Run the FastAPI Server
Execute the custom FastAPI application:
uv run examples/application_custom_fastapi.py
The server will start on http://localhost:8000 by default.
Test Custom Endpoint
Test the custom endpoint:
curl http://localhost:8000/custom/test
You should receive:
{"message": "Hello, World!"}
Trigger Workflow via HTTP
Trigger the sample workflow:
curl -X POST http://localhost:8000/sample \
  -H "Content-Type: application/json" \
  -d '{"workflow_id": "test-workflow"}'
You’ll receive a response with the workflow and run IDs.
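The same request can be issued from Python. The following is a minimal sketch using only the standard library; the helper names build_trigger_request and trigger_workflow are hypothetical and not part of the SDK, and the URL and payload are the ones from the curl command above.

```python
import json
import urllib.request
from typing import Any, Dict


def build_trigger_request(
    base_url: str, endpoint: str, payload: Dict[str, Any]
) -> urllib.request.Request:
    """Build (but do not send) a JSON POST request for a workflow trigger."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + endpoint,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def trigger_workflow(
    base_url: str, endpoint: str, payload: Dict[str, Any], timeout: float = 10.0
) -> Dict[str, Any]:
    """Send the request and decode the JSON response body."""
    request = build_trigger_request(base_url, endpoint, payload)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


# Usage (requires the example server to be running locally):
# trigger_workflow("http://localhost:8000", "/sample", {"workflow_id": "test-workflow"})
```

Keeping request construction separate from sending makes the payload and headers easy to inspect without a running server.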
Key Components Explained
Custom Handler
class CustomHandler(HandlerInterface):
    async def load(self, **kwargs: Any) -> None:
        pass

    async def test_auth(self, **kwargs: Any) -> bool:
        return True

    async def fetch_metadata(self, **kwargs: Any) -> Any:
        return [{"database": "test", "schema": "test"}]

    async def preflight_check(self, **kwargs: Any) -> Any:
        return {"databaseSchemaCheck": ["test"], "tablesCheck": ["test"]}
The handler provides core operations:
- load: Initialize any required resources
- test_auth: Validate credentials (returns True if authentication succeeds)
- fetch_metadata: Retrieve metadata about available resources
- preflight_check: Validate that required resources exist before starting the workflow
In a real application, implement test_auth to validate database credentials or API tokens, and fetch_metadata to return actual metadata about available databases/schemas.
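As a rough illustration of what a less trivial handler might look like, the sketch below mirrors the method names above in a standalone class. It deliberately does not subclass HandlerInterface so it can run without the SDK. The injected connect callable, the credentials keyword, and fake_connect are all assumptions made for this example, not SDK behavior.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

# Hypothetical connector type: takes credentials, returns {database: [schemas]}.
Connector = Callable[[Dict[str, str]], Awaitable[Dict[str, List[str]]]]


class CatalogHandlerSketch:
    """Standalone stand-in that mirrors the handler method names."""

    def __init__(self, connect: Connector) -> None:
        self._connect = connect
        self._credentials: Dict[str, str] = {}
        self._catalog: Dict[str, List[str]] = {}

    async def load(self, **kwargs: Any) -> None:
        # Assumed keyword name; the SDK may pass credentials differently.
        self._credentials = dict(kwargs.get("credentials", {}))

    async def test_auth(self, **kwargs: Any) -> bool:
        # Authentication succeeds only if the connector accepts the credentials.
        try:
            self._catalog = await self._connect(self._credentials)
        except PermissionError:
            return False
        return True

    async def fetch_metadata(self, **kwargs: Any) -> List[Dict[str, str]]:
        # Flatten the catalog into database/schema pairs.
        return [
            {"database": database, "schema": schema}
            for database, schemas in sorted(self._catalog.items())
            for schema in schemas
        ]


async def fake_connect(credentials: Dict[str, str]) -> Dict[str, List[str]]:
    """Hypothetical connector used only to exercise the sketch."""
    if credentials.get("password") != "correct-horse":
        raise PermissionError("bad credentials")
    return {"sales": ["public", "staging"]}
```

Injecting the connector keeps the handler logic testable without a live database.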
Workflow Interface
class SampleWorkflow(WorkflowInterface):
    async def start(
        self, workflow_args: Dict[str, Any], workflow_class: Any
    ) -> Dict[str, str]:
        return {
            "workflow_id": str(uuid.uuid4()),
            "run_id": str(uuid.uuid4()),
        }

    async def run(self, workflow_config: Dict[str, Any]) -> None:
        pass
The WorkflowInterface requires:
- start: Initialize and return identifiers for the workflow execution
- run: Execute the actual workflow logic
Custom FastAPI Application
class MyCustomApplication(APIServer):
    custom_router: APIRouter = APIRouter()

    def register_routers(self):
        self.app.include_router(self.custom_router, prefix="/custom")
        super().register_routers()

    def register_routes(self):
        self.custom_router.add_api_route(
            "/test",
            self.test,
            methods=["GET"],
        )
        super().register_routes()

    async def test(self, **kwargs: Dict[str, Any]) -> Dict[str, str]:
        return {"message": "Hello, World!"}
Extend APIServer to add custom routes:
- Create a custom APIRouter instance
- Override register_routers() to include your router with a prefix
- Override register_routes() to add individual route handlers
- Call super() methods to preserve built-in routes
Always call super().register_routers() and super().register_routes() to ensure the default routes (health checks, workflow triggers) are registered.
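The effect of omitting the super() call can be seen with a toy example. BaseServerSketch below is a hypothetical stand-in written for this illustration; it is not the SDK's APIServer and makes no claim about when the real class registers its routes.

```python
from typing import List


class BaseServerSketch:
    """Toy base class that registers one default route."""

    def __init__(self) -> None:
        self.routes: List[str] = []
        self.register_routes()

    def register_routes(self) -> None:
        self.routes.append("/health")


class ForgetsSuper(BaseServerSketch):
    def register_routes(self) -> None:
        # The base implementation never runs, so "/health" is lost.
        self.routes.append("/custom/test")


class CallsSuper(BaseServerSketch):
    def register_routes(self) -> None:
        self.routes.append("/custom/test")
        # Delegating to the base class keeps the default route.
        super().register_routes()
```

Here ForgetsSuper().routes contains only the custom route, while CallsSuper().routes contains both.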
HTTP Workflow Triggers
fast_api_app.register_workflow(
    SampleWorkflow,
    [
        HttpWorkflowTrigger(
            endpoint="/sample",
            methods=["POST"],
            workflow_class=SampleWorkflow,
        )
    ],
)
Register workflows with HTTP triggers:
- endpoint: The URL path where the workflow can be triggered
- methods: HTTP methods (typically POST for workflow execution)
- workflow_class: The workflow class to execute
When a request hits the endpoint, the SDK automatically:
- Validates the request payload
- Extracts workflow arguments
- Starts the workflow execution
- Returns the workflow ID and run ID
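The four steps above can be pictured with a small framework-free sketch. This is not the SDK's implementation; handle_trigger and fake_start_workflow are hypothetical names, and the validation shown (requiring a JSON object) is an assumption chosen for illustration.

```python
import uuid
from typing import Any, Callable, Dict

StartFn = Callable[[Dict[str, Any]], Dict[str, str]]


def fake_start_workflow(workflow_args: Dict[str, Any]) -> Dict[str, str]:
    """Hypothetical stand-in for whatever actually starts the workflow."""
    return {
        "workflow_id": workflow_args.get("workflow_id") or str(uuid.uuid4()),
        "run_id": str(uuid.uuid4()),
    }


def handle_trigger(payload: Any, start_workflow: StartFn) -> Dict[str, str]:
    # 1. Validate the request payload.
    if not isinstance(payload, dict):
        raise ValueError("request body must be a JSON object")
    # 2. Extract workflow arguments (here: a shallow copy of the payload).
    workflow_args = dict(payload)
    # 3. Start the workflow execution.
    started = start_workflow(workflow_args)
    # 4. Return the workflow ID and run ID.
    return {"workflow_id": started["workflow_id"], "run_id": started["run_id"]}


# Usage:
# handle_trigger({"workflow_id": "test-workflow"}, fake_start_workflow)
```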
Advanced Patterns
Adding Authentication
from fastapi import Depends, Header, HTTPException

async def verify_token(authorization: str = Header(...)):
    if not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Invalid token")
    # Validate token
    return authorization

class MyCustomApplication(APIServer):
    def register_routes(self):
        self.custom_router.add_api_route(
            "/secure",
            self.secure_endpoint,
            methods=["GET"],
            dependencies=[Depends(verify_token)],
        )
        # Keep the built-in routes registered
        super().register_routes()

    async def secure_endpoint(self) -> Dict[str, str]:
        # Placeholder body; only reached when verify_token did not raise
        return {"message": "Authorized"}
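The "# Validate token" placeholder is left open in the snippet above. One possible way to fill it, sketched here with the standard library only, is to parse the header and compare the token against an expected secret in constant time. The function names and the single shared secret are assumptions for illustration; a real deployment would more likely verify a signed token or call an identity provider. Unlike the startswith check above, this sketch treats the scheme name case-insensitively.

```python
import hmac
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' value, or None."""
    if not authorization:
        return None
    scheme, _, token = authorization.partition(" ")
    token = token.strip()
    if scheme.lower() != "bearer" or not token:
        return None
    return token


def is_valid_token(authorization: Optional[str], expected_token: str) -> bool:
    """Check the presented bearer token against a hypothetical shared secret."""
    token = extract_bearer_token(authorization)
    if token is None or not expected_token:
        return False
    # Constant-time comparison avoids leaking token contents through timing.
    return hmac.compare_digest(token.encode("utf-8"), expected_token.encode("utf-8"))
```

Inside verify_token, a False result from is_valid_token would translate into the same 401 HTTPException that is already raised for a malformed header.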
Request Validation
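from typing import Dict

from pydantic import BaseModel

class WorkflowRequest(BaseModel):
    database: str
    # Note: pydantic flags a field named "schema" because it shadows a
    # BaseModel attribute (an error in v1, a warning in v2); consider an alias.
    schema: str
    credentials: Dict[str, str]

class MyCustomApplication(APIServer):
    async def start_extraction(self, request: WorkflowRequest):
        # Request is automatically validated
        # .dict() is the pydantic v1 spelling; on v2 use request.model_dump()
        return await self.start_workflow(
            workflow_args=request.dict(),
            workflow_class=SampleWorkflow,
        )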
Custom Responses
from fastapi.responses import JSONResponse

class MyCustomApplication(APIServer):
    async def custom_response(self) -> JSONResponse:
        return JSONResponse(
            content={"status": "success", "data": {"result": "value"}},
            status_code=200,
            headers={"X-Custom-Header": "value"},
        )
Adding Middleware
from fastapi.middleware.cors import CORSMiddleware

class MyCustomApplication(APIServer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Permissive settings for local development; restrict origins,
        # methods, and headers in production.
        self.app.add_middleware(
            CORSMiddleware,
            allow_origins=["*"],
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )
API Documentation
FastAPI automatically generates interactive API documentation:
- Swagger UI: http://localhost:8000/docs
- ReDoc: http://localhost:8000/redoc
- OpenAPI JSON: http://localhost:8000/openapi.json
Use the Swagger UI to test your endpoints interactively during development. It provides a built-in API testing interface.
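The OpenAPI JSON document is also convenient for scripted checks, for example confirming that a custom route or workflow trigger was registered. The sketch below lists the method and path pairs in a parsed OpenAPI document; list_operations is a hypothetical helper and the sample spec in the usage comment is made up.

```python
from typing import Any, Dict, List, Tuple

# HTTP methods that may appear as keys of an OpenAPI path item.
HTTP_METHODS = {"get", "put", "post", "delete", "options", "head", "patch", "trace"}


def list_operations(spec: Dict[str, Any]) -> List[Tuple[str, str]]:
    """Return (METHOD, path) pairs from a parsed OpenAPI document."""
    operations: List[Tuple[str, str]] = []
    for path, path_item in spec.get("paths", {}).items():
        for key in path_item:
            # Skip non-operation keys such as "parameters" or "summary".
            if key.lower() in HTTP_METHODS:
                operations.append((key.upper(), path))
    return sorted(operations, key=lambda op: (op[1], op[0]))


# Usage with a running server:
# import json, urllib.request
# with urllib.request.urlopen("http://localhost:8000/openapi.json") as resp:
#     print(list_operations(json.load(resp)))
```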
Production Considerations
Running with Uvicorn
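uvicorn application_custom_fastapi:fast_api_app --host 0.0.0.0 --port 8000 --workers 4
This command requires an ASGI application that is importable at module level. In the complete example above, fast_api_app is a local variable inside application_custom_fastapi(), so first expose the server's underlying FastAPI instance (the self.app object used in register_routers) as a module-level variable and point uvicorn at that name.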
Environment-Based Configuration
import os

class MyCustomApplication(APIServer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.debug = os.getenv("DEBUG", "false").lower() == "true"
        self.port = int(os.getenv("PORT", "8000"))
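Parsing the environment in one place makes misconfiguration fail early with a clear message. The sketch below reads the same DEBUG and PORT variables with the same defaults; ServerSettings and load_settings are hypothetical names, and the port range check is an addition not present in the snippet above.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class ServerSettings:
    debug: bool
    port: int


def load_settings(env: Optional[Mapping[str, str]] = None) -> ServerSettings:
    """Read DEBUG and PORT from the given mapping (defaults to os.environ)."""
    source = os.environ if env is None else env
    debug = source.get("DEBUG", "false").lower() == "true"
    raw_port = source.get("PORT", "8000")
    try:
        port = int(raw_port)
    except ValueError:
        raise ValueError(f"PORT must be an integer, got {raw_port!r}") from None
    if not 1 <= port <= 65535:
        raise ValueError(f"PORT must be between 1 and 65535, got {port}")
    return ServerSettings(debug=debug, port=port)


# Usage:
# settings = load_settings()
# settings.port, settings.debug
```

Accepting a mapping argument lets the parsing be exercised without touching the real process environment.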
Health Checks
The SDK provides built-in health check endpoints:
- GET /health: Basic health check
- GET /ready: Readiness probe (checks workflow worker status)
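A deployment or test script may want to wait for the readiness probe before sending traffic. The following is a minimal polling sketch using the standard library; wait_until_ready and http_probe are hypothetical helpers, and treating any 2xx status as "ready" is an assumption about how the endpoint responds.

```python
import time
import urllib.error
import urllib.request
from typing import Callable


def http_probe(url: str, timeout: float = 2.0) -> Callable[[], bool]:
    """Return a probe that reports True when the URL answers with a 2xx status."""

    def probe() -> bool:
        try:
            with urllib.request.urlopen(url, timeout=timeout) as response:
                return 200 <= response.status < 300
        except (urllib.error.URLError, OSError):
            # Connection refused, timeout, or a non-2xx response.
            return False

    return probe


def wait_until_ready(
    probe: Callable[[], bool],
    attempts: int = 10,
    delay_seconds: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Call the probe up to `attempts` times, sleeping between failed tries."""
    for attempt in range(attempts):
        if probe():
            return True
        if attempt < attempts - 1:
            sleep(delay_seconds)
    return False


# Usage with a running server:
# wait_until_ready(http_probe("http://localhost:8000/ready"))
```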
Next Steps