The BaseApplication class provides the foundation for building applications on the Atlan platform, orchestrating workflows, workers, and servers.

BaseApplication

application_sdk.application.BaseApplication
Generic application abstraction for orchestrating workflows, workers, and (optionally) servers.

Constructor

BaseApplication(
    name: str,
    server: Optional[ServerInterface] = None,
    application_manifest: Optional[Dict[str, Any]] = None,
    client_class: Optional[Type[BaseClient]] = None,
    handler_class: Optional[Type[BaseHandler]] = None
)
name
str
required
The name of the application
server
ServerInterface
default:"None"
The server instance for the application. If not provided, a default server will be created.
application_manifest
Dict[str, Any]
default:"None"
Application manifest configuration for event registration and other settings
client_class
Type[BaseClient]
default:"None"
Client class the application uses for external connections. When omitted, BaseClient is used.
handler_class
Type[BaseHandler]
default:"None"
Handler class for processing requests. When omitted, BaseHandler is used.
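
The sketch below illustrates one way a constructor can accept `None` for `client_class` and `handler_class` and still end up with `BaseClient` and `BaseHandler`: a fallback applied at construction time. It is a minimal illustration under that assumption, not the SDK's implementation; `SketchApplication` and the stub `BaseClient`/`BaseHandler` classes are hypothetical stand-ins.

```python
from typing import Optional, Type


class BaseClient:
    """Stand-in for the SDK's BaseClient (hypothetical stub)."""


class BaseHandler:
    """Stand-in for the SDK's BaseHandler (hypothetical stub)."""


class SketchApplication:
    """Hypothetical class showing a None-to-default fallback."""

    def __init__(
        self,
        name: str,
        client_class: Optional[Type[BaseClient]] = None,
        handler_class: Optional[Type[BaseHandler]] = None,
    ) -> None:
        self.application_name = name
        # Assumption: None means "use the base class".
        self.client_class = client_class or BaseClient
        self.handler_class = handler_class or BaseHandler


class MyClient(BaseClient):
    """Custom client supplied by the caller."""


default_app = SketchApplication(name="my-app")
custom_app = SketchApplication(name="my-app", client_class=MyClient)
```

With this pattern, callers only pass the classes they want to override; anything left out resolves to the base class.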

Methods

setup_workflow

Set up the workflow client and start the worker for the application.
await app.setup_workflow(
    workflow_and_activities_classes: List[Tuple[Type[WorkflowInterface], Type[ActivitiesInterface]]],
    passthrough_modules: List[str] = [],
    activity_executor: Optional[ThreadPoolExecutor] = None
)
workflow_and_activities_classes
List[Tuple[Type[WorkflowInterface], Type[ActivitiesInterface]]]
required
List of tuples pairing workflow classes with their corresponding activities classes
passthrough_modules
List[str]
default:"[]"
Modules to pass through to the worker
activity_executor
ThreadPoolExecutor
default:"None"
Executor for running activities. If not provided, uses default executor.
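
To make the shape of `workflow_and_activities_classes` concrete, here is a sketch of how such pairs might be split into the two lists a worker needs: the workflow classes, and the activity callables each workflow exposes. The `split_registrations` helper and the stub classes are hypothetical; only the `get_activities` static method mirrors the usage example on this page. How the SDK actually registers these with its worker may differ.

```python
from typing import Callable, List, Sequence, Tuple, Type


class OrderActivities:
    """Stub activities class (hypothetical)."""

    async def process_data(self, workflow_args: dict) -> dict:
        return {"status": "success"}


class OrderWorkflow:
    """Stub workflow class exposing its activities (hypothetical)."""

    @staticmethod
    def get_activities(activities: OrderActivities) -> Sequence[Callable]:
        return [activities.process_data]


def split_registrations(
    pairs: List[Tuple[Type, Type]],
) -> Tuple[List[Type], List[Callable]]:
    """Collect workflow classes and bound activity methods from the pairs."""
    workflow_classes: List[Type] = []
    activity_callables: List[Callable] = []
    for workflow_cls, activities_cls in pairs:
        workflow_classes.append(workflow_cls)
        # One activities instance per pair; its bound methods are collected.
        activity_callables.extend(workflow_cls.get_activities(activities_cls()))
    return workflow_classes, activity_callables


workflows, activities = split_registrations([(OrderWorkflow, OrderActivities)])
```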

start

Start the application based on the configured APPLICATION_MODE.
await app.start(
    workflow_class: Type[WorkflowInterface],
    ui_enabled: bool = True,
    has_configmap: bool = False
)
workflow_class
Type[WorkflowInterface]
required
The workflow class to register with the server
ui_enabled
bool
default:"True"
Whether to enable the UI
has_configmap
bool
default:"False"
Whether the application has a configmap
Behavior, based on APPLICATION_MODE:
  • LOCAL: Starts the worker in daemon mode and the server (for local development)
  • WORKER: Starts only the worker, in non-daemon mode (for production worker pods)
  • SERVER: Starts only the server (for production API server pods)
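
The mode dispatch listed above can be pictured with a small sketch. It only models which components each mode starts; the `ApplicationMode` enum, the `components_for_mode` and `read_mode` helpers, and the `LOCAL` fallback when the variable is unset are all assumptions made for illustration, not the SDK's code.

```python
import os
from enum import Enum
from typing import List


class ApplicationMode(str, Enum):
    """The three modes named above (the enum itself is hypothetical)."""

    LOCAL = "LOCAL"
    WORKER = "WORKER"
    SERVER = "SERVER"


def components_for_mode(mode: ApplicationMode) -> List[str]:
    """Return which components a given mode starts, in order."""
    if mode is ApplicationMode.LOCAL:
        return ["worker (daemon)", "server"]
    if mode is ApplicationMode.WORKER:
        return ["worker (non-daemon)"]
    return ["server"]


def read_mode(default: str = "LOCAL") -> ApplicationMode:
    """Read APPLICATION_MODE; the LOCAL fallback is an assumption."""
    raw = os.environ.get("APPLICATION_MODE", default)
    # An unrecognized value raises ValueError from the enum lookup.
    return ApplicationMode(raw.upper())
```

Running the worker as a daemon in `LOCAL` lets a single process also serve requests, while the non-daemon worker in `WORKER` mode keeps a worker-only pod alive.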

start_workflow

Start a new workflow execution.
result = await app.start_workflow(
    workflow_args: Dict[str, Any],
    workflow_class: Type[WorkflowInterface]
)
workflow_args
Dict[str, Any]
required
The arguments for the workflow
workflow_class
Type[WorkflowInterface]
required
The workflow class to execute
return
Any
The result of the workflow execution

register_event_subscription

Register a workflow class for a specific event subscription.
app.register_event_subscription(
    event_id: str,
    workflow_class: Type[WorkflowInterface]
)
event_id
str
required
The event ID from the application manifest
workflow_class
Type[WorkflowInterface]
required
The workflow class to handle this event
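
Because `event_id` must come from the application manifest, registration is essentially a lookup followed by an assignment. The sketch below models that with plain dictionaries, using the manifest shape from the Event-Driven Application example on this page. The helpers `index_consumed_events` and `register_subscription`, and the choice to raise `ValueError` for an undeclared ID, are assumptions for illustration; the SDK stores subscriptions as `EventWorkflowTrigger` objects and may behave differently.

```python
from typing import Any, Dict, Optional


def index_consumed_events(
    manifest: Optional[Dict[str, Any]],
) -> Dict[str, Dict[str, Any]]:
    """Index the manifest's consumed events by their event_id."""
    if not manifest:
        return {}
    consumes = manifest.get("eventRegistration", {}).get("consumes", [])
    return {entry["event_id"]: dict(entry) for entry in consumes}


def register_subscription(
    subscriptions: Dict[str, Dict[str, Any]],
    event_id: str,
    workflow_class: type,
) -> None:
    """Attach a workflow class to a declared event, or fail loudly."""
    if event_id not in subscriptions:
        # Assumption: unknown IDs are rejected rather than silently added.
        raise ValueError(f"Event {event_id!r} is not declared in the manifest")
    subscriptions[event_id]["workflow_class"] = workflow_class


class DataUpdateWorkflow:
    """Stub workflow class (hypothetical)."""


manifest = {
    "eventRegistration": {
        "consumes": [
            {"event_id": "data-updated", "event_type": "data.updated"}
        ]
    }
}

subscriptions = index_consumed_events(manifest)
register_subscription(subscriptions, "data-updated", DataUpdateWorkflow)
```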

Attributes

application_name
str
The name of the application
server
ServerInterface | None
The server instance for the application
worker
Worker | None
The worker instance for executing workflows
workflow_client
WorkflowClient
Client for interacting with Temporal workflows
application_manifest
Dict[str, Any] | None
Application manifest configuration
event_subscriptions
Dict[str, EventWorkflowTrigger]
Dictionary of event subscriptions configured for the application
client_class
Type[BaseClient]
Client class used by the application
handler_class
Type[BaseHandler]
Handler class used by the application

Example Usage

Basic Application

import asyncio
from datetime import timedelta

# This page describes Temporal workflows, so `workflow` is taken to be
# the temporalio module (an assumption; the original import was missing).
from temporalio import workflow

from application_sdk.activities import ActivitiesInterface
from application_sdk.application import BaseApplication
from application_sdk.workflows import WorkflowInterface

# Define activities first so the workflow class can reference them
class MyActivities(ActivitiesInterface):
    async def process_data(self, workflow_args):
        state = await self._get_state(workflow_args)
        # Process data
        return {"status": "success"}

# Define workflow
class MyWorkflow(WorkflowInterface):
    activities_cls = MyActivities

    @staticmethod
    def get_activities(activities):
        return [activities.process_data]

    async def run(self, workflow_config):
        await super().run(workflow_config)
        # Custom workflow logic: forward the config to the activity
        result = await workflow.execute_activity_method(
            self.activities_cls.process_data,
            workflow_config,
            start_to_close_timeout=timedelta(minutes=10)
        )
        return result

# Create and run application
async def main():
    app = BaseApplication(name="my-data-processor")

    await app.setup_workflow(
        workflow_and_activities_classes=[
            (MyWorkflow, MyActivities)
        ]
    )

    await app.start(workflow_class=MyWorkflow)

if __name__ == "__main__":
    asyncio.run(main())

Application with Custom Client and Handler

from application_sdk.application import BaseApplication
from application_sdk.clients.base import BaseClient
from application_sdk.handlers.base import BaseHandler

class MyClient(BaseClient):
    async def load(self, **kwargs):
        credentials = kwargs.get("credentials", {})
        self.http_headers = {
            "Authorization": f"Bearer {credentials.get('token')}"
        }

class MyHandler(BaseHandler):
    async def preflight_check(self, **kwargs):
        # Perform checks
        return {"connection": {"success": True}}
    
    async def fetch_metadata(self, **kwargs):
        # Fetch metadata
        return {"data": []}

app = BaseApplication(
    name="my-app",
    client_class=MyClient,
    handler_class=MyHandler
)

Event-Driven Application

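import asyncio

from application_sdk.application import BaseApplication

# DataUpdateWorkflow and DataUpdateActivities are assumed to be defined
# elsewhere, following the pattern in the Basic Application example.

manifest = {
    "eventRegistration": {
        "consumes": [
            {
                "event_id": "data-updated",
                "event_type": "data.updated",
                "event_name": "DataUpdated",
                "filters": {"source": "external-system"}
            }
        ]
    }
}

async def main():
    app = BaseApplication(
        name="event-processor",
        application_manifest=manifest
    )

    # Register workflow for event; event_id must match the manifest entry
    app.register_event_subscription(
        event_id="data-updated",
        workflow_class=DataUpdateWorkflow
    )

    await app.setup_workflow(
        workflow_and_activities_classes=[
            (DataUpdateWorkflow, DataUpdateActivities)
        ]
    )

    await app.start(workflow_class=DataUpdateWorkflow)

if __name__ == "__main__":
    asyncio.run(main())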

Application Modes

The application runs in different modes based on the APPLICATION_MODE environment variable:
  • LOCAL: For local development, runs both worker and server
  • WORKER: For production worker pods, runs only the worker
  • SERVER: For production API server pods, runs only the server
This separation allows for horizontal scaling in production deployments.
