
Overview

While the SDK provides built-in workflow execution via Temporal, you often need to expose your workflows through HTTP APIs. This example demonstrates how to:
  • Create a FastAPI server that wraps your application
  • Register workflows with HTTP triggers
  • Add custom routes and endpoints
  • Integrate with handlers for validation and metadata fetching
Use this pattern when your workflows must be started by REST calls or webhooks.

What You’ll Build

A FastAPI application that:
  1. Exposes a custom /custom/test endpoint
  2. Registers a workflow that can be triggered via HTTP POST
  3. Integrates with a handler for authentication and metadata operations
  4. Provides a complete REST API for your workflow operations
The FastAPI integration is ideal for building webhook receivers, admin dashboards, or any application that needs HTTP-based workflow triggering.

Complete Code

application_custom_fastapi.py
import asyncio
import uuid
from typing import Any, Dict

from fastapi import APIRouter

from application_sdk.handlers import HandlerInterface
from application_sdk.server.fastapi import APIServer, HttpWorkflowTrigger
from application_sdk.workflows import WorkflowInterface


class CustomHandler(HandlerInterface):
    async def load(self, **kwargs: Any) -> None:
        pass

    async def test_auth(self, **kwargs: Any) -> bool:
        return True

    async def fetch_metadata(self, **kwargs: Any) -> Any:
        return [{"database": "test", "schema": "test"}]

    async def preflight_check(self, **kwargs: Any) -> Any:
        return {"databaseSchemaCheck": ["test"], "tablesCheck": ["test"]}


class SampleWorkflow(WorkflowInterface):
    async def start(
        self, workflow_args: Dict[str, Any], workflow_class: Any
    ) -> Dict[str, str]:
        return {
            "workflow_id": str(uuid.uuid4()),
            "run_id": str(uuid.uuid4()),
        }

    async def run(self, workflow_config: Dict[str, Any]) -> None:
        pass


class MyCustomApplication(APIServer):
    custom_router: APIRouter = APIRouter()

    def register_routers(self):
        self.app.include_router(self.custom_router, prefix="/custom")
        super().register_routers()

    def register_routes(self):
        self.custom_router.add_api_route(
            "/test",
            self.test,
            methods=["GET"],
        )

        super().register_routes()

    async def test(self, **kwargs: Dict[str, Any]) -> Dict[str, str]:
        return {"message": "Hello, World!"}


async def application_custom_fastapi():
    fast_api_app = MyCustomApplication(handler=CustomHandler())
    fast_api_app.register_workflow(
        SampleWorkflow,
        [
            HttpWorkflowTrigger(
                endpoint="/sample",
                methods=["POST"],
                workflow_class=SampleWorkflow,
            )
        ],
    )

    await fast_api_app.start()


if __name__ == "__main__":
    asyncio.run(application_custom_fastapi())

How to Run

1. Start Dependencies

Start the Dapr runtime and Temporal server:
uv run poe start-deps
2. Run the FastAPI Server

Execute the custom FastAPI application:
uv run examples/application_custom_fastapi.py
The server will start on http://localhost:8000 by default.
3. Test Custom Endpoint

Test the custom endpoint:
curl http://localhost:8000/custom/test
You should receive:
{"message": "Hello, World!"}
4. Trigger Workflow via HTTP

Trigger the sample workflow:
curl -X POST http://localhost:8000/sample \
  -H "Content-Type: application/json" \
  -d '{"workflow_id": "test-workflow"}'
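You’ll receive a response with the workflow and run IDs. Because SampleWorkflow.start returns freshly generated UUIDs, expect random identifiers rather than the workflow_id sent in the request body.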
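The same request can be issued from Python. The sketch below uses only the standard library and assumes the server from step 2 is listening on http://localhost:8000. The helper names build_trigger_request and trigger_workflow are illustrative and not part of the SDK.

```python
import json
import urllib.request
from typing import Any, Dict


def build_trigger_request(base_url: str, payload: Dict[str, Any]) -> urllib.request.Request:
    """Build the POST request for the /sample trigger endpoint."""
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/sample",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def trigger_workflow(base_url: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Send the request and decode the JSON response body."""
    request = build_trigger_request(base_url, payload)
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the server running, trigger_workflow("http://localhost:8000", {"workflow_id": "test-workflow"}) should return the parsed JSON body of the response.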

Key Components Explained

Custom Handler

class CustomHandler(HandlerInterface):
    async def load(self, **kwargs: Any) -> None:
        pass

    async def test_auth(self, **kwargs: Any) -> bool:
        return True

    async def fetch_metadata(self, **kwargs: Any) -> Any:
        return [{"database": "test", "schema": "test"}]

    async def preflight_check(self, **kwargs: Any) -> Any:
        return {"databaseSchemaCheck": ["test"], "tablesCheck": ["test"]}
The handler provides core operations:
  • load: Initialize any required resources
  • test_auth: Validate credentials (returns True if authentication succeeds)
  • fetch_metadata: Retrieve metadata about available resources
  • preflight_check: Validate that required resources exist before starting the workflow
In a real application, implement test_auth to validate database credentials or API tokens, and fetch_metadata to return actual metadata about available databases/schemas.
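As a rough illustration of what a less trivial handler might look like, the sketch below checks an API token and builds metadata from an in-memory catalog. It is a standalone class so it runs without the SDK; a real handler would subclass HandlerInterface. The "credentials" keyword argument, the "api_token" key, and the catalog structure are assumptions made for this example, not documented SDK behavior.

```python
import asyncio
import hmac
from typing import Any, Dict, List


class TokenHandlerSketch:
    """Standalone stand-in; a real handler would subclass HandlerInterface."""

    def __init__(self, expected_token: str, catalog: Dict[str, List[str]]) -> None:
        self._expected_token = expected_token
        self._catalog = catalog  # database name -> schema names
        self._credentials: Dict[str, str] = {}

    async def load(self, **kwargs: Any) -> None:
        # Assumption: credentials arrive as a "credentials" keyword argument.
        self._credentials = dict(kwargs.get("credentials", {}))

    async def test_auth(self, **kwargs: Any) -> bool:
        supplied = self._credentials.get("api_token", "")
        # compare_digest avoids leaking information through timing differences.
        return hmac.compare_digest(supplied.encode(), self._expected_token.encode())

    async def fetch_metadata(self, **kwargs: Any) -> List[Dict[str, str]]:
        # Flatten the catalog into one entry per (database, schema) pair.
        return [
            {"database": database, "schema": schema}
            for database, schemas in self._catalog.items()
            for schema in schemas
        ]


async def demo() -> Dict[str, Any]:
    handler = TokenHandlerSketch("s3cret", {"sales": ["public", "staging"]})
    await handler.load(credentials={"api_token": "s3cret"})
    return {
        "authenticated": await handler.test_auth(),
        "metadata": await handler.fetch_metadata(),
    }
```

Running asyncio.run(demo()) exercises load, test_auth, and fetch_metadata in the order a caller would typically use them.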

Workflow Interface

class SampleWorkflow(WorkflowInterface):
    async def start(
        self, workflow_args: Dict[str, Any], workflow_class: Any
    ) -> Dict[str, str]:
        return {
            "workflow_id": str(uuid.uuid4()),
            "run_id": str(uuid.uuid4()),
        }

    async def run(self, workflow_config: Dict[str, Any]) -> None:
        pass
The WorkflowInterface requires:
  • start: Initialize and return identifiers for the workflow execution
  • run: Execute the actual workflow logic

Custom FastAPI Application

class MyCustomApplication(APIServer):
    custom_router: APIRouter = APIRouter()

    def register_routers(self):
        self.app.include_router(self.custom_router, prefix="/custom")
        super().register_routers()

    def register_routes(self):
        self.custom_router.add_api_route(
            "/test",
            self.test,
            methods=["GET"],
        )
        super().register_routes()

    async def test(self, **kwargs: Dict[str, Any]) -> Dict[str, str]:
        return {"message": "Hello, World!"}
Extend APIServer to add custom routes:
  1. Create a custom APIRouter instance
  2. Override register_routers() to include your router with a prefix
  3. Override register_routes() to add individual route handlers
  4. Call super() methods to preserve built-in routes
Always call super().register_routers() and super().register_routes() to ensure the default routes (health checks, workflow triggers) are registered.

HTTP Workflow Triggers

fast_api_app.register_workflow(
    SampleWorkflow,
    [
        HttpWorkflowTrigger(
            endpoint="/sample",
            methods=["POST"],
            workflow_class=SampleWorkflow,
        )
    ],
)
Register workflows with HTTP triggers:
  • endpoint: The URL path where the workflow can be triggered
  • methods: HTTP methods (typically POST for workflow execution)
  • workflow_class: The workflow class to execute
When a request hits the endpoint, the SDK automatically:
  1. Validates the request payload
  2. Extracts workflow arguments
  3. Starts the workflow execution
  4. Returns the workflow ID and run ID
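The four steps above can be pictured with a small, SDK-free sketch. This is not the SDK's implementation: handle_trigger, SketchWorkflow, and the status field in the returned dictionary are hypothetical names chosen for illustration, and the validation rule (the payload must be a JSON object) is an assumption.

```python
import asyncio
import uuid
from typing import Any, Dict


class SketchWorkflow:
    """Stand-in with the same start() shape as SampleWorkflow."""

    async def start(self, workflow_args: Dict[str, Any], workflow_class: Any) -> Dict[str, str]:
        return {"workflow_id": str(uuid.uuid4()), "run_id": str(uuid.uuid4())}


async def handle_trigger(payload: Any, workflow_class: type) -> Dict[str, Any]:
    # 1. Validate the request payload.
    if not isinstance(payload, dict):
        return {"status": 422, "error": "payload must be a JSON object"}
    # 2. Extract workflow arguments.
    workflow_args = dict(payload)
    # 3. Start the workflow execution.
    ids = await workflow_class().start(workflow_args, workflow_class)
    # 4. Return the workflow ID and run ID.
    return {"status": 200, "workflow_id": ids["workflow_id"], "run_id": ids["run_id"]}
```

For example, asyncio.run(handle_trigger({"workflow_id": "test-workflow"}, SketchWorkflow)) returns a dictionary with status 200 and two generated identifiers.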

Advanced Patterns

Adding Authentication

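from fastapi import Depends, Header, HTTPException

from application_sdk.server.fastapi import APIServer


async def verify_token(authorization: str = Header(...)) -> str:
    # Reject requests whose Authorization header is not a Bearer token.
    if not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Invalid token")
    # Validate the token itself here (signature, expiry, revocation).
    return authorization


class MyCustomApplication(APIServer):
    def register_routes(self):
        # custom_router is the APIRouter declared on the class, as shown earlier.
        self.custom_router.add_api_route(
            "/secure",
            self.secure_endpoint,
            methods=["GET"],
            dependencies=[Depends(verify_token)],
        )
        # Keep the SDK's default routes registered.
        super().register_routes()

    async def secure_endpoint(self) -> dict:
        return {"message": "Authenticated"}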
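The token validation step left open in verify_token could be filled in along these lines. This is a minimal sketch that compares the bearer token against a single shared secret; extract_bearer_token and is_valid_token are hypothetical helpers, and a production service would more likely verify a signed token or consult an identity provider.

```python
import hmac
from typing import Optional


def extract_bearer_token(authorization: str) -> Optional[str]:
    """Return the token from a 'Bearer <token>' header value, else None."""
    scheme, _, token = authorization.partition(" ")
    if scheme != "Bearer" or not token.strip():
        return None
    return token.strip()


def is_valid_token(authorization: str, expected_token: str) -> bool:
    """Check the header's token against the expected secret in constant time."""
    token = extract_bearer_token(authorization)
    if token is None:
        return False
    return hmac.compare_digest(token.encode("utf-8"), expected_token.encode("utf-8"))
```

Inside verify_token, a failed is_valid_token check would raise the same HTTPException with status 401.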

Request Validation

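from typing import Dict

from pydantic import BaseModel, Field


class WorkflowRequest(BaseModel):
    database: str
    # "schema" shadows a BaseModel attribute, so store it under another
    # name and accept "schema" in the JSON body through an alias.
    schema_name: str = Field(alias="schema")
    credentials: Dict[str, str]


class MyCustomApplication(APIServer):
    async def start_extraction(self, request: WorkflowRequest):
        # FastAPI validates the body against WorkflowRequest before this runs.
        # by_alias=True restores the "schema" key; on Pydantic v2, prefer
        # request.model_dump(by_alias=True).
        return await self.start_workflow(
            workflow_args=request.dict(by_alias=True),
            workflow_class=SampleWorkflow,
        )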
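To make "automatically validated" concrete, the sketch below runs the same kind of Pydantic check outside FastAPI and reports which fields failed. ExtractionRequest and failing_fields are hypothetical names for this example. When FastAPI performs this validation on a request body, a failure results in a 422 response instead of a call to your handler.

```python
from typing import Any, Dict, List

from pydantic import BaseModel, ValidationError


class ExtractionRequest(BaseModel):
    database: str
    credentials: Dict[str, str]


def failing_fields(payload: Dict[str, Any]) -> List[str]:
    """Return the dotted locations of fields that fail validation."""
    try:
        ExtractionRequest(**payload)
    except ValidationError as exc:
        return [".".join(str(part) for part in error["loc"]) for error in exc.errors()]
    return []
```

An empty list means the payload would be accepted and passed to the route handler as a typed object.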

Custom Response Formatting

from fastapi.responses import JSONResponse

class MyCustomApplication(APIServer):
    async def custom_response(self) -> JSONResponse:
        return JSONResponse(
            content={"status": "success", "data": {"result": "value"}},
            status_code=200,
            headers={"X-Custom-Header": "value"}
        )

Adding Middleware

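from fastapi.middleware.cors import CORSMiddleware

class MyCustomApplication(APIServer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Wide-open CORS is convenient for local development only.
        # In production, list the exact origins that may call the API,
        # especially when allow_credentials is True.
        self.app.add_middleware(
            CORSMiddleware,
            allow_origins=["*"],
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )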
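One way to avoid hard-coding a wildcard is to read the allowed origins from configuration. The sketch below assumes a comma-separated environment variable named ALLOWED_ORIGINS; that variable name and the parse_allowed_origins helper are inventions for this example.

```python
import os
from typing import List, Mapping


def parse_allowed_origins(env: Mapping[str, str]) -> List[str]:
    """Split a comma-separated ALLOWED_ORIGINS value into clean origins."""
    raw = env.get("ALLOWED_ORIGINS", "")
    # Trim whitespace and trailing slashes so origins match browser headers.
    origins = [item.strip().rstrip("/") for item in raw.split(",")]
    return [origin for origin in origins if origin]


# Resolved once at import time from the process environment.
origins = parse_allowed_origins(os.environ)
```

The resulting list can be passed as allow_origins when adding CORSMiddleware. An empty list means no cross-origin caller is allowed.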

API Documentation

FastAPI automatically generates interactive API documentation:
  • Swagger UI: http://localhost:8000/docs
  • ReDoc: http://localhost:8000/redoc
  • OpenAPI JSON: http://localhost:8000/openapi.json
Use the Swagger UI to call your endpoints interactively during development.
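The OpenAPI JSON is also handy for scripted checks, for example confirming that a custom route or trigger endpoint was registered. The sketch below summarizes an already-parsed OpenAPI document; list_operations is a hypothetical helper, and fetching the document from /openapi.json is left to the caller.

```python
from typing import Any, Dict, List

HTTP_METHODS = {"get", "put", "post", "delete", "options", "head", "patch", "trace"}


def list_operations(spec: Dict[str, Any]) -> List[str]:
    """Return sorted 'METHOD /path' strings from an OpenAPI document."""
    operations = []
    for path, item in spec.get("paths", {}).items():
        for key in item:
            # Path items may also hold non-method keys such as "parameters".
            if key.lower() in HTTP_METHODS:
                operations.append(f"{key.upper()} {path}")
    return sorted(operations)
```

For the application in this example, you would expect the output to include entries for /custom/test and /sample alongside the SDK's built-in routes.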

Production Considerations

Running with Uvicorn

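Uvicorn loads an ASGI application from a module-level attribute. In the example above, fast_api_app is a local variable inside application_custom_fastapi() and is an APIServer wrapper rather than the FastAPI instance itself. Expose the underlying FastAPI object (the server's app attribute) at module level first, for example as app, then run:
uvicorn application_custom_fastapi:app --host 0.0.0.0 --port 8000 --workers 4
Each worker is a separate process that imports the module and builds its own application instance.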

Environment-Based Configuration

import os

class MyCustomApplication(APIServer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.debug = os.getenv("DEBUG", "false").lower() == "true"
        self.port = int(os.getenv("PORT", "8000"))
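Parsing environment variables in one place makes bad values fail early with a clear message. The sketch below is one possible shape for that, using only the standard library. ServerSettings and load_settings are hypothetical names, and unlike the snippet above it also accepts "1", "yes", and "on" as true values for DEBUG.

```python
import os
from dataclasses import dataclass
from typing import Mapping

TRUE_VALUES = {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class ServerSettings:
    debug: bool
    port: int


def load_settings(env: Mapping[str, str]) -> ServerSettings:
    """Read DEBUG and PORT from a mapping such as os.environ."""
    debug = env.get("DEBUG", "false").strip().lower() in TRUE_VALUES
    raw_port = env.get("PORT", "8000")
    try:
        port = int(raw_port)
    except ValueError as exc:
        raise ValueError(f"PORT must be an integer, got {raw_port!r}") from exc
    if not 1 <= port <= 65535:
        raise ValueError(f"PORT must be between 1 and 65535, got {port}")
    return ServerSettings(debug=debug, port=port)
```

In the application's __init__, load_settings(os.environ) would replace the two os.getenv calls.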

Health Checks

The SDK provides built-in health check endpoints:
  • GET /health: Basic health check
  • GET /ready: Readiness probe (checks workflow worker status)
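A deployment script or test harness can poll the readiness endpoint before sending traffic. The sketch below is a generic retry loop using only the standard library; http_ok and wait_until_ready are hypothetical helpers, and the sketch assumes a 2xx status means ready.

```python
import time
import urllib.error
import urllib.request
from typing import Callable


def http_ok(url: str, timeout: float = 2.0) -> bool:
    """Return True if a GET to the URL answers with a 2xx status."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return 200 <= response.status < 300
    except (urllib.error.URLError, OSError):
        # Connection refused, timeouts, and non-2xx responses all count as not ready.
        return False


def wait_until_ready(check: Callable[[], bool], attempts: int = 10, delay: float = 1.0) -> bool:
    """Call check() up to `attempts` times, sleeping `delay` seconds between tries."""
    for attempt in range(attempts):
        if check():
            return True
        if attempt < attempts - 1:
            time.sleep(delay)
    return False
```

For the server in this example, the call would look like wait_until_ready(lambda: http_ok("http://localhost:8000/ready")).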
