The ObjectStore service provides a unified, cloud-agnostic interface for file and directory operations. It abstracts away the complexity of working with different cloud storage providers through Dapr bindings.
## Overview
ObjectStore supports:
- Single file operations: Upload, download, get content, delete
- Bulk operations: Upload/download entire directories, list files by prefix, delete by prefix
- Path normalization: Automatic conversion between local paths and object store keys
- Multi-store support: Work with multiple storage backends simultaneously
## Class Reference

### ObjectStore

Unified object store interface supporting both file and directory operations. All methods are class methods and can be called directly without instantiation.
## Single File Operations

### upload_file

Upload a single file to the object store.

```python
await ObjectStore.upload_file(
    source: str,
    destination: str,
    store_name: str = DEPLOYMENT_OBJECT_STORE_NAME,
    retain_local_copy: bool = False
) -> None
```

Parameters:
- `source` (str): Local path to the file to upload.
- `destination` (str): Object store key where the file will be stored.
- `store_name` (str, default: `DEPLOYMENT_OBJECT_STORE_NAME`): Name of the Dapr object store binding to use.
- `retain_local_copy` (bool, default: `False`): Whether to keep the local file after upload. If `False`, the file is deleted after a successful upload.

Raises:
- `IOError`: If the source file cannot be read
- `Exception`: If there's an error uploading to the object store
Example:

```python
from application_sdk.services import ObjectStore

# Upload a single file
await ObjectStore.upload_file(
    source="/tmp/report.pdf",
    destination="reports/2024/january/report.pdf"
)

# Upload and keep the local copy
await ObjectStore.upload_file(
    source="/tmp/data.csv",
    destination="data/input.csv",
    retain_local_copy=True
)
```
### upload_file_from_bytes

Upload file content directly from bytes to the object store.

```python
await ObjectStore.upload_file_from_bytes(
    file_content: bytes,
    destination: str,
    store_name: str = UPSTREAM_OBJECT_STORE_NAME
) -> None
```

Parameters:
- `file_content` (bytes): The file content to upload.
- `destination` (str): Object store key where the file will be stored.
- `store_name` (str, default: `UPSTREAM_OBJECT_STORE_NAME`): Name of the Dapr object store binding to use.
Example:

```python
import json

from application_sdk.services import ObjectStore

# Upload JSON data
data = {"status": "completed", "timestamp": "2024-01-15T10:00:00Z"}
json_bytes = json.dumps(data).encode("utf-8")
await ObjectStore.upload_file_from_bytes(
    file_content=json_bytes,
    destination="results/status.json"
)
```
### download_file

Download a single file from the object store.

```python
await ObjectStore.download_file(
    source: str,
    destination: str,
    store_name: str = DEPLOYMENT_OBJECT_STORE_NAME
) -> None
```

Parameters:
- `source` (str): Object store key of the file to download.
- `destination` (str): Local path where the file will be saved.
- `store_name` (str, default: `DEPLOYMENT_OBJECT_STORE_NAME`): Name of the Dapr object store binding to use.

Note: The destination directory will be created automatically if it doesn't exist.
Example:

```python
from application_sdk.services import ObjectStore

# Download a file
await ObjectStore.download_file(
    source="reports/2024/january/report.pdf",
    destination="/tmp/downloaded_report.pdf"
)
```
### get_content

Get raw file content from the object store without saving it to disk.

```python
await ObjectStore.get_content(
    key: str,
    store_name: str = DEPLOYMENT_OBJECT_STORE_NAME,
    suppress_error: bool = False
) -> bytes | None
```

Parameters:
- `key` (str): The path of the file in the object store.
- `store_name` (str, default: `DEPLOYMENT_OBJECT_STORE_NAME`): Name of the Dapr object store binding to use.
- `suppress_error` (bool, default: `False`): Whether to suppress the error and return `None` if the file does not exist.

Returns: The raw file content as bytes, or `None` if the file is not found and `suppress_error` is `True`.
Example:

```python
import json

from application_sdk.services import ObjectStore

# Get JSON content
content = await ObjectStore.get_content("config/settings.json")
if content:
    settings = json.loads(content.decode("utf-8"))
    print(settings)

# Safely check for an optional file
content = await ObjectStore.get_content(
    "optional/metadata.json",
    suppress_error=True
)
if content is None:
    print("File not found, using defaults")
```
### delete_file

Delete a single file from the object store.

```python
await ObjectStore.delete_file(
    key: str,
    store_name: str = DEPLOYMENT_OBJECT_STORE_NAME
) -> None
```

Parameters:
- `key` (str): The path of the file in the object store.
- `store_name` (str, default: `DEPLOYMENT_OBJECT_STORE_NAME`): Name of the Dapr object store binding to use.
Example:

```python
from application_sdk.services import ObjectStore

# Delete a file
await ObjectStore.delete_file("temp/processing_data.csv")
```
### exists

Check if a file exists in the object store.

```python
await ObjectStore.exists(
    key: str,
    store_name: str = DEPLOYMENT_OBJECT_STORE_NAME
) -> bool
```

Parameters:
- `key` (str): The path of the file in the object store.
- `store_name` (str, default: `DEPLOYMENT_OBJECT_STORE_NAME`): Name of the Dapr object store binding to use.

Returns: `True` if the file exists, `False` otherwise.
Example:

```python
from application_sdk.services import ObjectStore

if await ObjectStore.exists("data/input.csv"):
    print("Input file found, processing...")
else:
    print("Input file missing, aborting")
```
## Directory Operations

### upload_prefix

Upload all files from a directory to the object store.

```python
await ObjectStore.upload_prefix(
    source: str,
    destination: str,
    store_name: str = DEPLOYMENT_OBJECT_STORE_NAME,
    recursive: bool = True,
    retain_local_copy: bool = False
) -> None
```

Parameters:
- `source` (str): Local directory path containing files to upload.
- `destination` (str): Object store prefix where files will be stored.
- `store_name` (str, default: `DEPLOYMENT_OBJECT_STORE_NAME`): Name of the Dapr object store binding to use.
- `recursive` (bool, default: `True`): Whether to include subdirectories.
- `retain_local_copy` (bool, default: `False`): Whether to keep local files after upload.

Raises:
- `ValueError`: If the source path is not a valid directory
- `Exception`: If there's an error during the upload process
Example:

```python
from application_sdk.services import ObjectStore

# Upload all files recursively
await ObjectStore.upload_prefix(
    source="local/project/",
    destination="backups/project-v1/",
    recursive=True
)

# Upload only root-level files
await ObjectStore.upload_prefix(
    source="local/logs/",
    destination="daily-logs/",
    recursive=False
)
```
### download_prefix

Download all files under a store prefix to a local directory.

```python
await ObjectStore.download_prefix(
    source: str,
    destination: str = TEMPORARY_PATH,
    store_name: str = DEPLOYMENT_OBJECT_STORE_NAME
) -> None
```

Parameters:
- `source` (str): Object store prefix to download files from.
- `destination` (str, default: `TEMPORARY_PATH`): Local directory where files will be saved.
- `store_name` (str, default: `DEPLOYMENT_OBJECT_STORE_NAME`): Name of the Dapr object store binding to use.
Example:

```python
from application_sdk.services import ObjectStore

# Download an entire dataset
await ObjectStore.download_prefix(
    source="datasets/customer-data/",
    destination="/tmp/downloads/"
)
```
### list_files

List all files in the object store under a given prefix.

```python
await ObjectStore.list_files(
    prefix: str = "",
    store_name: str = DEPLOYMENT_OBJECT_STORE_NAME
) -> List[str]
```

Parameters:
- `prefix` (str, default: `""`): The prefix to filter files. An empty string returns all files.
- `store_name` (str, default: `DEPLOYMENT_OBJECT_STORE_NAME`): Name of the Dapr object store binding to use.

Returns: List of file paths in the object store.
Example:

```python
from application_sdk.services import ObjectStore

# List all files in a directory
files = await ObjectStore.list_files(prefix="reports/2024/")
print(f"Found {len(files)} report files")
for file in files:
    print(f"  - {file}")

# List all files
all_files = await ObjectStore.list_files()
```
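The prefix behaves as a plain key-prefix match, so result sets can be reasoned about locally. A small sketch of that matching semantics (an assumption about the filter for illustration, not the SDK's implementation):

```python
def filter_by_prefix(keys: list[str], prefix: str = "") -> list[str]:
    """Keys that start with the prefix; an empty prefix matches everything."""
    return [key for key in keys if key.startswith(prefix)]

# Illustrative key listing
keys = ["reports/2024/jan.pdf", "reports/2024/feb.pdf", "data/input.csv"]
```

Note that `"reports/2024"` and `"reports/2024/"` match the same keys here only because every key continues with a slash; including the trailing slash avoids accidentally matching sibling prefixes such as `reports/2024-archive/`.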
### delete_prefix

Delete all files under a prefix from the object store.

```python
await ObjectStore.delete_prefix(
    prefix: str,
    store_name: str = DEPLOYMENT_OBJECT_STORE_NAME
) -> None
```

Parameters:
- `prefix` (str): The prefix path under which all files are deleted.
- `store_name` (str, default: `DEPLOYMENT_OBJECT_STORE_NAME`): Name of the Dapr object store binding to use.

Raises:
- `FileNotFoundError`: If no files are found under the prefix
- `Exception`: If there's an error deleting files from the object store
Example:

```python
from application_sdk.services import ObjectStore

# Clean up temporary files
try:
    await ObjectStore.delete_prefix("temp/batch-123/")
    print("Cleanup completed")
except FileNotFoundError:
    print("No files to clean up")
```
## Path Normalization

### as_store_key

Normalize a local or object-store path into a clean object store key.

```python
ObjectStore.as_store_key(path: str) -> str
```

Returns: A normalized object store key with forward slashes and no leading or trailing slash, or an empty string for empty input.

Accepts:
- Local SDK temporary paths (e.g., `./local/tmp/artifacts/...`)
- Absolute paths (e.g., `/data/test.parquet` becomes `data/test.parquet`)
- Already-relative object store keys (e.g., `artifacts/...`)
Example:

```python
from application_sdk.services import ObjectStore

# Normalize various path formats
key1 = ObjectStore.as_store_key("./local/tmp/data/file.csv")
print(key1)  # "data/file.csv"

key2 = ObjectStore.as_store_key("/absolute/path/file.txt")
print(key2)  # "absolute/path/file.txt"

key3 = ObjectStore.as_store_key("already/normalized/key")
print(key3)  # "already/normalized/key"
```
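The SDK performs this normalization internally; as a mental model, the documented behavior can be sketched roughly as below. This is a simplified re-implementation assuming the SDK's temporary prefix is `./local/tmp/`; the real method may handle additional cases:

```python
def as_store_key_sketch(path: str) -> str:
    """Simplified sketch of the documented as_store_key behavior (not the SDK's code)."""
    if not path:
        return ""
    key = path.replace("\\", "/")        # forward slashes only
    if key.startswith("./"):
        key = key[2:]                    # drop a leading "./"
    if key.startswith("local/tmp/"):     # assumed SDK temporary prefix
        key = key[len("local/tmp/"):]
    return key.strip("/")                # no leading or trailing slash
```

The useful takeaway is that keys are always slash-separated and relative, so the same artifact resolves to the same key whether it was referenced by a temporary local path or by its store key.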
## Usage Patterns

### Workflow Artifact Management

Store and retrieve workflow artifacts:

```python
import json

from temporalio import activity

from application_sdk.services import ObjectStore


@activity.defn
async def generate_report(workflow_id: str) -> str:
    # Generate report data
    report = {
        "workflow_id": workflow_id,
        "status": "completed",
        "metrics": {"processed": 1000, "failed": 5},
    }

    # Save the report to the object store
    report_path = f"reports/{workflow_id}/summary.json"
    await ObjectStore.upload_file_from_bytes(
        file_content=json.dumps(report, indent=2).encode("utf-8"),
        destination=report_path,
    )
    return report_path


@activity.defn
async def retrieve_report(report_path: str) -> dict:
    # Retrieve the report from the object store
    content = await ObjectStore.get_content(report_path)
    return json.loads(content.decode("utf-8"))
```
### Batch File Processing

Process multiple files from object storage:

```python
import logging
import os

from temporalio import activity

from application_sdk.services import ObjectStore

logger = logging.getLogger(__name__)


@activity.defn
async def process_data_files(input_prefix: str, output_prefix: str):
    # List all input files
    files = await ObjectStore.list_files(prefix=input_prefix)
    logger.info(f"Processing {len(files)} files")

    for file_path in files:
        # Download the file
        local_path = f"/tmp/{os.path.basename(file_path)}"
        await ObjectStore.download_file(file_path, local_path)

        # Process the file (process_file is a placeholder for your logic)...
        processed_data = process_file(local_path)

        # Upload the results
        output_path = f"{output_prefix}/{os.path.basename(file_path)}"
        await ObjectStore.upload_file(
            source=local_path,
            destination=output_path,
        )

    logger.info("All files processed")
```
### Temporary File Cleanup

Clean up temporary files after workflow completion:

```python
import logging
from datetime import timedelta

from temporalio import workflow

from application_sdk.services import ObjectStore

logger = logging.getLogger(__name__)


@workflow.defn
class DataPipelineWorkflow:
    @workflow.run
    async def run(self, workflow_id: str):
        temp_prefix = f"temp/{workflow_id}/"
        try:
            # Process data using temporary storage
            await workflow.execute_activity(
                process_with_temp_storage,
                workflow_id,
                start_to_close_timeout=timedelta(hours=1),
            )
        finally:
            # Always clean up temporary files
            try:
                await ObjectStore.delete_prefix(temp_prefix)
                logger.info(f"Cleaned up temporary files: {temp_prefix}")
            except FileNotFoundError:
                logger.info("No temporary files to clean up")
```
## Best Practices

- **Path normalization**: Use `as_store_key()` when working with paths from different sources to ensure consistent formatting.
- **Large files**: For files larger than `DAPR_MAX_GRPC_MESSAGE_LENGTH` (default 4 MB), the SDK automatically adjusts gRPC message limits. Monitor logs for warnings about large file transfers.
- **Automatic cleanup**: By default, `upload_file` deletes the local file after a successful upload. Set `retain_local_copy=True` to keep the original file.
- **Error handling**: Use `suppress_error=True` with `get_content()` when checking for optional files to avoid exception-handling overhead.
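For the large-file point, it can be useful to know ahead of time whether an upload will cross the threshold. A local sketch, assuming the 4 MB default described above (`MAX_GRPC_MESSAGE_LENGTH` here is an illustrative stand-in for the SDK's `DAPR_MAX_GRPC_MESSAGE_LENGTH`):

```python
import os

# Illustrative stand-in for DAPR_MAX_GRPC_MESSAGE_LENGTH (default 4 MB)
MAX_GRPC_MESSAGE_LENGTH = 4 * 1024 * 1024


def exceeds_grpc_limit(path: str) -> bool:
    """True if the file is larger than the default gRPC message limit."""
    return os.path.getsize(path) > MAX_GRPC_MESSAGE_LENGTH
```

A check like this can drive logging or batching decisions before handing the file to `upload_file`.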
## Multi-Store Support

Work with different storage backends:

```python
from application_sdk.constants import (
    DEPLOYMENT_OBJECT_STORE_NAME,  # Local deployment storage
    UPSTREAM_OBJECT_STORE_NAME,    # Upstream Atlan storage
)
from application_sdk.services import ObjectStore

# Upload to local storage
await ObjectStore.upload_file(
    source="/tmp/data.csv",
    destination="local-data/input.csv",
    store_name=DEPLOYMENT_OBJECT_STORE_NAME,
)

# Upload to upstream storage
await ObjectStore.upload_file(
    source="/tmp/results.json",
    destination="results/output.json",
    store_name=UPSTREAM_OBJECT_STORE_NAME,
)
```