Overview

The Worker class manages Temporal workflow workers, handling the execution of workflows and activities with support for graceful shutdown, concurrent execution limits, and custom module sandboxing.
Workers poll Temporal for tasks and execute workflow and activity code. In production, you typically run workers separately from API servers for better scalability.

Key Features

  • Graceful Shutdown: Responds to SIGTERM/SIGINT signals, allowing in-flight activities to complete
  • Concurrency Control: Configurable maximum concurrent activities
  • Custom Executors: Support for custom thread pool executors
  • Module Sandboxing: Control which modules are available in workflow execution
  • Daemon Mode: Run workers in background threads for local development

Initialization

from application_sdk.worker import Worker
from application_sdk.clients.workflow import WorkflowClient
from concurrent.futures import ThreadPoolExecutor

worker = Worker(
    workflow_client=workflow_client,
    workflow_activities=[activity1, activity2],
    workflow_classes=[MyWorkflow],
    passthrough_modules=["my_app", "pandas"],
    max_concurrent_activities=10,
    activity_executor=ThreadPoolExecutor(max_workers=10)
)

Constructor Parameters

workflow_client
WorkflowClient
required
Client for interacting with Temporal. Provides connection details and task queue configuration.
workflow_activities
Sequence[CallableType]
default:"[]"
List of activity functions that this worker can execute.
workflow_activities = [
    activities.extract_data,
    activities.transform_data,
    activities.load_data
]
workflow_classes
Sequence[ClassType]
default:"[]"
List of workflow classes that this worker can execute.
workflow_classes = [DataPipelineWorkflow, ValidationWorkflow]
passthrough_modules
List[str]
default:"[]"
Additional Python modules to make available in the workflow sandbox. Default modules always available: ["application_sdk", "pandas", "os", "app"]
passthrough_modules = ["my_app", "numpy", "requests"]
max_concurrent_activities
int
default:"MAX_CONCURRENT_ACTIVITIES"
Maximum number of activities that can run concurrently. Set to None for no limit. Configurable via the MAX_CONCURRENT_ACTIVITIES environment variable.
activity_executor
ThreadPoolExecutor
default:"None"
Custom thread pool executor for running activities. If not provided, a default ThreadPoolExecutor with max_concurrent_activities workers is created.
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(
    max_workers=20,
    thread_name_prefix="activity-"
)
worker = Worker(
    workflow_client=workflow_client,
    activity_executor=executor
)

Starting the Worker

Production Mode

Run the worker in the foreground (non-daemon mode). This is the standard mode for production worker pods.
await worker.start(daemon=False)
In production, use daemon=False so the worker runs in the main thread and responds to shutdown signals.

Development Mode

Run the worker in daemon mode for local development. This starts the worker in a background thread.
await worker.start(daemon=True)
Daemon mode is only recommended for local development. Production deployments should use daemon=False.

start() Method Parameters

daemon
bool
default:"True"
Whether to run the worker in daemon mode (background thread).
  • True: Worker runs in background thread (local development)
  • False: Worker runs in foreground (production)

Graceful Shutdown

Workers support graceful shutdown to ensure in-flight activities complete before termination.

How It Works

  1. Worker receives SIGTERM or SIGINT signal
  2. Worker stops polling for new tasks
  3. In-flight activities continue executing
  4. Worker waits for activities to complete (up to GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS)
  5. Worker exits when all activities finish or timeout is reached
# Worker automatically handles SIGTERM/SIGINT
await worker.start(daemon=False)

# To manually trigger shutdown from another terminal:
# kill -TERM <pid>
Graceful shutdown timeout is configurable via the GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS environment variable (default: 30 seconds).
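Steps 2-5 of the sequence above can be sketched with plain asyncio. `drain()` here is an illustrative helper, not an SDK API: it waits for in-flight tasks up to the timeout, then cancels any stragglers.

```python
import asyncio

async def drain(in_flight_tasks, timeout_seconds=30):
    """Wait for in-flight tasks to finish, up to a timeout.

    Returns True if everything completed; False if the timeout
    was reached and remaining tasks had to be cancelled.
    """
    if not in_flight_tasks:
        return True
    done, pending = await asyncio.wait(in_flight_tasks, timeout=timeout_seconds)
    for task in pending:
        # Timeout reached: cancel whatever is still running
        task.cancel()
    return not pending
```

This mirrors the worker's behavior: activities that finish within GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS complete normally; anything slower is cut off.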

Platform Support

On Unix-like systems (Linux/macOS), workers have full signal handling support via the asyncio event loop and respond to:
  • SIGTERM: Graceful shutdown
  • SIGINT: Graceful shutdown (Ctrl+C)
Signal handling is not supported on Windows. Workers on Windows will not respond to SIGTERM/SIGINT for graceful shutdown; they continue running until the process is forcefully terminated. For production deployments, use Unix-based systems.
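The cross-platform behavior above can be sketched with plain asyncio; `run_until_signal` is an illustrative stand-in for the worker's internal loop, not SDK code:

```python
import asyncio
import signal

async def run_until_signal():
    """Block until SIGTERM/SIGINT arrives (Unix) or forever (Windows)."""
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    try:
        # Unix: register graceful-shutdown handlers on the event loop
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, stop.set)
    except NotImplementedError:
        # Windows: add_signal_handler is unavailable, so there is
        # no graceful signal path -- the process runs until killed
        pass
    await stop.wait()
```

`loop.add_signal_handler` raising `NotImplementedError` on Windows is exactly why the platform caveat above exists.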

Complete Worker Example

import asyncio
from application_sdk.worker import Worker
from application_sdk.clients.workflow import WorkflowClient
from application_sdk.clients.utils import get_workflow_client
from concurrent.futures import ThreadPoolExecutor
from my_app.workflows import DataPipelineWorkflow
from my_app.activities import DataPipelineActivities

async def main():
    # Initialize workflow client
    workflow_client = get_workflow_client(application_name="data-pipeline")
    await workflow_client.load()
    
    # Get activities
    activities_instance = DataPipelineActivities()
    activities_list = DataPipelineWorkflow.get_activities(activities_instance)
    
    # Create custom executor
    executor = ThreadPoolExecutor(
        max_workers=15,
        thread_name_prefix="pipeline-activity-"
    )
    
    # Initialize worker
    worker = Worker(
        workflow_client=workflow_client,
        workflow_activities=activities_list,
        workflow_classes=[DataPipelineWorkflow],
        passthrough_modules=["my_app", "pandas", "numpy"],
        max_concurrent_activities=15,
        activity_executor=executor
    )
    
    # Start worker (blocks until shutdown)
    print("Starting worker...")
    await worker.start(daemon=False)
    print("Worker stopped")

if __name__ == "__main__":
    asyncio.run(main())

Worker with Application

Typically, workers are managed by the BaseApplication class:
from application_sdk.application import BaseApplication
from my_app.workflows import MyWorkflow
from my_app.activities import MyActivities

app = BaseApplication(name="my-app")

# Set up workflow and activities
await app.setup_workflow(
    workflow_and_activities_classes=[
        (MyWorkflow, MyActivities)
    ],
    passthrough_modules=["my_app"]
)

# Start based on APPLICATION_MODE
await app.start(
    workflow_class=MyWorkflow,
    ui_enabled=True
)

Environment Variables

Workers respect these environment variables:
| Variable | Description | Default |
|---|---|---|
| APPLICATION_MODE | Deployment mode (LOCAL, WORKER, SERVER) | LOCAL |
| MAX_CONCURRENT_ACTIVITIES | Max concurrent activities | System default |
| GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS | Shutdown timeout in seconds | 30 |
| DEPLOYMENT_NAME | Deployment identifier | - |
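A minimal sketch of reading these variables with fallbacks. `worker_config` is an illustrative helper, and the MAX_CONCURRENT_ACTIVITIES fallback of 5 is a placeholder; the SDK's actual system default may differ:

```python
import os

def worker_config():
    """Read worker settings from the environment, with fallbacks."""
    return {
        "application_mode": os.environ.get("APPLICATION_MODE", "LOCAL"),
        "max_concurrent_activities": int(
            os.environ.get("MAX_CONCURRENT_ACTIVITIES", "5")  # placeholder default
        ),
        "graceful_shutdown_timeout_seconds": int(
            os.environ.get("GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS", "30")
        ),
    }
```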

Event Loop Optimization

The worker automatically selects the best event loop implementation:

Unix/Linux/macOS

Uses uvloop for better performance (if installed):
pip install uvloop

Windows

Uses WindowsSelectorEventLoopPolicy for compatibility.
Install uvloop for production deployments on Linux for ~2-4x performance improvement:
pip install uvloop
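The selection logic can be sketched as follows; this is a hedged approximation of what happens at startup, not the SDK's exact code:

```python
import asyncio
import sys

def install_event_loop_policy():
    """Pick the best available event loop policy for this platform."""
    if sys.platform == "win32":
        # Selector loop for compatibility on Windows
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
        return "selector"
    try:
        import uvloop  # optional dependency, Unix only
        asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
        return "uvloop"
    except ImportError:
        # Fall back to the stdlib event loop
        return "default"
```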

Worker Lifecycle

A worker moves through four phases: initialization (register workflow classes and activities), polling (fetch tasks from the Temporal task queue), execution (run tasks on the activity executor), and shutdown (stop polling, drain in-flight activities, exit).

Best Practices

Configure max_concurrent_activities based on your resource constraints.
import os

# For CPU-intensive activities
max_concurrent_activities = os.cpu_count()

# For I/O-intensive activities
max_concurrent_activities = os.cpu_count() * 4

# For mixed workloads
max_concurrent_activities = os.cpu_count() * 2
Run workers in separate pods/containers from API servers.
# Worker pod
export APPLICATION_MODE=WORKER
python main.py

# Server pod
export APPLICATION_MODE=SERVER
python main.py
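The same split can live in a single entry point that branches on APPLICATION_MODE. `run_worker`, `run_server`, and `run_local` are hypothetical stand-ins for your application's real start functions:

```python
import asyncio
import os

# Hypothetical start functions; replace with your application's own.
async def run_worker():
    return "worker"   # e.g. await worker.start(daemon=False)

async def run_server():
    return "server"   # e.g. serve the API only

async def run_local():
    return "local"    # e.g. worker in daemon mode + server together

async def main():
    mode = os.environ.get("APPLICATION_MODE", "LOCAL")
    if mode == "WORKER":
        return await run_worker()
    elif mode == "SERVER":
        return await run_server()
    return await run_local()
```

One image, one main.py, and the pod's environment decides the role.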
Add all modules your workflows and activities need.
passthrough_modules = [
    "my_app",           # Your application code
    "pandas",           # Data processing
    "numpy",            # Numerical operations
    "requests",         # HTTP requests
    "sqlalchemy"        # Database access
]
Implement health checks and monitoring for production workers.
# Publish worker start event
from application_sdk.services.eventstore import EventStore
# Also import your SDK's Event model (module path varies by version)

worker_event = Event(
    event_type="APPLICATION_EVENT",
    event_name="WORKER_START",
    data={
        "task_queue": workflow_client.worker_task_queue,
        "max_concurrent_activities": max_concurrent_activities
    }
)
await EventStore.publish_event(worker_event)
Ensure activities can complete within the shutdown timeout.
# Set appropriate timeout
export GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS=60

# Design activities to be interruptible
import asyncio

from temporalio import activity

@activity.defn
async def long_running_activity(args):
    for chunk in data_chunks:
        # Check for cancellation
        if activity.is_cancelled():
            logger.info("Activity cancelled, cleaning up...")
            await cleanup()
            raise asyncio.CancelledError()

        process_chunk(chunk)
        activity.heartbeat()
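The same pattern can be tested framework-free. `process_chunks` and `cleanup` below are illustrative names, not SDK APIs; the point is that cancellation triggers cleanup and then re-raises:

```python
import asyncio

async def process_chunks(chunks, cleanup):
    """Process work in chunks; clean up promptly if cancelled."""
    done = []
    try:
        for chunk in chunks:
            await asyncio.sleep(0)  # yield so cancellation can be delivered
            done.append(chunk * 2)  # stand-in for real per-chunk work
    except asyncio.CancelledError:
        await cleanup()
        raise  # always re-raise so the caller sees the cancellation
    return done
```

Re-raising `CancelledError` after cleanup is the important part: swallowing it would make the task appear to finish normally.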

Instance Attributes

| Attribute | Type | Description |
|---|---|---|
| workflow_client | Optional[WorkflowClient] | Client for Temporal communication |
| workflow_worker | Optional[TemporalWorker] | Temporal worker instance |
| workflow_activities | Sequence[CallableType] | Registered activity functions |
| workflow_classes | Sequence[ClassType] | Registered workflow classes |
| passthrough_modules | List[str] | Modules available in workflow sandbox |
| max_concurrent_activities | Optional[int] | Max concurrent activity executions |
| activity_executor | ThreadPoolExecutor | Executor for running activities |
