The StateStore service provides persistent storage for workflow state and credentials.
StateStore
application_sdk.services.statestore.StateStore
Unified state store service for handling state management.
Class Methods
get_state
Get state from the store.
from application_sdk.services.statestore import StateStore, StateType
state = await StateStore.get_state(
    id: str,
    type: StateType
)
id (str): The unique identifier to retrieve the state for.
type (StateType): The type of state to retrieve (StateType.WORKFLOWS or StateType.CREDENTIALS).
Returns: The retrieved state data, or an empty dict if no state is found.
Raises: An exception if there is an error with the object store operations.
save_state
Save a single state value to the store.
await StateStore.save_state(
    key: str,
    value: Any,
    id: str,
    type: StateType
)
key (str): The key to store the state value under.
value (Any): The value to store (any JSON-serializable type).
id (str): The unique identifier for the state object.
type (StateType): The type of state (StateType.WORKFLOWS or StateType.CREDENTIALS).
Raises: An exception if there is an error with the object store operations.
This method updates a single key within the state object and merges it with the existing state.
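The "update one key, keep the rest" behavior can be pictured with a small in-memory stand-in. This is a sketch only: `FakeStateStore` and its `_data` dictionary are hypothetical, the `type` argument is omitted for brevity, and nothing here reflects how the SDK actually persists state.

```python
import asyncio
from typing import Any, Dict


class FakeStateStore:
    """Hypothetical in-memory stand-in; not the SDK's StateStore."""

    _data: Dict[str, Dict[str, Any]] = {}

    @classmethod
    async def get_state(cls, id: str) -> Dict[str, Any]:
        # Mirror the documented behavior: empty dict when nothing is stored.
        return dict(cls._data.get(id, {}))

    @classmethod
    async def save_state(cls, key: str, value: Any, id: str) -> None:
        # Read the current object, set one key, and write the object back.
        state = await cls.get_state(id)
        state[key] = value
        cls._data[id] = state


async def demo() -> Dict[str, Any]:
    await FakeStateStore.save_state("status", "processing", id="wf-1")
    await FakeStateStore.save_state("progress", 50, id="wf-1")
    return await FakeStateStore.get_state("wf-1")


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The second call leaves `status` in place and adds `progress`, which is the merge behavior described for `save_state`.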
save_state_object
Save the entire state object to the object store.
updated_state = await StateStore.save_state_object(
    id: str,
    value: Dict[str, Any],
    type: StateType
)
id (str): The unique identifier for the state object.
value (Dict[str, Any]): The state data to save or merge.
type (StateType): The type of state (StateType.WORKFLOWS or StateType.CREDENTIALS).
Returns: The complete updated state after the merge.
Raises: An exception if there is an error with the object store operations.
This method merges the provided value with the existing state and saves the complete object.
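The merge can be sketched as a pure function. One loud assumption: this sketch merges at the top level only, so a nested dictionary in the update replaces the stored one wholesale. The documentation here does not say how nested values are handled, so check the SDK's behavior before relying on either outcome. `merge_state` is a hypothetical helper, not part of the SDK.

```python
from typing import Any, Dict


def merge_state(existing: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new dict with the top-level keys of `update` applied over `existing`.

    Assumption: the merge is shallow. Nested dictionaries in `update`
    replace the stored value wholesale in this sketch.
    """
    merged = dict(existing)
    merged.update(update)
    return merged


existing = {"status": "started", "metadata": {"batch_size": 1000, "parallel_tasks": 5}}
update = {"status": "running", "metadata": {"batch_size": 500}}

merged = merge_state(existing, update)
print(merged["status"])
print(merged["metadata"])
```

If nested keys must survive an update, one cautious option is to read the state first, merge the nested dictionary yourself, and save the combined result.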
StateType
application_sdk.services.statestore.StateType
Enumeration of state types.
from application_sdk.services.statestore import StateType
StateType.WORKFLOWS # Workflow state
StateType.CREDENTIALS # Credential state
Methods
is_member
Check if a string value is a valid StateType member.
StateType.is_member(type: str) -> bool
type (str): The string value to check.
Returns: True if the value is a valid StateType, False otherwise.
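A membership check of this kind can be sketched on a plain `Enum`. `StateTypeSketch` is a hypothetical mirror of `StateType`, and the string values `"workflows"` and `"credentials"` are assumptions; `parse_state_type` is likewise an illustrative helper, not an SDK function.

```python
from enum import Enum


class StateTypeSketch(Enum):
    """Hypothetical mirror of StateType; the string values are assumed."""

    WORKFLOWS = "workflows"
    CREDENTIALS = "credentials"

    @classmethod
    def is_member(cls, type: str) -> bool:
        # True when the string matches the value of any enum member.
        return any(member.value == type for member in cls)


def parse_state_type(raw: str) -> StateTypeSketch:
    """Validate untrusted input before converting it to an enum member."""
    if not StateTypeSketch.is_member(raw):
        raise ValueError(f"Unknown state type: {raw!r}")
    return StateTypeSketch(raw)


print(StateTypeSketch.is_member("workflows"))
print(StateTypeSketch.is_member("secrets"))
```

Checking membership first lets callers return a clear validation error for request parameters instead of surfacing the enum's own lookup failure.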
Utility Functions
build_state_store_path
application_sdk.services.statestore.build_state_store_path
Build the state file path for the given id and type.
from application_sdk.services.statestore import build_state_store_path, StateType
path = build_state_store_path(
    id: str,
    state_type: StateType
)
id (str): The unique identifier for the state.
state_type (StateType): The type of state (WORKFLOWS or CREDENTIALS).
Returns: The constructed state file path.
Raises: PathTraversalError if the id contains path traversal sequences that would escape the base directory.
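One common way to implement such a check is to normalize the candidate path and confirm it still sits under the base directory. The sketch below illustrates that technique under stated assumptions: `build_path_sketch`, `PathTraversalSketchError`, and the explicit `base_dir` argument are hypothetical, and the SDK's own validation may work differently.

```python
import os


class PathTraversalSketchError(ValueError):
    """Hypothetical stand-in for the SDK's PathTraversalError."""


def build_path_sketch(base_dir: str, state_type: str, id: str) -> str:
    """Join base_dir/state_type/id/config.json and reject escapes from base_dir."""
    base = os.path.abspath(base_dir)
    # abspath collapses ".." segments, so the real target becomes visible.
    candidate = os.path.abspath(os.path.join(base, state_type, id, "config.json"))
    # commonpath equals base only when candidate lies inside base.
    if os.path.commonpath([base, candidate]) != base:
        raise PathTraversalSketchError(f"id {id!r} escapes {base!r}")
    return candidate


print(build_path_sketch("/tmp/state-sketch", "workflows", "workflow-123"))
```

Comparing normalized paths avoids the pitfalls of scanning the id for literal `..` substrings, which can miss absolute paths and reject harmless ids.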
Example Usage
Storing Workflow State
from application_sdk.services.statestore import StateStore, StateType
# Save complete workflow configuration
workflow_config = {
    "source": "database",
    "database": "analytics",
    "schema": "public",
    "tables": ["users", "orders"],
    "metadata": {
        "batch_size": 1000,
        "parallel_tasks": 5
    }
}
await StateStore.save_state_object(
    id="workflow-abc-123",
    value=workflow_config,
    type=StateType.WORKFLOWS
)
# Retrieve workflow state
state = await StateStore.get_state(
    id="workflow-abc-123",
    type=StateType.WORKFLOWS
)
print(f"Processing {state['database']}.{state['schema']}")
Updating Workflow Progress
from application_sdk.services.statestore import StateStore, StateType
# Update progress
await StateStore.save_state(
    key="progress",
    value=50,
    id="workflow-abc-123",
    type=StateType.WORKFLOWS
)
# Update status
await StateStore.save_state(
    key="status",
    value="processing",
    id="workflow-abc-123",
    type=StateType.WORKFLOWS
)
# Update metadata
await StateStore.save_state(
    key="current_table",
    value="users",
    id="workflow-abc-123",
    type=StateType.WORKFLOWS
)
Storing Credentials
from application_sdk.services.statestore import StateStore, StateType
# Save credential configuration
credential_config = {
    "credential_type": "database",
    "host": "db.example.com",
    "port": 5432,
    "database": "analytics",
    "username": "app_user",
    "password_ref": "db-password-secret",  # Reference to secret
    "ssl_enabled": True
}
await StateStore.save_state_object(
    id="cred-database-123",
    value=credential_config,
    type=StateType.CREDENTIALS
)
# Retrieve credentials
creds = await StateStore.get_state(
    id="cred-database-123",
    type=StateType.CREDENTIALS
)
Activity State Management
from application_sdk.activities import ActivitiesInterface
from application_sdk.services.statestore import StateStore, StateType
from temporalio import activity
class MyActivities(ActivitiesInterface):
    @activity.defn
    async def process_with_checkpoint(self, workflow_args):
        """Process data with checkpointing."""
        workflow_id = workflow_args["workflow_id"]
        items = workflow_args["items"]
        # Get checkpoint
        state = await StateStore.get_state(
            id=workflow_id,
            type=StateType.WORKFLOWS
        )
        last_processed = state.get("last_processed_index", -1)
        # Process items from checkpoint
        for i, item in enumerate(items):
            if i <= last_processed:
                continue  # Skip already processed
            # Process item
            await self._process_item(item)
            # Update checkpoint every 100 items
            if (i + 1) % 100 == 0:
                await StateStore.save_state(
                    key="last_processed_index",
                    value=i,
                    id=workflow_id,
                    type=StateType.WORKFLOWS
                )
        # Mark complete
        await StateStore.save_state(
            key="status",
            value="completed",
            id=workflow_id,
            type=StateType.WORKFLOWS
        )
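Because the checkpoint in the example above is written only every 100 items, a retry after a failure re-runs any items processed since the last checkpoint, so item processing should be safe to repeat. The simulation below illustrates this with a plain dictionary in place of the state store. `run_batch`, `crash_at`, and the interval of 3 are hypothetical and chosen only to keep the demo small.

```python
from typing import Dict, List, Optional

CHECKPOINT_EVERY = 3  # small interval for the demo; the example above uses 100


def run_batch(
    items: List[str],
    store: Dict[str, int],
    processed: List[str],
    crash_at: Optional[int] = None,
) -> None:
    """Process items, resuming from store['last_processed_index'] if present."""
    last_processed = store.get("last_processed_index", -1)
    for i, item in enumerate(items):
        if i <= last_processed:
            continue  # already covered by a saved checkpoint
        if crash_at is not None and i == crash_at:
            raise RuntimeError("simulated worker crash")
        processed.append(item)
        if (i + 1) % CHECKPOINT_EVERY == 0:
            store["last_processed_index"] = i


items = [f"item-{n}" for n in range(8)]
store: Dict[str, int] = {}
processed: List[str] = []

try:
    run_batch(items, store, processed, crash_at=5)  # fails before item-5
except RuntimeError:
    pass

run_batch(items, store, processed)  # the retry resumes after the checkpoint
print(processed)
```

In this run the checkpoint sits at index 2 when the failure occurs, so `item-3` and `item-4` are processed a second time on retry. A smaller checkpoint interval reduces repeated work at the cost of more state writes.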
Merging State Updates
from application_sdk.services.statestore import StateStore, StateType
# Initial state
initial_state = {
    "workflow_id": "wf-123",
    "status": "started",
    "created_at": "2024-01-01T00:00:00Z"
}
await StateStore.save_state_object(
    id="wf-123",
    value=initial_state,
    type=StateType.WORKFLOWS
)
# Merge additional data
update = {
    "status": "running",
    "progress": 25,
    "current_step": "data_extraction"
}
updated = await StateStore.save_state_object(
    id="wf-123",
    value=update,
    type=StateType.WORKFLOWS
)
# updated now contains:
# {
#     "workflow_id": "wf-123",
#     "status": "running",                # Updated
#     "created_at": "2024-01-01T00:00:00Z",
#     "progress": 25,                     # Added
#     "current_step": "data_extraction"   # Added
# }
State Path Management
from application_sdk.services.statestore import (
    build_state_store_path,
    StateType,
    PathTraversalError
)
# Build valid path
path = build_state_store_path(
    id="workflow-123",
    state_type=StateType.WORKFLOWS
)
print(path)
# Output: ./local/tmp/persistent-artifacts/apps/appName/workflows/workflow-123/config.json
# Path traversal protection
try:
    malicious_path = build_state_store_path(
        id="../../../etc/passwd",
        state_type=StateType.WORKFLOWS
    )
except PathTraversalError as e:
    print(f"Blocked malicious path: {e}")
Best Practices
State Management
- Use descriptive state IDs
- Store only necessary data in state
- Use save_state for incremental updates
- Use save_state_object for bulk updates
- Clean up state after workflow completion
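The difference between incremental and bulk updates can be sketched by counting writes against an in-memory stand-in. `CountingStore` is hypothetical, and the assumption that each call results in exactly one write to the backing store is an illustration, not a statement about the SDK's internals.

```python
import asyncio
from typing import Any, Dict


class CountingStore:
    """Hypothetical in-memory store that counts write operations."""

    def __init__(self) -> None:
        self.data: Dict[str, Dict[str, Any]] = {}
        self.writes = 0

    async def save_state(self, key: str, value: Any, id: str) -> None:
        # One write per key.
        state = dict(self.data.get(id, {}))
        state[key] = value
        self.data[id] = state
        self.writes += 1

    async def save_state_object(self, id: str, value: Dict[str, Any]) -> Dict[str, Any]:
        # One write for the whole batch of keys.
        state = dict(self.data.get(id, {}))
        state.update(value)
        self.data[id] = state
        self.writes += 1
        return state


async def compare() -> Dict[str, int]:
    updates = {"status": "running", "progress": 25, "current_step": "data_extraction"}

    per_key = CountingStore()
    for key, value in updates.items():
        await per_key.save_state(key, value, id="wf-123")

    bulk = CountingStore()
    await bulk.save_state_object(id="wf-123", value=updates)

    assert per_key.data == bulk.data  # same final state either way
    return {"per_key_writes": per_key.writes, "bulk_writes": bulk.writes}


if __name__ == "__main__":
    print(asyncio.run(compare()))
```

Both approaches end with the same state; under this model the bulk call gets there with one write instead of three.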
Workflow State
- Store workflow configuration at start
- Update progress regularly
- Save checkpoints for long-running processes
- Include timestamps for debugging
- Store error information on failures
Credential State
- Never store raw passwords in state
- Use secret references instead
- Include credential type and metadata
- Validate credentials before storage
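A pre-storage check along these lines might look like the sketch below. The deny-list `FORBIDDEN_KEYS` and both helper functions are hypothetical; a real application would tailor the key names to its own credential schema, and a key-name check cannot detect a raw secret stored under an innocuous name.

```python
from typing import Any, Dict, List

# Hypothetical deny-list of key names that suggest a raw secret.
FORBIDDEN_KEYS = {"password", "secret", "token", "api_key", "private_key"}


def find_raw_secret_keys(config: Dict[str, Any]) -> List[str]:
    """Return top-level keys that look like raw secrets rather than references."""
    return sorted(key for key in config if key.lower() in FORBIDDEN_KEYS)


def validate_credential_config(config: Dict[str, Any]) -> None:
    """Raise ValueError if the config lacks a type or carries raw secrets."""
    if "credential_type" not in config:
        raise ValueError("credential_type is required")
    offending = find_raw_secret_keys(config)
    if offending:
        raise ValueError(f"Store secret references, not raw values: {offending}")


# Passes: the password is a reference, not a raw value.
validate_credential_config(
    {"credential_type": "database", "password_ref": "db-password-secret"}
)
```

Running the validation immediately before `save_state_object` keeps rejected configurations out of the store entirely.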
Error Handling
- Always handle state retrieval errors
- Provide default values for missing state
- Log state operation failures
- Implement retry logic for transient failures
Performance
- Minimize state updates in hot paths
- Batch updates when possible
- Use save_state for single key updates
- Consider state size limits
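The error-handling points above (default values, logging, retries for transient failures) can be combined in one small wrapper. This is a sketch under assumptions: `get_state_with_retry`, `DEFAULT_STATE`, and `flaky_fetch` are hypothetical names, the retry counts and delays are arbitrary, and the broad `except Exception` should be narrowed to whatever error type the store actually raises.

```python
import asyncio
import logging
from typing import Any, Awaitable, Callable, Dict

logger = logging.getLogger(__name__)

# Hypothetical defaults applied when keys are missing from the stored state.
DEFAULT_STATE: Dict[str, Any] = {"status": "unknown", "progress": 0}


async def get_state_with_retry(
    fetch: Callable[[], Awaitable[Dict[str, Any]]],
    attempts: int = 3,
    base_delay: float = 0.01,
) -> Dict[str, Any]:
    """Call fetch() up to `attempts` times, then layer the result over defaults."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    state: Dict[str, Any] = {}
    for attempt in range(1, attempts + 1):
        try:
            state = await fetch()
            break
        except Exception as exc:  # narrow this to the store's real error type
            if attempt == attempts:
                raise  # out of retries: surface the failure to the caller
            logger.warning("state read failed (attempt %d): %s", attempt, exc)
            # Exponential backoff between attempts.
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    # Missing keys fall back to defaults; stored keys win.
    return {**DEFAULT_STATE, **state}


calls = {"count": 0}


async def flaky_fetch() -> Dict[str, Any]:
    # Hypothetical reader that fails twice before succeeding.
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return {"status": "running"}


if __name__ == "__main__":
    print(asyncio.run(get_state_with_retry(flaky_fetch)))
```

With the real service, `fetch` could be a lambda that calls `StateStore.get_state(id=..., type=...)`, so the wrapper stays independent of any particular state ID or type.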