The StateStore service provides a unified interface for managing persistent state in your Atlan applications. It automatically synchronizes state data to object storage, ensuring durability and enabling state sharing across workflow runs.
Overview
StateStore is designed for storing:
- Workflow state: Progress tracking, intermediate results, configuration
- Credential configuration: Credential metadata and references to secrets
All state is stored as JSON in the object storage under organized paths with automatic path traversal protection.
Class Reference
StateStore
Unified state store service for handling state management.All methods are class methods and can be called directly without instantiation.
Methods
get_state
Retrieve state from the store.
await StateStore.get_state(id: str, type: StateType) -> Dict[str, Any]
The unique identifier to retrieve the state for.
The type of state to retrieve (StateType.WORKFLOWS or StateType.CREDENTIALS).
The retrieved state data. Returns empty dict if no state found.
Raises:
IOError: If there’s an error with the object store operations
Exception: If there’s an unexpected error during state retrieval
Example:
from application_sdk.services import StateStore, StateType
# Get workflow state
state = await StateStore.get_state("workflow-123", StateType.WORKFLOWS)
print(f"Current status: {state.get('status', 'unknown')}")
# Get credential configuration
creds = await StateStore.get_state("db-cred-456", StateType.CREDENTIALS)
print(f"Database: {creds.get('database')}")
save_state
Save a single state value to the store (merges with existing state).
await StateStore.save_state(
key: str,
value: Any,
id: str,
type: StateType
) -> None
The key to store the state value under.
The value to store (can be any JSON-serializable type).
The unique identifier for the state object.
The type of state (StateType.WORKFLOWS or StateType.CREDENTIALS).
Raises:
Exception: If there’s an error with the object store operations
Example:
from application_sdk.services import StateStore, StateType
# Update workflow progress
await StateStore.save_state(
key="progress",
value=75,
id="workflow-123",
type=StateType.WORKFLOWS
)
# Update workflow status with dict
await StateStore.save_state(
key="execution_info",
value={"started_at": "2024-01-15T10:00:00Z", "worker_id": "worker-1"},
id="workflow-123",
type=StateType.WORKFLOWS
)
save_state_object
Save an entire state object to the store (merges with existing state).
await StateStore.save_state_object(
id: str,
value: Dict[str, Any],
type: StateType
) -> Dict[str, Any]
The unique identifier for the state object.
The state data to save/merge.
The type of state (StateType.WORKFLOWS or StateType.CREDENTIALS).
The complete updated state after merge.
Raises:
Exception: If there’s an error with the object store operations
Example:
from application_sdk.services import StateStore, StateType
# Save complete workflow state
workflow_state = {
"status": "running",
"current_step": "data_processing",
"progress": 50,
"config": {"batch_size": 1000}
}
updated = await StateStore.save_state_object(
id="workflow-123",
value=workflow_state,
type=StateType.WORKFLOWS
)
print(f"Final state has {len(updated)} keys")
# Save credential configuration
cred_config = {
"credential_type": "database",
"host": "db.example.com",
"port": 5432
}
await StateStore.save_state_object(
id="db-cred-456",
value=cred_config,
type=StateType.CREDENTIALS
)
Helper Functions
build_state_store_path
Build the state file path for a given id and type with path traversal protection.
build_state_store_path(id: str, state_type: StateType) -> str
The unique identifier for the state.
The type of state (StateType.WORKFLOWS or StateType.CREDENTIALS).
The constructed state file path.
Raises:
PathTraversalError: If the id contains path traversal sequences that would escape the base directory
Example:
from application_sdk.services import build_state_store_path, StateType
# Workflow state path
path = build_state_store_path("workflow-123", StateType.WORKFLOWS)
print(path)
# Output: './local/tmp/persistent-artifacts/apps/appName/workflows/workflow-123/config.json'
# Credential state path
cred_path = build_state_store_path("db-cred-456", StateType.CREDENTIALS)
print(cred_path)
# Output: './local/tmp/persistent-artifacts/apps/appName/credentials/db-cred-456/config.json'
Enums and Exceptions
StateType
Enumeration of state types.
Workflow state type for storing workflow-related data.
Credentials state type for storing credential configurations.
Methods:
StateType.is_member(type: str) -> bool
Check if a string value is a valid StateType member.
Example:
from application_sdk.services import StateType
print(StateType.is_member("workflows")) # True
print(StateType.is_member("invalid")) # False
PathTraversalError
Exception raised when a path traversal attempt is detected.
class PathTraversalError(ValueError):
"""Raised when a path traversal attempt is detected."""
Example:
from application_sdk.services import build_state_store_path, StateType, PathTraversalError
try:
# This will raise PathTraversalError
path = build_state_store_path("../../../etc/passwd", StateType.WORKFLOWS)
except PathTraversalError as e:
print(f"Security error: {e}")
Usage Patterns
Workflow Progress Tracking
Track workflow execution progress across activities:
from application_sdk.services import StateStore, StateType
@activity.defn
async def process_batch(batch_id: str, batch_num: int, total_batches: int):
# Process batch...
# Update progress
progress = (batch_num / total_batches) * 100
await StateStore.save_state(
key="progress",
value=progress,
id=batch_id,
type=StateType.WORKFLOWS
)
# Update batch info
await StateStore.save_state(
key="current_batch",
value=batch_num,
id=batch_id,
type=StateType.WORKFLOWS
)
Credential Configuration Management
Store credential metadata for later resolution:
from application_sdk.services import StateStore, StateType
import uuid
async def store_credential_config(config: dict) -> str:
"""Store credential configuration and return its ID."""
cred_id = str(uuid.uuid4())
await StateStore.save_state_object(
id=cred_id,
value={
"credentialSource": "agent",
"secret-path": config["secret_path"],
"credential_type": config["type"],
"created_at": datetime.now().isoformat()
},
type=StateType.CREDENTIALS
)
return cred_id
State Initialization
Initialize state at workflow start:
from application_sdk.services import StateStore, StateType
from datetime import datetime
@workflow.defn
class DataProcessingWorkflow:
@workflow.run
async def run(self, workflow_id: str):
# Initialize workflow state
initial_state = {
"status": "started",
"started_at": datetime.now().isoformat(),
"progress": 0,
"total_records": 0,
"processed_records": 0
}
await StateStore.save_state_object(
id=workflow_id,
value=initial_state,
type=StateType.WORKFLOWS
)
# Execute workflow activities...
Best Practices
Path Traversal Protection: Always use the provided build_state_store_path function or StateStore methods. Never construct paths manually to avoid security vulnerabilities.
State Merging: Both save_state and save_state_object merge with existing state. To completely replace state, retrieve it first, modify, then save.
Empty State: get_state returns an empty dict {} when no state exists. Always check for key existence before accessing values.
Storage Location
State files are stored in object storage under:
./local/tmp/persistent-artifacts/apps/{APPLICATION_NAME}/{state_type}/{id}/config.json
Where:
{APPLICATION_NAME}: Configured application name
{state_type}: “workflows” or “credentials”
{id}: The unique identifier provided
- ObjectStore - StateStore uses ObjectStore for persistence
- SecretStore - SecretStore uses StateStore for credential configuration