The ObjectStore service provides a unified interface for object storage operations, supporting both file and directory operations.
ObjectStore
application_sdk.services.objectstore.ObjectStore
Unified object store interface supporting both file and directory operations.
Path Normalization
as_store_key
Normalize a local or object-store path into a clean object store key.
from application_sdk.services.objectstore import ObjectStore
key = ObjectStore.as_store_key(path: str)
- path (str): The path to normalize. Accepts:
  - Local SDK temporary paths (e.g., ./local/tmp/artifacts/...)
  - Absolute paths (e.g., /data/test.parquet)
  - Already-relative object store keys (e.g., artifacts/...)
Returns: A normalized object store key with forward slashes and no leading or trailing slash, or an empty string for empty input.
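The documented behavior can be approximated in plain Python. This is a sketch only — `normalize_key` below is illustrative, and the real `as_store_key` may handle additional cases:

```python
def normalize_key(path: str) -> str:
    # Illustrative re-implementation of the documented normalization rules:
    # forward slashes, no SDK temp prefix, no leading/trailing slash.
    if not path:
        return ""
    key = path.replace("\\", "/")
    # Drop leading "./" and "/" segments
    while key.startswith(("./", "/")):
        key = key[1:] if key.startswith("/") else key[2:]
    # Drop the SDK's local temporary prefix if present
    if key.startswith("local/tmp/"):
        key = key[len("local/tmp/"):]
    return key.strip("/")

print(normalize_key("./local/tmp/artifacts/data.json"))  # artifacts/data.json
print(normalize_key("/absolute/path/file.csv"))          # absolute/path/file.csv
```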
File Operations
upload_file
Upload a single file to the object store.
await ObjectStore.upload_file(
source: str,
destination: str,
store_name: str = DEPLOYMENT_OBJECT_STORE_NAME,
retain_local_copy: bool = False
)
- source (str): Local path to the file to upload
- destination (str): Object store key where the file will be stored
- store_name (str, default: DEPLOYMENT_OBJECT_STORE_NAME): Name of the Dapr object store binding to use
- retain_local_copy (bool, default: False): If True, keeps the local file after upload
Raises: If the source file cannot be read, or if there's an error uploading to the object store.
upload_file_from_bytes
Upload file content directly from bytes to object store.
await ObjectStore.upload_file_from_bytes(
file_content: bytes,
destination: str,
store_name: str = UPSTREAM_OBJECT_STORE_NAME
)
- file_content (bytes): The file content to upload
- destination (str): Object store key where the file will be stored
- store_name (str, default: UPSTREAM_OBJECT_STORE_NAME): Name of the Dapr object store binding to use
download_file
Download a single file from the object store.
await ObjectStore.download_file(
source: str,
destination: str,
store_name: str = DEPLOYMENT_OBJECT_STORE_NAME
)
- source (str): Object store key of the file to download
- destination (str): Local path where the file will be saved
- store_name (str, default: DEPLOYMENT_OBJECT_STORE_NAME): Name of the Dapr object store binding to use
The destination directory will be created automatically if it doesn't exist.
get_content
Get raw file content from the object store.
content = await ObjectStore.get_content(
key: str,
store_name: str = DEPLOYMENT_OBJECT_STORE_NAME,
suppress_error: bool = False
)
- key (str): The path of the file in the object store
- store_name (str, default: DEPLOYMENT_OBJECT_STORE_NAME): Name of the Dapr object store binding to use
- suppress_error (bool, default: False): Whether to suppress the error and return None if the file does not exist
Returns: The raw file content as bytes, or None if suppress_error is True and the file doesn't exist.
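When a file is optional, suppress_error=True avoids wrapping the call in try/except. A minimal fallback pattern — `get_content_stub` below is a stand-in for `ObjectStore.get_content` so the snippet is self-contained, and `load_optional_json` is a hypothetical helper:

```python
import asyncio
import json

# Stand-in for ObjectStore.get_content: a tiny in-memory "store"
_FAKE_STORE = {"config/app.json": b'{"retries": 3}'}

async def get_content_stub(key: str, suppress_error: bool = False):
    if key not in _FAKE_STORE:
        if suppress_error:
            return None  # Mirrors suppress_error=True: no exception raised
        raise FileNotFoundError(key)
    return _FAKE_STORE[key]

async def load_optional_json(key: str, default: dict) -> dict:
    # Fall back to `default` when the key does not exist in the store
    content = await get_content_stub(key, suppress_error=True)
    return json.loads(content) if content is not None else default

print(asyncio.run(load_optional_json("config/app.json", {})))  # {'retries': 3}
```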
delete_file
Delete a single file from the object store.
await ObjectStore.delete_file(
key: str,
store_name: str = DEPLOYMENT_OBJECT_STORE_NAME
)
- key (str): The path of the file in the object store to delete
- store_name (str, default: DEPLOYMENT_OBJECT_STORE_NAME): Name of the Dapr object store binding to use
exists
Check if a file exists in the object store.
file_exists = await ObjectStore.exists(
key: str,
store_name: str = DEPLOYMENT_OBJECT_STORE_NAME
)
- key (str): The path of the file in the object store
- store_name (str, default: DEPLOYMENT_OBJECT_STORE_NAME): Name of the Dapr object store binding to use
Returns: True if the file exists, False otherwise.
Directory Operations
upload_prefix
Upload all files from a directory to the object store.
await ObjectStore.upload_prefix(
source: str,
destination: str,
store_name: str = DEPLOYMENT_OBJECT_STORE_NAME,
recursive: bool = True,
retain_local_copy: bool = False
)
- source (str): Local directory path containing files to upload
- destination (str): Object store prefix where files will be stored
- store_name (str, default: DEPLOYMENT_OBJECT_STORE_NAME): Name of the Dapr object store binding to use
- recursive (bool, default: True): Whether to include subdirectories
- retain_local_copy (bool, default: False): If True, keeps local files after upload
Raises: If the source path is not a valid directory.
download_prefix
Download all files from a store prefix to a local directory.
await ObjectStore.download_prefix(
source: str,
destination: str = TEMPORARY_PATH,
store_name: str = DEPLOYMENT_OBJECT_STORE_NAME
)
- source (str): Object store prefix to download files from
- destination (str, default: TEMPORARY_PATH): Local directory where files will be saved
- store_name (str, default: DEPLOYMENT_OBJECT_STORE_NAME): Name of the Dapr object store binding to use
list_files
List all files in the object store under a given prefix.
files = await ObjectStore.list_files(
prefix: str = "",
store_name: str = DEPLOYMENT_OBJECT_STORE_NAME
)
- prefix (str, default: ""): The prefix to filter files. An empty string returns all files.
- store_name (str, default: DEPLOYMENT_OBJECT_STORE_NAME): Name of the Dapr object store binding to use
Returns: List of file paths in the object store.
delete_prefix
Delete all files under a prefix from the object store.
await ObjectStore.delete_prefix(
prefix: str,
store_name: str = DEPLOYMENT_OBJECT_STORE_NAME
)
- prefix (str): The prefix path to delete all files under
- store_name (str, default: DEPLOYMENT_OBJECT_STORE_NAME): Name of the Dapr object store binding to use
Raises: FileNotFoundError if no files are found under the prefix.
Example Usage
Uploading Files
from application_sdk.services.objectstore import ObjectStore
# Upload single file
await ObjectStore.upload_file(
    source="/local/data/report.pdf",
    destination="reports/2024/january/report.pdf"
)

# Upload from bytes
file_content = b"Hello, World!"
await ObjectStore.upload_file_from_bytes(
    file_content=file_content,
    destination="documents/greeting.txt"
)

# Upload with retention
await ObjectStore.upload_file(
    source="/local/data/backup.zip",
    destination="backups/backup.zip",
    retain_local_copy=True  # Keep local file
)
Downloading Files
import json

from application_sdk.services.objectstore import ObjectStore

# Download single file
await ObjectStore.download_file(
    source="reports/2024/january/report.pdf",
    destination="/local/downloads/report.pdf"
)

# Get file content directly
content = await ObjectStore.get_content(
    key="documents/config.json"
)
config = json.loads(content)

# Check if file exists first
if await ObjectStore.exists("data/file.csv"):
    content = await ObjectStore.get_content("data/file.csv")
Working with Directories
from application_sdk.services.objectstore import ObjectStore
# Upload entire directory
await ObjectStore.upload_prefix(
    source="/local/project/",
    destination="backups/project-v1/",
    recursive=True
)

# Upload only root level files
await ObjectStore.upload_prefix(
    source="/local/logs/",
    destination="daily-logs/",
    recursive=False
)

# Download directory
await ObjectStore.download_prefix(
    source="backups/project-v1/",
    destination="/local/restored/"
)

# List files in directory
files = await ObjectStore.list_files(
    prefix="reports/2024/"
)
for file in files:
    print(f"Found: {file}")
Deleting Files
import logging

from application_sdk.services.objectstore import ObjectStore

logger = logging.getLogger(__name__)

# Delete single file
await ObjectStore.delete_file(
    key="temp/processing_data.csv"
)

# Delete all files in directory
await ObjectStore.delete_prefix(
    prefix="temp/batch_123/"
)

# Safe deletion with error handling
try:
    await ObjectStore.delete_prefix(
        prefix="old_data/2023/"
    )
except FileNotFoundError:
    logger.info("No files found to delete")
Path Normalization
from application_sdk.services.objectstore import ObjectStore
# Normalize various path formats
key1 = ObjectStore.as_store_key("./local/tmp/artifacts/data.json")
print(key1) # "artifacts/data.json"
key2 = ObjectStore.as_store_key("/absolute/path/file.csv")
print(key2) # "absolute/path/file.csv"
key3 = ObjectStore.as_store_key("relative/path/file.txt")
print(key3) # "relative/path/file.txt"
Activity Integration
import json
import os

from temporalio import activity

from application_sdk.activities import ActivitiesInterface
from application_sdk.services.objectstore import ObjectStore


class DataActivities(ActivitiesInterface):
    @activity.defn
    async def save_results(self, workflow_args):
        """Save workflow results to object store."""
        workflow_id = workflow_args["workflow_id"]
        output_path = workflow_args["output_path"]
        results = workflow_args["results"]

        # Save results to local file
        local_path = f"/tmp/{workflow_id}/results.json"
        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        with open(local_path, "w") as f:
            json.dump(results, f)

        # Upload to object store
        store_key = f"{output_path}/results.json"
        await ObjectStore.upload_file(
            source=local_path,
            destination=store_key
        )
        return {"output_file": store_key}

    @activity.defn
    async def load_input(self, workflow_args):
        """Load input data from object store."""
        input_key = workflow_args["input_file"]

        # Download file
        content = await ObjectStore.get_content(input_key)
        data = json.loads(content)
        return {"input_data": data}
Batch Operations
import asyncio

from application_sdk.services.objectstore import ObjectStore


async def upload_batch(files: list):
    """Upload multiple files in parallel."""
    tasks = [
        ObjectStore.upload_file(
            source=file["local_path"],
            destination=file["store_key"]
        )
        for file in files
    ]
    await asyncio.gather(*tasks)


async def download_batch(keys: list):
    """Download multiple files in parallel."""
    tasks = [
        ObjectStore.download_file(
            source=key,
            destination=f"/local/downloads/{key.split('/')[-1]}"
        )
        for key in keys
    ]
    await asyncio.gather(*tasks)
Best Practices
File Operations
- Use descriptive object store keys
- Organize files with logical prefixes
- Clean up temporary files after upload (default behavior)
- Use retain_local_copy=True for important files
Directory Operations
- Use trailing slashes for directory prefixes
- Set recursive=True for complete directory trees
- List files before bulk operations
- Handle FileNotFoundError for delete operations
Performance
- Upload/download multiple files in parallel
- Use upload_prefix for entire directories
- Stream large files when possible
- Monitor Dapr message size limits
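The parallel-transfer tip above is best combined with a concurrency cap so large batches don't overwhelm the binding. A minimal sketch using asyncio.Semaphore — `bounded_gather` is a hypothetical helper, not part of the SDK:

```python
import asyncio

async def bounded_gather(coros, limit: int = 8):
    # Run awaitables concurrently, allowing at most `limit` in flight at once
    semaphore = asyncio.Semaphore(limit)

    async def _run(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(*(_run(c) for c in coros))

async def demo():
    # Simulated uploads; in practice these would be ObjectStore.upload_file(...) calls
    async def fake_upload(i):
        await asyncio.sleep(0.01)
        return i

    return await bounded_gather([fake_upload(i) for i in range(10)], limit=3)

print(asyncio.run(demo()))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

asyncio.gather preserves input order, so results line up with the submitted coroutines even though completion order varies.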
Error Handling
- Always handle upload/download errors
- Use suppress_error for optional files
- Check file existence before operations
- Implement retry logic for transient failures
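The retry tip can be sketched with a small helper. `with_retries` below is hypothetical, illustrating exponential backoff around any awaitable ObjectStore call:

```python
import asyncio

async def with_retries(operation, attempts: int = 3, base_delay: float = 0.5):
    # Retry an async operation with exponential backoff between attempts
    for attempt in range(attempts):
        try:
            return await operation()
        except Exception:
            if attempt == attempts - 1:
                raise  # Out of attempts: surface the last error
            await asyncio.sleep(base_delay * (2 ** attempt))

# Example: a flaky operation that fails twice, then succeeds
calls = {"count": 0}

async def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"

print(asyncio.run(with_retries(flaky, base_delay=0.01)))  # ok
```

In real use, `operation` would be a zero-argument callable wrapping the transfer, e.g. `lambda: ObjectStore.upload_file(source=..., destination=...)`.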