The AtlanStorage service provides utilities for migrating data between local object storage and Atlan’s upstream storage system. It’s specifically designed for the bucket cloning strategy used in customer-deployed applications.
Overview
AtlanStorage supports:
- Parallel file migration: Concurrent transfer of multiple files for performance
- Comprehensive error handling: Detailed failure tracking and reporting
- Progress monitoring: Migration summary with success/failure counts
- Prefix-based filtering: Selective migration of specific datasets
Class Reference
AtlanStorage
Handles upload operations to Atlan storage and migration from the object store. All methods are class methods and can be called directly without instantiation.
Core Methods
migrate_from_objectstore_to_atlan
Migrate all files from object store to Atlan storage under a given prefix.
await AtlanStorage.migrate_from_objectstore_to_atlan(
prefix: str = ""
) -> MigrationSummary
This method performs a parallel migration of files from the local object store to Atlan’s upstream storage system. It provides comprehensive error handling and detailed reporting of the migration process.
Parameters:
prefix (str): The prefix used to filter which files to migrate. An empty string migrates all files.
Returns:
MigrationSummary: A comprehensive migration summary including:
total_files: Number of files found for migration
migrated_files: Number successfully migrated
failed_migrations: Number that failed to migrate
failures: List of failure details with file paths and errors
prefix: The prefix used for filtering
source: Source storage system identifier (default: DEPLOYMENT_OBJECT_STORE_NAME)
destination: Destination storage system identifier (default: UPSTREAM_OBJECT_STORE_NAME)
Raises:
Exception: If there’s a critical error during the migration process
Example:
from application_sdk.services import AtlanStorage
# Migrate all files
summary = await AtlanStorage.migrate_from_objectstore_to_atlan()
if summary.total_files:
    print(f"Success rate: {summary.migrated_files / summary.total_files * 100:.1f}%")
# Migrate specific dataset
summary = await AtlanStorage.migrate_from_objectstore_to_atlan(
prefix="processed_data/2024/"
)
if summary.total_files == 0:
print("No files found with the specified prefix")
elif summary.failed_migrations == 0:
print(f"Successfully migrated all {summary.total_files} files")
else:
print(f"Migration completed with {summary.failed_migrations} failures")
for failure in summary.failures:
print(f" Failed: {failure['file']} - {failure['error']}")
Models
MigrationSummary
Summary of migration operation from objectstore to Atlan storage.
class MigrationSummary(BaseModel):
total_files: int = 0
migrated_files: int = 0
failed_migrations: int = 0
failures: List[Dict[str, str]] = []
prefix: str = ""
source: str = DEPLOYMENT_OBJECT_STORE_NAME
destination: str = UPSTREAM_OBJECT_STORE_NAME
total_files
int
default:"0"
Total number of files found for migration.
migrated_files
int
default:"0"
Number of files successfully migrated.
failed_migrations
int
default:"0"
Number of files that failed to migrate.
failures
List[Dict[str, str]]
default:"[]"
List of failure details with file paths and error messages.
Each entry has structure: {"file": "path/to/file", "error": "error message"}
prefix
str
default:""
The prefix used to filter files for migration.
source
str
default:"DEPLOYMENT_OBJECT_STORE_NAME"
Source storage system (e.g., “objectstore”).
destination
str
default:"UPSTREAM_OBJECT_STORE_NAME"
Destination storage system (e.g., “upstream-objectstore”).
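For illustration, the model's shape can be mirrored with a dependency-free dataclass to show how a summary is typically inspected. `MigrationSummaryLike` and `success_rate` are illustrative names, not part of the SDK:

```python
from dataclasses import dataclass, field
from typing import Dict, List

@dataclass
class MigrationSummaryLike:
    """Dependency-free stand-in mirroring MigrationSummary's fields."""
    total_files: int = 0
    migrated_files: int = 0
    failed_migrations: int = 0
    failures: List[Dict[str, str]] = field(default_factory=list)
    prefix: str = ""

def success_rate(summary: MigrationSummaryLike) -> float:
    """Percentage of files migrated; an empty migration counts as 100%."""
    if summary.total_files == 0:
        return 100.0
    return summary.migrated_files / summary.total_files * 100
```

The same field access (`summary.failed_migrations`, `summary.failures[i]["file"]`) applies to the real Pydantic model.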
Usage Patterns
Workflow Completion Migration
Migrate workflow results to Atlan storage after completion:
import logging
from datetime import timedelta
from temporalio import workflow
from application_sdk.services import AtlanStorage
logger = logging.getLogger(__name__)
@workflow.defn
class DataProcessingWorkflow:
@workflow.run
async def run(self, workflow_id: str):
# Process data and store in local object store
output_prefix = f"results/{workflow_id}/"
await workflow.execute_activity(
process_and_store_data,
output_prefix,
start_to_close_timeout=timedelta(hours=2)
)
# Migrate results to Atlan storage
migration_summary = await AtlanStorage.migrate_from_objectstore_to_atlan(
prefix=output_prefix
)
if migration_summary.failed_migrations > 0:
logger.warning(
f"Migration completed with {migration_summary.failed_migrations} failures"
)
# Handle partial failure...
else:
logger.info(
f"Successfully migrated {migration_summary.migrated_files} files"
)
return migration_summary
Batch Dataset Migration
Migrate processed datasets in batches:
import logging
from temporalio import activity
from application_sdk.services import AtlanStorage
logger = logging.getLogger(__name__)
@activity.defn
async def migrate_processed_datasets(date_prefix: str) -> dict:
"""
Migrate all processed datasets for a specific date.
"""
# Define dataset prefixes to migrate
dataset_types = ["customer_data", "transaction_data", "analytics_data"]
migration_results = {}
for dataset_type in dataset_types:
prefix = f"processed/{date_prefix}/{dataset_type}/"
logger.info(f"Migrating {dataset_type} for {date_prefix}")
summary = await AtlanStorage.migrate_from_objectstore_to_atlan(
prefix=prefix
)
migration_results[dataset_type] = {
"total": summary.total_files,
"migrated": summary.migrated_files,
"failed": summary.failed_migrations
}
if summary.failed_migrations > 0:
logger.error(
f"Failed to migrate {summary.failed_migrations} files from {dataset_type}"
)
for failure in summary.failures:
logger.error(f" {failure['file']}: {failure['error']}")
return migration_results
Selective Migration with Validation
Migrate files with pre-migration validation:
import logging
from temporalio import activity
from application_sdk.services import AtlanStorage, ObjectStore
logger = logging.getLogger(__name__)
@activity.defn
async def validate_and_migrate(prefix: str) -> MigrationSummary:
"""
Validate files before migration and report issues.
"""
# List files to be migrated
files = await ObjectStore.list_files(prefix=prefix)
logger.info(f"Found {len(files)} files for migration under prefix: {prefix}")
# Pre-migration validation
validation_errors = []
for file_path in files:
try:
# Check if file is accessible
content = await ObjectStore.get_content(file_path, suppress_error=True)
if content is None:
validation_errors.append(f"Cannot read file: {file_path}")
except Exception as e:
validation_errors.append(f"Validation error for {file_path}: {str(e)}")
if validation_errors:
logger.warning(f"Found {len(validation_errors)} validation errors")
for error in validation_errors:
logger.warning(f" {error}")
# Proceed with migration
logger.info("Starting migration to Atlan storage")
summary = await AtlanStorage.migrate_from_objectstore_to_atlan(prefix=prefix)
# Log detailed results
logger.info(
f"Migration complete: {summary.migrated_files}/{summary.total_files} succeeded"
)
if summary.failed_migrations > 0:
logger.error("Migration failures:")
for failure in summary.failures:
logger.error(f" {failure['file']}: {failure['error']}")
return summary
Incremental Migration
Migrate data incrementally based on workflow progress:
import logging
from datetime import timedelta
from temporalio import workflow
from application_sdk.services import AtlanStorage, StateStore, StateType
logger = logging.getLogger(__name__)
@workflow.defn
class IncrementalDataPipeline:
@workflow.run
async def run(self, pipeline_id: str):
batches = ["batch_001", "batch_002", "batch_003"]
for batch_id in batches:
# Process batch
batch_prefix = f"processing/{pipeline_id}/{batch_id}/"
await workflow.execute_activity(
process_batch,
batch_prefix,
start_to_close_timeout=timedelta(minutes=30)
)
# Migrate batch results immediately
summary = await AtlanStorage.migrate_from_objectstore_to_atlan(
prefix=batch_prefix
)
# Track migration in state
await StateStore.save_state(
key=f"migration_{batch_id}",
value={
"total_files": summary.total_files,
"migrated_files": summary.migrated_files,
"failed_migrations": summary.failed_migrations
},
id=pipeline_id,
type=StateType.WORKFLOWS
)
# Handle failures
if summary.failed_migrations > 0:
logger.warning(
f"Batch {batch_id} migration had {summary.failed_migrations} failures"
)
# Optionally retry or alert
return "Pipeline completed"
Error Handling
Handling Migration Failures
Gracefully handle migration failures:
import logging
from temporalio import activity
from application_sdk.services import AtlanStorage
logger = logging.getLogger(__name__)
@activity.defn
async def safe_migration(prefix: str, max_failure_rate: float = 0.05):
"""
Migrate files with failure rate checking.
"""
try:
summary = await AtlanStorage.migrate_from_objectstore_to_atlan(prefix=prefix)
if summary.total_files == 0:
logger.warning(f"No files found for prefix: {prefix}")
return {"status": "no_files", "summary": summary}
# Calculate failure rate
failure_rate = summary.failed_migrations / summary.total_files
if failure_rate > max_failure_rate:
error_msg = (
f"Migration failure rate {failure_rate:.1%} exceeds "
f"threshold {max_failure_rate:.1%}"
)
logger.error(error_msg)
# Log all failures
for failure in summary.failures:
logger.error(f"Failed file: {failure['file']} - {failure['error']}")
return {
"status": "failure_threshold_exceeded",
"summary": summary,
"error": error_msg
}
return {"status": "success", "summary": summary}
except Exception as e:
logger.error(f"Migration failed critically: {str(e)}")
raise
Best Practices
Parallel Performance: The migration runs file transfers in parallel using asyncio.gather(). For large datasets, this provides significant performance improvements over sequential transfers.
Empty Prefix: Calling migrate_from_objectstore_to_atlan() with an empty prefix will migrate ALL files in the object store. Use specific prefixes to control scope.
Error Handling: Migration continues even if individual files fail. Always check summary.failed_migrations and handle failures appropriately.
Progress Tracking: For long-running migrations, log the migration summary periodically or store it in StateStore for monitoring.
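The empty-prefix caution above can be enforced with a small guard before calling the migration. This is a sketch; `require_scoped_prefix` is an illustrative helper, not an SDK API:

```python
def require_scoped_prefix(prefix: str) -> str:
    """Reject empty or whitespace-only prefixes so a call cannot
    accidentally migrate every file in the object store."""
    if not prefix or not prefix.strip():
        raise ValueError(
            "Refusing to migrate with an empty prefix; "
            "pass an explicit prefix such as 'results/run-123/'."
        )
    return prefix

# Usage (hypothetical):
# summary = await AtlanStorage.migrate_from_objectstore_to_atlan(
#     prefix=require_scoped_prefix(output_prefix)
# )
```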
Migration Process
The migration process follows these steps:
1. List Files: Query the deployment object store for all files matching the prefix.
2. Create Tasks: Create a parallel async task for each file to be migrated.
3. Execute Migrations: Run all migrations concurrently using asyncio.gather():
- Fetch file content from the deployment store
- Upload it to upstream Atlan storage
- Track success or failure for each file
4. Generate Summary: Compile results into a MigrationSummary with:
- Total file count
- Success and failure counts
- Detailed failure information
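The SDK's internals are not reproduced here, but the steps above can be sketched as follows. `migrate_all` and `copy_one` are illustrative names, not SDK APIs; per-file errors are captured rather than raised, matching the behavior described above:

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

async def migrate_all(
    files: List[str],
    copy_one: Callable[[str], Awaitable[Any]],
) -> Dict[str, Any]:
    """Run one migration task per file concurrently and collect results."""

    async def migrate_one(path: str) -> Dict[str, str]:
        try:
            # Fetch from the deployment store and upload upstream.
            await copy_one(path)
            return {"file": path, "status": "ok"}
        except Exception as e:
            # A single failure must not abort the whole migration.
            return {"file": path, "status": "failed", "error": str(e)}

    # Execute all transfers concurrently; order of results matches `files`.
    results = await asyncio.gather(*(migrate_one(p) for p in files))
    failures = [r for r in results if r["status"] == "failed"]
    return {
        "total_files": len(files),
        "migrated_files": len(files) - len(failures),
        "failed_migrations": len(failures),
        "failures": failures,
    }
```

This mirrors why the real method returns a summary instead of raising on the first bad file: callers inspect the counts and failure list after the fact.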
Configuration
AtlanStorage uses these storage backends:
from application_sdk.constants import (
DEPLOYMENT_OBJECT_STORE_NAME, # Default: "objectstore" (source)
UPSTREAM_OBJECT_STORE_NAME, # Default: "upstream-objectstore" (destination)
)
Related Services
- ObjectStore: AtlanStorage uses ObjectStore for file listing and content retrieval
- StateStore: Often used together to track migration progress