The StateStore service provides persistent storage for workflow state and credentials.

StateStore

application_sdk.services.statestore.StateStore
Unified state store service for handling state management.

Class Methods

get_state

Get state from the store.
from application_sdk.services.statestore import StateStore, StateType

state = await StateStore.get_state(
    id: str,
    type: StateType
)
id (str, required): The unique identifier to retrieve the state for.
type (StateType, required): The type of state to retrieve (StateType.WORKFLOWS or StateType.CREDENTIALS).
return (Dict[str, Any]): The retrieved state data. Returns empty dict if no state found.
raises (IOError): If there’s an error with the object store operations.

save_state

Save a single state value to the store.
await StateStore.save_state(
    key: str,
    value: Any,
    id: str,
    type: StateType
)
key (str, required): The key to store the state value under.
value (Any, required): The value to store (can be any JSON-serializable type).
id (str, required): The unique identifier for the state object.
type (StateType, required): The type of state (StateType.WORKFLOWS or StateType.CREDENTIALS).
raises (Exception): If there’s an error with the object store operations.
This method updates a specific key within the state object, merging with existing state.

save_state_object

Save a state object to the object store, merging it with any existing state.
updated_state = await StateStore.save_state_object(
    id: str,
    value: Dict[str, Any],
    type: StateType
)
id (str, required): The unique identifier for the state object.
value (Dict[str, Any], required): The state data to save/merge.
type (StateType, required): The type of state (StateType.WORKFLOWS or StateType.CREDENTIALS).
return (Dict[str, Any]): The complete updated state after merge.
raises (Exception): If there’s an error with the object store operations.
This method merges the provided value with existing state and saves the complete object.
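
The merge behaviour can be pictured with plain dictionaries. The following is a minimal sketch that assumes a shallow, top-level merge in which incoming keys overwrite existing ones. Whether the SDK combines nested dictionaries recursively is not stated here, and `merge_state` is a hypothetical helper, not part of `application_sdk`.

```python
from typing import Any, Dict


def merge_state(existing: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new dict with `update` applied on top of `existing`.

    Assumption: the merge is shallow, so a nested dict in `update`
    replaces the nested dict in `existing` rather than being combined.
    """
    merged = dict(existing)  # copy so the caller's dict is not mutated
    merged.update(update)    # incoming keys win on conflict
    return merged


existing = {"status": "started", "metadata": {"batch_size": 1000}}
update = {"status": "running", "progress": 25}
merged = merge_state(existing, update)
```

Under this assumption, keys absent from `update` (here `metadata`) are carried over unchanged, which matches the "complete updated state after merge" return value described above.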

StateType

application_sdk.services.statestore.StateType
Enumeration of state types.
from application_sdk.services.statestore import StateType

StateType.WORKFLOWS     # Workflow state
StateType.CREDENTIALS   # Credential state

Methods

is_member

Check if a string value is a valid StateType member.
StateType.is_member(type: str) -> bool
type (str, required): The string value to check.
return (bool): True if the value is a valid StateType, False otherwise.
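
A membership check like this is typically used to validate untrusted input before converting it to an enum member. The sketch below uses a hypothetical stand-in enum, `SketchStateType`, so that it runs without the SDK. The string values `"workflows"` and `"credentials"` are assumptions, as is the `parse_state_type` helper.

```python
from enum import Enum


class SketchStateType(Enum):
    """Hypothetical stand-in for StateType; the string values are assumed."""

    WORKFLOWS = "workflows"
    CREDENTIALS = "credentials"

    @classmethod
    def is_member(cls, type: str) -> bool:
        # True when `type` equals the value of any enum member.
        return any(member.value == type for member in cls)


def parse_state_type(raw: str) -> SketchStateType:
    """Convert untrusted input to an enum member, rejecting unknown values."""
    if not SketchStateType.is_member(raw):
        raise ValueError(f"Unknown state type: {raw!r}")
    return SketchStateType(raw)
```

Checking first and raising a descriptive error keeps the failure message under the caller's control instead of relying on the enum constructor's default error.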

Utility Functions

build_state_store_path

application_sdk.services.statestore.build_state_store_path
Build the state file path for the given id and type.
from application_sdk.services.statestore import build_state_store_path, StateType

path = build_state_store_path(
    id: str,
    state_type: StateType
)
id (str, required): The unique identifier for the state.
state_type (StateType, required): The type of state (WORKFLOWS or CREDENTIALS).
return (str): The constructed state file path.
raises (PathTraversalError): If the id contains path traversal sequences that would escape the base directory.
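
One common way to implement this kind of protection is to resolve the candidate path and confirm it still lies under the base directory. The sketch below illustrates that technique only; it is not the SDK's implementation. `build_path_sketch`, `SketchPathTraversalError`, and the `base_dir/state_dir/id/config.json` layout are assumptions made for illustration.

```python
import os


class SketchPathTraversalError(ValueError):
    """Hypothetical stand-in for the SDK's PathTraversalError."""


def build_path_sketch(base_dir: str, state_dir: str, id: str) -> str:
    """Join base_dir/state_dir/id/config.json, refusing ids that escape base_dir."""
    base = os.path.abspath(base_dir)
    # abspath also normalizes ".." segments, so traversal shows up in the result.
    candidate = os.path.abspath(os.path.join(base, state_dir, id, "config.json"))
    # commonpath equals `base` only when `candidate` lies inside it.
    if os.path.commonpath([base, candidate]) != base:
        raise SketchPathTraversalError(f"id escapes base directory: {id!r}")
    return candidate
```

Comparing resolved paths, rather than searching the id for `..`, also catches absolute ids such as `/etc/passwd`, which `os.path.join` would otherwise treat as a new root.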

Example Usage

Storing Workflow State

from application_sdk.services.statestore import StateStore, StateType

# Save complete workflow configuration
workflow_config = {
    "source": "database",
    "database": "analytics",
    "schema": "public",
    "tables": ["users", "orders"],
    "metadata": {
        "batch_size": 1000,
        "parallel_tasks": 5
    }
}

await StateStore.save_state_object(
    id="workflow-abc-123",
    value=workflow_config,
    type=StateType.WORKFLOWS
)

# Retrieve workflow state
state = await StateStore.get_state(
    id="workflow-abc-123",
    type=StateType.WORKFLOWS
)

print(f"Processing {state['database']}.{state['schema']}")

Updating Workflow Progress

from application_sdk.services.statestore import StateStore, StateType

# Update progress
await StateStore.save_state(
    key="progress",
    value=50,
    id="workflow-abc-123",
    type=StateType.WORKFLOWS
)

# Update status
await StateStore.save_state(
    key="status",
    value="processing",
    id="workflow-abc-123",
    type=StateType.WORKFLOWS
)

# Update metadata
await StateStore.save_state(
    key="current_table",
    value="users",
    id="workflow-abc-123",
    type=StateType.WORKFLOWS
)

Storing Credentials

from application_sdk.services.statestore import StateStore, StateType

# Save credential configuration
credential_config = {
    "credential_type": "database",
    "host": "db.example.com",
    "port": 5432,
    "database": "analytics",
    "username": "app_user",
    "password_ref": "db-password-secret",  # Reference to secret
    "ssl_enabled": True
}

await StateStore.save_state_object(
    id="cred-database-123",
    value=credential_config,
    type=StateType.CREDENTIALS
)

# Retrieve credentials
creds = await StateStore.get_state(
    id="cred-database-123",
    type=StateType.CREDENTIALS
)

Activity State Management

from application_sdk.activities import ActivitiesInterface
from application_sdk.services.statestore import StateStore, StateType
from temporalio import activity

class MyActivities(ActivitiesInterface):
    
    @activity.defn
    async def process_with_checkpoint(self, workflow_args):
        """Process data with checkpointing."""
        workflow_id = workflow_args["workflow_id"]
        items = workflow_args["items"]
        
        # Get checkpoint
        state = await StateStore.get_state(
            id=workflow_id,
            type=StateType.WORKFLOWS
        )
        last_processed = state.get("last_processed_index", -1)
        
        # Process items from checkpoint
        for i, item in enumerate(items):
            if i <= last_processed:
                continue  # Skip already processed
            
            # Process item
            await self._process_item(item)
            
            # Update checkpoint every 100 items
            if (i + 1) % 100 == 0:
                await StateStore.save_state(
                    key="last_processed_index",
                    value=i,
                    id=workflow_id,
                    type=StateType.WORKFLOWS
                )
        
        # Mark complete
        await StateStore.save_state(
            key="status",
            value="completed",
            id=workflow_id,
            type=StateType.WORKFLOWS
        )

Merging State Updates

from application_sdk.services.statestore import StateStore, StateType

# Initial state
initial_state = {
    "workflow_id": "wf-123",
    "status": "started",
    "created_at": "2024-01-01T00:00:00Z"
}

await StateStore.save_state_object(
    id="wf-123",
    value=initial_state,
    type=StateType.WORKFLOWS
)

# Merge additional data
update = {
    "status": "running",
    "progress": 25,
    "current_step": "data_extraction"
}

updated = await StateStore.save_state_object(
    id="wf-123",
    value=update,
    type=StateType.WORKFLOWS
)

# updated now contains:
# {
#     "workflow_id": "wf-123",
#     "status": "running",           # Updated
#     "created_at": "2024-01-01T00:00:00Z",
#     "progress": 25,                # Added
#     "current_step": "data_extraction"  # Added
# }

State Path Management

from application_sdk.services.statestore import (
    build_state_store_path,
    StateType,
    PathTraversalError
)

# Build valid path
path = build_state_store_path(
    id="workflow-123",
    state_type=StateType.WORKFLOWS
)
print(path)
# Output: ./local/tmp/persistent-artifacts/apps/appName/workflows/workflow-123/config.json

# Path traversal protection
try:
    malicious_path = build_state_store_path(
        id="../../../etc/passwd",
        state_type=StateType.WORKFLOWS
    )
except PathTraversalError as e:
    print(f"Blocked malicious path: {e}")

Best Practices

State Management

  • Use descriptive state IDs
  • Store only necessary data in state
  • Use save_state for incremental updates
  • Use save_state_object for bulk updates
  • Clean up state after workflow completion

Workflow State

  • Store workflow configuration at start
  • Update progress regularly
  • Save checkpoints for long-running processes
  • Include timestamps for debugging
  • Store error information on failures
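
As an illustration of the last two points, the following sketch builds a JSON-serializable failure record with a UTC timestamp. `build_error_record` and its field names are hypothetical; they are not defined by the SDK.

```python
from datetime import datetime, timezone
from typing import Any, Dict


def build_error_record(exc: BaseException, step: str) -> Dict[str, Any]:
    """Build a JSON-serializable summary of a failure for storing in state."""
    return {
        "status": "failed",
        "failed_step": step,
        "error_type": type(exc).__name__,
        "error_message": str(exc),
        # UTC ISO 8601 timestamp so entries from different hosts are comparable.
        "failed_at": datetime.now(timezone.utc).isoformat(),
    }


record = build_error_record(ValueError("bad input"), step="data_extraction")
```

A dictionary like `record` could then be passed as the `value` argument of `save_state_object`, so the failure details are merged into the existing workflow state.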

Credential State

  • Never store raw passwords in state
  • Use secret references instead
  • Include credential type and metadata
  • Validate credentials before storage
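
One way to enforce the first two points before storage is to scan a credential dictionary for key names that suggest a raw secret. This is a minimal sketch: `FORBIDDEN_KEYS` and `find_raw_secret_keys` are hypothetical, and the key list would need tuning for a real application.

```python
from typing import Any, Dict, List

# Hypothetical list of key names that suggest a raw secret value.
FORBIDDEN_KEYS = {"password", "secret", "token", "api_key", "private_key"}


def find_raw_secret_keys(config: Dict[str, Any], prefix: str = "") -> List[str]:
    """Return dotted paths of keys whose lowercased names are in FORBIDDEN_KEYS."""
    found: List[str] = []
    for key, value in config.items():
        path = f"{prefix}{key}"
        if key.lower() in FORBIDDEN_KEYS:
            found.append(path)
        if isinstance(value, dict):
            # Recurse so secrets nested under other keys are reported too.
            found.extend(find_raw_secret_keys(value, prefix=f"{path}."))
    return found
```

Because the match is on the exact key name, a reference key such as `password_ref` passes, while `password` is reported. A caller could refuse to save the credential state whenever the returned list is non-empty.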

Error Handling

  • Always handle state retrieval errors
  • Provide default values for missing state
  • Log state operation failures
  • Implement retry logic for transient failures
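
The points above can be combined in a small wrapper that retries a read with exponential backoff and overlays the result on default values. This is a sketch under stated assumptions: `load_state_with_retry` is a hypothetical helper, `fetch` is any zero-argument async callable (for example a lambda wrapping `StateStore.get_state`), and only `IOError` is treated as transient.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


async def load_state_with_retry(
    fetch: Callable[[], Awaitable[Dict[str, Any]]],
    defaults: Dict[str, Any],
    attempts: int = 3,
    base_delay: float = 0.1,
) -> Dict[str, Any]:
    """Call `fetch` with exponential backoff and overlay the result on `defaults`."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    state: Dict[str, Any] = {}
    for attempt in range(attempts):
        try:
            state = await fetch()
            break
        except IOError as exc:
            if attempt == attempts - 1:
                raise  # out of retries: surface the error to the caller
            print(f"State read failed (attempt {attempt + 1}): {exc}")
            # Wait base_delay, then 2x, 4x, ... before the next attempt.
            await asyncio.sleep(base_delay * 2 ** attempt)
    # Stored values win; defaults fill in keys that are missing.
    return {**defaults, **state}
```

Since `get_state` is documented to return an empty dict when no state exists, the final merge yields exactly `defaults` in that case.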

Performance

  • Minimize state updates in hot paths
  • Batch updates when possible
  • Use save_state for single key updates
  • Consider state size limits
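
Batching can be sketched as a small buffer that collects key/value updates in memory and writes them in one call. `StateUpdateBuffer` is a hypothetical helper, and `flush_fn` is an assumed async callable that persists a dict; in an application it could wrap `StateStore.save_state_object`. The demo uses an in-memory recorder so that it runs without the SDK.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


class StateUpdateBuffer:
    """Collect key/value updates and write them with a single call.

    Assumes use from a single task; `flush_fn` is a hypothetical async
    callable that persists the collected dict.
    """

    def __init__(self, flush_fn: Callable[[Dict[str, Any]], Awaitable[Any]]) -> None:
        self._flush_fn = flush_fn
        self._pending: Dict[str, Any] = {}

    def set(self, key: str, value: Any) -> None:
        # Later values for the same key replace earlier ones before any write.
        self._pending[key] = value

    async def flush(self) -> None:
        if not self._pending:
            return  # nothing to write, skip the store round trip
        await self._flush_fn(dict(self._pending))
        self._pending.clear()  # reached only if the write did not raise


async def _demo() -> List[Dict[str, Any]]:
    writes: List[Dict[str, Any]] = []

    async def record(update: Dict[str, Any]) -> None:
        writes.append(update)  # in-memory stand-in for a store write

    buffer = StateUpdateBuffer(record)
    buffer.set("progress", 10)
    buffer.set("progress", 50)
    buffer.set("status", "processing")
    await buffer.flush()
    await buffer.flush()  # no pending updates, so no second write
    return writes


writes = asyncio.run(_demo())
```

Three `set` calls here result in a single write. The trade-off is that buffered updates are lost if the process stops before `flush` is called, so checkpoints that must survive a crash should still be written directly.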
