The BaseApplication class provides the foundation for building applications on the Atlan platform, orchestrating workflows, workers, and servers.
BaseApplication
application_sdk.application.BaseApplication
Generic application abstraction for orchestrating workflows, workers, and (optionally) servers.
Constructor
BaseApplication(
    name: str,
    server: Optional[ServerInterface] = None,
    application_manifest: Optional[Dict[str, Any]] = None,
    client_class: Optional[Type[BaseClient]] = None,
    handler_class: Optional[Type[BaseHandler]] = None
)
name
str
required
The name of the application
server
ServerInterface
default:"None"
The server instance for the application. If not provided, a default server will be created.
application_manifest
Dict[str, Any]
default:"None"
Application manifest configuration for event registration and other settings
client_class
Type[BaseClient]
default:"BaseClient"
Client class the application uses for external connections. When left as None, BaseClient is used.
handler_class
Type[BaseHandler]
default:"BaseHandler"
Handler class for processing requests. When left as None, BaseHandler is used.
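The signature defaults client_class and handler_class to None, while the parameter list names BaseClient and BaseHandler as defaults. One plausible reading is that None falls back to the base class. Below is a minimal sketch of that pattern. The classes here (BaseClient, BaseHandler, SketchApplication) are hypothetical stand-ins written for illustration, not the SDK's own implementations.

```python
from typing import Any, Dict, Optional, Type


class BaseClient:
    """Stand-in for the SDK's BaseClient (hypothetical placeholder)."""


class BaseHandler:
    """Stand-in for the SDK's BaseHandler (hypothetical placeholder)."""


class SketchApplication:
    """Illustrates None-to-default resolution; not the real BaseApplication."""

    def __init__(
        self,
        name: str,
        application_manifest: Optional[Dict[str, Any]] = None,
        client_class: Optional[Type[BaseClient]] = None,
        handler_class: Optional[Type[BaseHandler]] = None,
    ) -> None:
        self.name = name
        self.application_manifest = application_manifest
        # Assumption: a missing class falls back to the base implementation.
        self.client_class = client_class or BaseClient
        self.handler_class = handler_class or BaseHandler


class MyClient(BaseClient):
    """Custom client supplied by the application author."""


default_app = SketchApplication(name="my-app")
custom_app = SketchApplication(name="my-app", client_class=MyClient)
```

With this pattern, overriding only one of the two classes leaves the other at its base default.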
Methods
setup_workflow
Set up the workflow client and start the worker for the application.
await app.setup_workflow(
    workflow_and_activities_classes: List[Tuple[Type[WorkflowInterface], Type[ActivitiesInterface]]],
    passthrough_modules: List[str] = [],
    activity_executor: Optional[ThreadPoolExecutor] = None
)
workflow_and_activities_classes
List[Tuple[Type[WorkflowInterface], Type[ActivitiesInterface]]]
required
List of tuples pairing workflow classes with their corresponding activities classes
passthrough_modules
List[str]
default:"[]"
Modules to pass through to the worker
activity_executor
ThreadPoolExecutor
default:"None"
Executor for running activities. If not provided, uses default executor.
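Each tuple pairs one workflow class with the activities class it uses. As a rough sketch of how such a list could be unpacked before handing it to a worker, consider the following. The stand-in classes and the split_pairs helper are hypothetical, and max_workers=4 is an arbitrary illustrative value; how the SDK registers these classes internally is not shown on this page.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple, Type


class IngestWorkflow:
    """Stand-in workflow class (hypothetical)."""


class IngestActivities:
    """Stand-in activities class (hypothetical)."""


class CleanupWorkflow:
    """Stand-in workflow class (hypothetical)."""


class CleanupActivities:
    """Stand-in activities class (hypothetical)."""


def split_pairs(pairs: List[Tuple[Type, Type]]) -> Tuple[List[Type], List[Type]]:
    """Separate (workflow, activities) tuples into two parallel lists."""
    workflows = [workflow_cls for workflow_cls, _ in pairs]
    activities = [activities_cls for _, activities_cls in pairs]
    return workflows, activities


pairs = [
    (IngestWorkflow, IngestActivities),
    (CleanupWorkflow, CleanupActivities),
]
workflows, activities = split_pairs(pairs)

# A caller-supplied executor lets the caller choose the thread pool size.
with ThreadPoolExecutor(max_workers=4) as executor:
    workflow_count = executor.submit(len, workflows).result()
```

Keeping the pairing explicit in the input means a workflow can never be registered without its matching activities class.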
start
Start the application based on the configured APPLICATION_MODE.
await app.start(
    workflow_class: Type[WorkflowInterface],
    ui_enabled: bool = True,
    has_configmap: bool = False
)
workflow_class
Type[WorkflowInterface]
required
The workflow class to register with the server
ui_enabled
bool
default:"True"
Whether the UI is enabled
has_configmap
bool
default:"False"
Whether the application has a configmap
Based on APPLICATION_MODE:
- LOCAL: Starts worker in daemon mode and server (for local development)
- WORKER: Starts only the worker in non-daemon mode (for production worker pods)
- SERVER: Starts only the server (for production API server pods)
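The mode table above can be read as a small dispatch: each APPLICATION_MODE value selects which components start and whether the worker runs as a daemon. The sketch below expresses that mapping in plain Python. The helper names (components_for_mode, read_mode) are hypothetical, and falling back to LOCAL when the variable is unset is an assumption made for illustration.

```python
import os
from typing import Dict, Mapping


def components_for_mode(mode: str) -> Dict[str, bool]:
    """Map an APPLICATION_MODE value to the components listed above."""
    mode = mode.upper()
    if mode == "LOCAL":
        # Worker in daemon mode plus the server, for local development.
        return {"worker": True, "worker_daemon": True, "server": True}
    if mode == "WORKER":
        # Worker only, in non-daemon mode.
        return {"worker": True, "worker_daemon": False, "server": False}
    if mode == "SERVER":
        # Server only.
        return {"worker": False, "worker_daemon": False, "server": True}
    raise ValueError(f"Unknown APPLICATION_MODE: {mode!r}")


def read_mode(environ: Mapping[str, str] = os.environ) -> str:
    """Read the mode from the environment (LOCAL fallback is an assumption)."""
    return environ.get("APPLICATION_MODE", "LOCAL")


worker_plan = components_for_mode("WORKER")
server_plan = components_for_mode(read_mode({"APPLICATION_MODE": "SERVER"}))
```

Rejecting unknown values outright is a design choice of this sketch; it surfaces a misconfigured pod at startup instead of letting it start nothing.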
start_workflow
Start a new workflow execution.
result = await app.start_workflow(
    workflow_args: Dict[str, Any],
    workflow_class: Type[WorkflowInterface]
)
workflow_args
Dict[str, Any]
required
The arguments for the workflow
workflow_class
Type[WorkflowInterface]
required
The workflow class to execute
Returns
The result of the workflow execution
register_event_subscription
Register a workflow class for a specific event subscription.
app.register_event_subscription(
    event_id: str,
    workflow_class: Type[WorkflowInterface]
)
event_id
str
required
The event ID from the application manifest
workflow_class
Type[WorkflowInterface]
required
The workflow class to handle this event
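Because event_id must come from the application manifest, registration is essentially a lookup followed by a binding. The sketch below models that with a small registry, using the manifest shape from the Event-Driven Application example (eventRegistration, consumes, event_id). EventRegistry is a hypothetical class, and raising ValueError for an unknown ID is an assumption, not documented SDK behavior.

```python
from typing import Any, Dict, Type


class EventRegistry:
    """Binds manifest event IDs to workflow classes (illustrative only)."""

    def __init__(self, manifest: Dict[str, Any]) -> None:
        consumes = manifest.get("eventRegistration", {}).get("consumes", [])
        # Index the manifest entries by their event_id for quick lookup.
        self.events: Dict[str, Dict[str, Any]] = {
            entry["event_id"]: entry for entry in consumes
        }
        self.workflows: Dict[str, Type] = {}

    def register(self, event_id: str, workflow_class: Type) -> None:
        # Assumption: an ID missing from the manifest is treated as an error.
        if event_id not in self.events:
            raise ValueError(f"Event {event_id!r} is not declared in the manifest")
        self.workflows[event_id] = workflow_class


class DataUpdateWorkflow:
    """Stand-in workflow class (hypothetical)."""


manifest = {
    "eventRegistration": {
        "consumes": [
            {
                "event_id": "data-updated",
                "event_type": "data.updated",
                "event_name": "DataUpdated",
                "filters": {"source": "external-system"},
            }
        ]
    }
}

registry = EventRegistry(manifest)
registry.register("data-updated", DataUpdateWorkflow)
```

Validating against the manifest at registration time catches a mistyped event ID before any event arrives.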
Attributes
The name of the application
The server instance for the application
The worker instance for executing workflows
Client for interacting with Temporal workflows
Application manifest configuration
event_subscriptions
Dict[str, EventWorkflowTrigger]
Dictionary of event subscriptions configured for the application
Client class used by the application
Handler class used by the application
Example Usage
Basic Application
import asyncio
from datetime import timedelta

# Assumes the Temporal Python SDK, which provides workflow.execute_activity_method
from temporalio import workflow

from application_sdk.application import BaseApplication
from application_sdk.workflows import WorkflowInterface
from application_sdk.activities import ActivitiesInterface

# Define activities first so the workflow class can reference them
class MyActivities(ActivitiesInterface):
    async def process_data(self, workflow_args):
        state = await self._get_state(workflow_args)
        # Process data
        return {"status": "success"}

# Define workflow
class MyWorkflow(WorkflowInterface):
    activities_cls = MyActivities

    @staticmethod
    def get_activities(activities):
        return [activities.process_data]

    async def run(self, workflow_config):
        await super().run(workflow_config)
        # Custom workflow logic: pass the incoming config to the activity
        result = await workflow.execute_activity_method(
            self.activities_cls.process_data,
            workflow_config,
            start_to_close_timeout=timedelta(minutes=10)
        )
        return result

# Create and run application
async def main():
    app = BaseApplication(name="my-data-processor")
    await app.setup_workflow(
        workflow_and_activities_classes=[
            (MyWorkflow, MyActivities)
        ]
    )
    await app.start(workflow_class=MyWorkflow)

asyncio.run(main())
Application with Custom Client and Handler
from application_sdk.application import BaseApplication
from application_sdk.clients.base import BaseClient
from application_sdk.handlers.base import BaseHandler

class MyClient(BaseClient):
    async def load(self, **kwargs):
        credentials = kwargs.get("credentials", {})
        self.http_headers = {
            "Authorization": f"Bearer {credentials.get('token')}"
        }

class MyHandler(BaseHandler):
    async def preflight_check(self, **kwargs):
        # Perform checks
        return {"connection": {"success": True}}

    async def fetch_metadata(self, **kwargs):
        # Fetch metadata
        return {"data": []}

app = BaseApplication(
    name="my-app",
    client_class=MyClient,
    handler_class=MyHandler
)
Event-Driven Application
import asyncio

from application_sdk.application import BaseApplication

# DataUpdateWorkflow and DataUpdateActivities are assumed to be defined
# elsewhere, following the pattern in the Basic Application example.
manifest = {
    "eventRegistration": {
        "consumes": [
            {
                "event_id": "data-updated",
                "event_type": "data.updated",
                "event_name": "DataUpdated",
                "filters": {"source": "external-system"}
            }
        ]
    }
}

async def main():
    app = BaseApplication(
        name="event-processor",
        application_manifest=manifest
    )
    # Register workflow for event
    app.register_event_subscription(
        event_id="data-updated",
        workflow_class=DataUpdateWorkflow
    )
    await app.setup_workflow(
        workflow_and_activities_classes=[
            (DataUpdateWorkflow, DataUpdateActivities)
        ]
    )
    await app.start(workflow_class=DataUpdateWorkflow)

asyncio.run(main())
Application Modes
The application runs in different modes based on the APPLICATION_MODE environment variable:
- LOCAL: For local development, runs both worker and server
- WORKER: For production worker pods, runs only the worker
- SERVER: For production API server pods, runs only the server
This separation allows for horizontal scaling in production deployments.