Overview

The server component provides a FastAPI-based HTTP interface for managing workflows, handling authentication, processing events, and exposing application functionality through REST APIs.
The SDK includes APIServer, a complete FastAPI implementation that handles workflow triggers, authentication, metadata operations, and event processing.

ServerInterface

The base interface that all server implementations must follow.
from abc import ABC, abstractmethod
from typing import Optional
from application_sdk.handlers import HandlerInterface

class ServerInterface(ABC):
    """Abstract base class for API servers."""
    
    handler: Optional[HandlerInterface]
    
    def __init__(self, handler: Optional[HandlerInterface] = None):
        self.handler = handler
    
    @abstractmethod
    async def start(self) -> None:
        """Start the server."""
        raise NotImplementedError("start method not implemented")
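As a minimal sketch of what implementing this interface might look like, the snippet below defines a stand-in copy of the base class (so it runs without the SDK installed) and a hypothetical `InMemoryServer` that only records that it was started. `InMemoryServer` and its `started` flag are illustrative names, not part of the SDK.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Optional


class ServerInterface(ABC):
    """Stand-in mirroring the interface above; the real class lives in the SDK."""

    def __init__(self, handler: Optional[Any] = None):
        self.handler = handler

    @abstractmethod
    async def start(self) -> None:
        """Start the server."""


class InMemoryServer(ServerInterface):
    """Hypothetical implementation that records the start call instead of binding a port."""

    def __init__(self, handler: Optional[Any] = None):
        super().__init__(handler)
        self.started = False

    async def start(self) -> None:
        # A real implementation would start an HTTP server here.
        self.started = True


server = InMemoryServer()
asyncio.run(server.start())
```

Because `start` is declared with `@abstractmethod`, instantiating the base class directly raises `TypeError`; only subclasses that define `start` can be created.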

APIServer

The APIServer class provides a complete FastAPI-based server implementation with built-in support for workflow management, authentication, and event handling.
from application_sdk.server.fastapi import APIServer
from application_sdk.clients.workflow import WorkflowClient
from my_app.handlers import MyHandler

# workflow_client is an already initialized WorkflowClient
# (see Complete Server Example for how it is created and loaded).
server = APIServer(
    handler=MyHandler(),
    workflow_client=workflow_client,
    ui_enabled=True,
    has_configmap=True
)

Constructor Parameters

lifespan
Callable
Optional lifespan manager for the FastAPI application. Used for startup/shutdown hooks.
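from contextlib import asynccontextmanager

from fastapi import FastAPI
from application_sdk.server.fastapi import APIServer

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: runs once before the server begins accepting requests
    print("Server starting")
    yield
    # Shutdown: runs once after the server stops accepting requests
    print("Server stopping")

server = APIServer(lifespan=lifespan)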
handler
HandlerInterface
Handler instance for processing application-specific operations like authentication, preflight checks, and metadata fetching.
workflow_client
WorkflowClient
Client for interacting with Temporal workflows. Required for workflow operations.
frontend_templates_path
str
default:"frontend/templates"
Path to frontend template files for the UI.
ui_enabled
bool
default:"True"
Whether to enable the UI frontend routes.
has_configmap
bool
default:"False"
Whether the application has a configuration map for the UI.
subscriptions
List[Subscription]
default:"[]"
List of Dapr pub/sub subscriptions for event-driven workflows.

Core API Endpoints

The server automatically registers several API endpoints:

Workflow Endpoints (/workflows/v1)

Test authentication credentials.
Request:
{
  "api_key": "your-api-key",
  "base_url": "https://api.example.com"
}
Response:
{
  "success": true,
  "message": "Authentication successful"
}
Calls handler.test_auth()
Fetch metadata from the source system.
Request:
{
  "type": "databases",
  "database": "production",
  "credentials": {...}
}
Response:
{
  "success": true,
  "data": {
    "databases": [...],
    "tables": [...]
  }
}
Calls handler.fetch_metadata()
Perform preflight validation checks.
Request:
{
  "credentials": {...},
  "metadata": {...}
}
Response:
{
  "success": true,
  "data": {
    "connection_check": {
      "success": true,
      "message": "Connection successful"
    },
    "permissions_check": {
      "success": true,
      "message": "Permissions validated"
    }
  }
}
Calls handler.preflight_check()
Start a new workflow execution.
Request:
{
  "workflow_id": "my-workflow-123",
  "metadata": {...}
}
Response:
{
  "success": true,
  "message": "Workflow started successfully",
  "data": {
    "workflow_id": "my-workflow-123",
    "run_id": "abc-def-123"
  }
}
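A client call to this endpoint could be sketched with the standard library as follows. The path `POST /workflows/v1/start` is the one the HTTP Trigger section says is created; the helper name `build_start_request` and the `http://localhost:8000` address are assumptions for illustration. The helper only builds the request, so it can be inspected without a running server.

```python
import json
from typing import Any, Dict
from urllib.request import Request


def build_start_request(base_url: str, workflow_id: str, metadata: Dict[str, Any]) -> Request:
    """Build (but do not send) a POST request for the workflow start endpoint."""
    body = json.dumps({"workflow_id": workflow_id, "metadata": metadata}).encode("utf-8")
    return Request(
        url=f"{base_url.rstrip('/')}/workflows/v1/start",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Assumed local address; adjust to wherever the server is listening.
request = build_start_request("http://localhost:8000", "my-workflow-123", {})
```

To send it, pass `request` to `urllib.request.urlopen` and decode the JSON response body.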
Get the status of a workflow run.
Response:
{
  "success": true,
  "message": "Workflow status fetched successfully",
  "data": {
    "status": "RUNNING",
    "startTime": "2024-01-15T10:30:00Z",
    "lastUpdated": "2024-01-15T10:35:00Z"
  }
}
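A caller that wants to wait for a run to finish might poll this response until the status is terminal. The sketch below takes the HTTP call as an injected `fetch_status` function, since the exact route is not shown here. The set of terminal status names is an assumption (only `RUNNING` appears in the example above), and `wait_for_workflow` is a hypothetical helper, not an SDK function.

```python
import time
from typing import Any, Callable, Dict

# Assumption: these names mark a finished run; only "RUNNING" appears in the example.
TERMINAL_STATUSES = {"COMPLETED", "FAILED", "CANCELED", "TERMINATED", "TIMED_OUT"}


def wait_for_workflow(
    fetch_status: Callable[[], Dict[str, Any]],
    interval_seconds: float = 5.0,
    max_attempts: int = 60,
) -> str:
    """Poll until the reported status is terminal, then return it."""
    for _ in range(max_attempts):
        response = fetch_status()
        status = response["data"]["status"]
        if status in TERMINAL_STATUSES:
            return status
        time.sleep(interval_seconds)
    raise TimeoutError("workflow did not reach a terminal status in time")


# Stand-in for an HTTP call: two RUNNING responses followed by COMPLETED.
_responses = iter(["RUNNING", "RUNNING", "COMPLETED"])
final_status = wait_for_workflow(
    lambda: {"success": True, "data": {"status": next(_responses)}},
    interval_seconds=0,
)
```

Bounding the loop with `max_attempts` keeps a stuck workflow from blocking the caller indefinitely.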
Stop a running workflow.
Response:
{
  "success": true
}
Retrieve workflow configuration.
Response:
{
  "success": true,
  "message": "Workflow configuration fetched successfully",
  "data": {...}
}
Update workflow configuration.
Request:
{
  "config": {...}
}
Response:
{
  "success": true,
  "message": "Workflow configuration updated successfully",
  "data": {...}
}
Upload a file to object storage.
Form Data:
  • file: File to upload
  • filename: Original filename
  • prefix: Storage prefix (default: "workflow_file_upload")
  • contentType: File content type
Response:
{
  "success": true,
  "key": "workflow_file_upload/2024/01/15/file.csv",
  "url": "https://storage/..." 
}
Get a configuration map.
Response:
{
  "success": true,
  "message": "Configuration map fetched successfully",
  "data": {...}
}
Calls handler.get_configmap()

Event Endpoints (/events/v1)

Trigger a workflow from an event.
Request:
{
  "event_type": "ENTITY_UPDATE",
  "event_name": "asset_updated",
  "data": {...}
}
Response:
{
  "success": true,
  "message": "Workflow started successfully",
  "status": "SUCCESS",
  "data": {
    "workflow_id": "...",
    "run_id": "..."
  }
}
Drop events that don’t match filters.
Response:
{
  "success": false,
  "message": "Event didn't match any of the filters",
  "status": "DROP"
}

Dapr Endpoints (/dapr)

Get Dapr pub/sub subscription configuration.
Response:
[
  {
    "pubsubname": "eventstore",
    "topic": "ENTITY_UPDATE",
    "routes": {
      "rules": [
        {
          "match": "event.data.event_name == 'asset_updated'",
          "path": "/events/v1/event/asset_update_trigger"
        }
      ],
      "default": "/events/v1/drop"
    }
  }
]

Other Endpoints

Launch DuckDB UI for log exploration.
Redirects to http://0.0.0.0:4213.
Serve the application UI (if ui_enabled=True).

Registering Workflows

Register workflows with the server using different trigger types.

HTTP Trigger

Register a workflow that is triggered via HTTP POST:
from application_sdk.server.fastapi import APIServer, HttpWorkflowTrigger
from my_app.workflows import MyWorkflow

server.register_workflow(
    workflow_class=MyWorkflow,
    triggers=[HttpWorkflowTrigger()]
)
This creates a POST /workflows/v1/start endpoint.

Event Trigger

Register a workflow that is triggered by events:
from application_sdk.server.fastapi import APIServer, EventWorkflowTrigger
from my_app.workflows import AssetUpdateWorkflow

event_trigger = EventWorkflowTrigger(
    event_type="ENTITY_UPDATE",
    event_name="asset_updated",
    event_id="asset_update_handler",
    event_filters=[
        {
            "path": "event.data.typeName",
            "operator": "==",
            "value": "Table"
        }
    ]
)

server.register_workflow(
    workflow_class=AssetUpdateWorkflow,
    triggers=[event_trigger]
)
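To make the `event_filters` structure concrete, here is a rough sketch of how filters of this shape could be evaluated against an incoming event. It is not the SDK's matching logic. It assumes the payload is a nested dict rooted at `event` (as the `event.data.typeName` path suggests), that all filters must match, and that a missing path never matches. The `!=` operator is added for illustration; only `==` appears in the example above.

```python
import operator
from typing import Any, Dict, List

# Assumption: only "==" appears in the example; "!=" is included for illustration.
OPERATORS = {"==": operator.eq, "!=": operator.ne}

_MISSING = object()  # sentinel for paths that do not exist in the payload


def resolve_path(payload: Dict[str, Any], dotted_path: str) -> Any:
    """Walk a dotted path such as 'event.data.typeName' through nested dicts."""
    current: Any = payload
    for key in dotted_path.split("."):
        if not isinstance(current, dict) or key not in current:
            return _MISSING
        current = current[key]
    return current


def matches_filters(payload: Dict[str, Any], filters: List[Dict[str, Any]]) -> bool:
    """Return True only if every filter matches the payload."""
    for event_filter in filters:
        actual = resolve_path(payload, event_filter["path"])
        if actual is _MISSING:
            return False
        compare = OPERATORS[event_filter["operator"]]
        if not compare(actual, event_filter["value"]):
            return False
    return True


filters = [{"path": "event.data.typeName", "operator": "==", "value": "Table"}]
table_event = {"event": {"data": {"typeName": "Table"}}}
column_event = {"event": {"data": {"typeName": "Column"}}}
```

Under these assumptions `table_event` would pass the filter and `column_event` would not, which corresponds to the drop response described under Event Endpoints.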

Multiple Triggers

A workflow can have multiple triggers:
server.register_workflow(
    workflow_class=MyWorkflow,
    triggers=[
        HttpWorkflowTrigger(),
        EventWorkflowTrigger(
            event_type="ENTITY_CREATE",
            event_name="asset_created",
            event_id="asset_create_handler"
        )
    ]
)

Starting the Server

await server.start(
    host="0.0.0.0",
    port=8000,
    root_path=""
)
host
str
default:"0.0.0.0"
Host address to bind to. Use 0.0.0.0 to accept connections from any network interface.
port
int
default:"8000"
Port to listen on. Configurable via APP_PORT environment variable.
root_path
str
default:""
ASGI root path passed to uvicorn. Used when the app is behind a proxy with a path prefix.
Example: If your app is served at https://example.com/api/my-app, set root_path="/api/my-app".
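The effect of `root_path` on the URLs that clients call can be illustrated with a small helper. This is only a sketch of the URL arithmetic, assuming the proxy forwards requests under the prefix to the app's own routes; `external_url` is a hypothetical name and not part of the SDK or uvicorn.

```python
def external_url(origin: str, root_path: str, route: str) -> str:
    """Join the proxy origin, the root path prefix, and an application route."""
    trimmed = root_path.strip("/")
    prefix = f"/{trimmed}" if trimmed else ""
    return f"{origin.rstrip('/')}{prefix}/{route.lstrip('/')}"


behind_proxy = external_url("https://example.com", "/api/my-app", "/workflows/v1/start")
no_prefix = external_url("https://example.com", "", "/workflows/v1/start")
```

The application's routes stay the same in both cases; only the externally visible prefix changes.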

Middleware

The server automatically includes middleware for:

LogMiddleware

Logs all incoming requests and responses with timing information.
INFO: POST /workflows/v1/start - 200 OK - 1.234s

MetricsMiddleware

Collects metrics for:
  • Request counts by endpoint and status code
  • Request duration histograms
  • Active request gauges
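For readers unfamiliar with how such metrics are gathered, the following is a minimal pure-ASGI sketch that tracks the three kinds of values listed above. It is not the SDK's `MetricsMiddleware`; `SimpleMetricsMiddleware`, `demo_app`, and the in-memory storage are assumptions chosen so the example runs with the standard library only.

```python
import asyncio
import time
from collections import Counter
from typing import Any, Dict, List


class SimpleMetricsMiddleware:
    """Hypothetical pure-ASGI middleware; not the SDK's MetricsMiddleware."""

    def __init__(self, app):
        self.app = app
        self.request_counts: Counter = Counter()  # (path, status code) -> count
        self.durations: List[float] = []          # seconds per request
        self.active_requests = 0                  # in-flight request gauge

    async def __call__(self, scope: Dict[str, Any], receive, send) -> None:
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        status = {"code": 500}  # recorded if the app fails before responding

        async def send_wrapper(message: Dict[str, Any]) -> None:
            # The status code travels in the first response message.
            if message["type"] == "http.response.start":
                status["code"] = message["status"]
            await send(message)

        self.active_requests += 1
        started = time.perf_counter()
        try:
            await self.app(scope, receive, send_wrapper)
        finally:
            self.active_requests -= 1
            self.durations.append(time.perf_counter() - started)
            self.request_counts[(scope["path"], status["code"])] += 1


async def demo_app(scope, receive, send):
    """Tiny ASGI app that always answers 200 with an empty body."""
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": b""})


async def _run_demo():
    metrics = SimpleMetricsMiddleware(demo_app)
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    scope = {"type": "http", "method": "POST", "path": "/workflows/v1/start"}
    await metrics(scope, receive, send)
    return metrics, sent


metrics, sent_messages = asyncio.run(_run_demo())
```

A production middleware would export these values to a metrics backend rather than keep them in memory; the wrapping of `send` to capture the status code is the part that carries over.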

Documentation

The server can automatically generate and serve Atlan documentation:
server.docs_directory_path = "docs"
server.docs_export_path = "dist"
Documentation is served at /atlandocs

Custom Subscriptions

Register custom Dapr pub/sub subscriptions:
from application_sdk.server.fastapi.models import Subscription

async def handle_custom_event(data: dict):
    print(f"Received event: {data}")
    return {"status": "processed"}

subscription = Subscription(
    component_name="pubsub",
    topic="custom-topic",
    route="custom-handler",
    handler=handle_custom_event
)

server = APIServer(
    handler=handler,
    workflow_client=client,
    subscriptions=[subscription]
)

Complete Server Example

import asyncio
from application_sdk.server.fastapi import APIServer, HttpWorkflowTrigger
from application_sdk.clients.workflow import WorkflowClient
from my_app.workflows import DataPipelineWorkflow
from my_app.handlers import DataHandler
from my_app.clients import DataClient

async def main():
    # Initialize workflow client
    workflow_client = WorkflowClient(application_name="data-pipeline")
    await workflow_client.load()
    
    # Initialize handler with client
    handler = DataHandler(client=DataClient())
    
    # Create server
    server = APIServer(
        handler=handler,
        workflow_client=workflow_client,
        ui_enabled=True,
        has_configmap=True
    )
    
    # Register workflows
    server.register_workflow(
        workflow_class=DataPipelineWorkflow,
        triggers=[HttpWorkflowTrigger()]
    )
    
    # Start server
    await server.start(
        host="0.0.0.0",
        port=8000
    )

if __name__ == "__main__":
    asyncio.run(main())

Instance Attributes

Attribute | Type | Description
app | FastAPI | The FastAPI application instance
workflow_client | Optional[WorkflowClient] | Client for Temporal operations
handler | Optional[HandlerInterface] | Handler for business logic
workflow_router | APIRouter | Router for workflow endpoints
dapr_router | APIRouter | Router for Dapr pub/sub
events_router | APIRouter | Router for event handling
ui_enabled | bool | Whether UI routes are enabled
has_configmap | bool | Whether configmap is available
workflows | List[WorkflowInterface] | Registered workflows
event_triggers | List[EventWorkflowTrigger] | Event-based triggers

Environment Variables

The server respects these environment variables:
Variable | Description | Default
APP_HOST | Server host address | 0.0.0.0
APP_PORT | Server port | 8000
APP_TENANT_ID | Tenant identifier | -
WORKFLOW_UI_HOST | Workflow UI host | -
WORKFLOW_UI_PORT | Workflow UI port | -

Best Practices

Configure the server using environment variables for different environments.
import os

server = APIServer(
    handler=handler,
    ui_enabled=os.getenv("UI_ENABLED", "true").lower() == "true",
    has_configmap=os.getenv("HAS_CONFIGMAP", "false").lower() == "true"
)

await server.start(
    host=os.getenv("APP_HOST", "0.0.0.0"),
    port=int(os.getenv("APP_PORT", "8000"))
)
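The inline `.lower() == "true"` checks above treat values such as `1` or `yes` as false. If more forgiving parsing is wanted, a small helper along these lines could be used instead. `env_flag`, `env_port`, and the accepted spellings are assumptions for illustration, not SDK utilities.

```python
import os

# Assumption: these spellings count as "true"; anything else is false.
_TRUE_VALUES = {"1", "true", "yes", "on"}


def env_flag(name: str, default: bool = False) -> bool:
    """Read a boolean environment variable, falling back to a default when unset."""
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() in _TRUE_VALUES


def env_port(name: str = "APP_PORT", default: int = 8000) -> int:
    """Read a TCP port from the environment and reject out-of-range values."""
    raw = os.getenv(name)
    if raw is None:
        return default
    port = int(raw)
    if not 0 < port < 65536:
        raise ValueError(f"{name} must be between 1 and 65535, got {port}")
    return port
```

With these helpers the constructor call becomes `ui_enabled=env_flag("UI_ENABLED", True)` and the start call `port=env_port()`.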
Add custom health check endpoints for monitoring.
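from datetime import datetime, timezone

@server.app.get("/health")
async def health_check():
    # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated since Python 3.12
    return {"status": "healthy", "timestamp": datetime.now(timezone.utc).isoformat()}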
Implement lifespan context managers for startup/shutdown tasks.
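from contextlib import asynccontextmanager

from fastapi import FastAPI
from application_sdk.server.fastapi import APIServer

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: acquire connections, caches, and other shared resources
    print("Initializing resources...")
    yield
    # Shutdown: release whatever was acquired during startup
    print("Cleaning up resources...")

server = APIServer(lifespan=lifespan)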
