The AtlanStorage service provides utilities for migrating data between local object storage and Atlan’s upstream storage system. It’s specifically designed for the bucket cloning strategy used in customer-deployed applications.

Overview

AtlanStorage supports:
  • Parallel file migration: Concurrent transfer of multiple files for performance
  • Comprehensive error handling: Detailed failure tracking and reporting
  • Progress monitoring: Migration summary with success/failure counts
  • Prefix-based filtering: Selective migration of specific datasets

Class Reference

AtlanStorage
class
Handles upload operations to Atlan storage and migration from the object store. All methods are class methods and can be called directly without instantiation.

Core Methods

migrate_from_objectstore_to_atlan

Migrate all files from object store to Atlan storage under a given prefix.
await AtlanStorage.migrate_from_objectstore_to_atlan(
    prefix: str = ""
) -> MigrationSummary
This method performs a parallel migration of files from the local object store to Atlan’s upstream storage system. It provides comprehensive error handling and detailed reporting of the migration process.
prefix
str
default: ""
The prefix to filter which files to migrate. An empty string migrates all files.
return
MigrationSummary
Comprehensive migration summary including:
  • total_files: Number of files found for migration
  • migrated_files: Number successfully migrated
  • failed_migrations: Number that failed to migrate
  • failures: List of failure details with file paths and errors
  • prefix: The prefix used for filtering
  • source: Source storage system identifier (default: DEPLOYMENT_OBJECT_STORE_NAME)
  • destination: Destination storage system identifier (default: UPSTREAM_OBJECT_STORE_NAME)
Raises:
  • Exception: If a critical error occurs during the migration process. Individual file failures do not raise; they are recorded in the summary instead.
Example:
from application_sdk.services import AtlanStorage

# Migrate all files
summary = await AtlanStorage.migrate_from_objectstore_to_atlan()
if summary.total_files:
    print(f"Success rate: {summary.migrated_files/summary.total_files*100:.1f}%")

# Migrate specific dataset
summary = await AtlanStorage.migrate_from_objectstore_to_atlan(
    prefix="processed_data/2024/"
)

if summary.total_files == 0:
    print("No files found with the specified prefix")
elif summary.failed_migrations == 0:
    print(f"Successfully migrated all {summary.total_files} files")
else:
    print(f"Migration completed with {summary.failed_migrations} failures")
    for failure in summary.failures:
        print(f"  Failed: {failure['file']} - {failure['error']}")

Models

MigrationSummary

Summary of migration operation from objectstore to Atlan storage.
class MigrationSummary(BaseModel):
    total_files: int = 0
    migrated_files: int = 0
    failed_migrations: int = 0
    failures: List[Dict[str, str]] = []
    prefix: str = ""
    source: str = DEPLOYMENT_OBJECT_STORE_NAME
    destination: str = UPSTREAM_OBJECT_STORE_NAME
total_files
int
default: 0
Total number of files found for migration.
migrated_files
int
default: 0
Number of files successfully migrated.
failed_migrations
int
default: 0
Number of files that failed to migrate.
failures
List[Dict[str, str]]
default: []
List of failure details with file paths and error messages. Each entry has the structure: {"file": "path/to/file", "error": "error message"}
prefix
str
default: ""
The prefix used to filter files for migration.
source
str
default: DEPLOYMENT_OBJECT_STORE_NAME
Source storage system (e.g., "objectstore").
destination
str
default: UPSTREAM_OBJECT_STORE_NAME
Destination storage system (e.g., "upstream-objectstore").
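A quick sketch of inspecting a MigrationSummary after a run. A dataclass stands in for the SDK's Pydantic model so the snippet has no external dependencies; the field names and defaults mirror the definition above, and the literal store names are assumed stand-ins for the SDK constants.

```python
# Dataclass stand-in for the SDK's Pydantic MigrationSummary model.
from dataclasses import dataclass, field
from typing import Dict, List

@dataclass
class MigrationSummary:
    total_files: int = 0
    migrated_files: int = 0
    failed_migrations: int = 0
    failures: List[Dict[str, str]] = field(default_factory=list)
    prefix: str = ""
    source: str = "objectstore"            # stand-in for DEPLOYMENT_OBJECT_STORE_NAME
    destination: str = "upstream-objectstore"  # stand-in for UPSTREAM_OBJECT_STORE_NAME

summary = MigrationSummary(
    total_files=10,
    migrated_files=9,
    failed_migrations=1,
    failures=[{"file": "processed/a.json", "error": "timeout"}],
    prefix="processed/",
)

# Derive a failure rate and surface per-file errors, as the examples above do.
failure_rate = (
    summary.failed_migrations / summary.total_files if summary.total_files else 0.0
)
for failure in summary.failures:
    print(f"{failure['file']}: {failure['error']}")
```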

Usage Patterns

Workflow Completion Migration

Migrate workflow results to Atlan storage after completion:
import logging
from datetime import timedelta

from application_sdk.services import AtlanStorage
from temporalio import workflow

logger = logging.getLogger(__name__)

@workflow.defn
class DataProcessingWorkflow:
    @workflow.run
    async def run(self, workflow_id: str):
        # Process data and store in local object store
        output_prefix = f"results/{workflow_id}/"
        
        await workflow.execute_activity(
            process_and_store_data,
            output_prefix,
            start_to_close_timeout=timedelta(hours=2)
        )
        
        # Migrate results to Atlan storage
        migration_summary = await AtlanStorage.migrate_from_objectstore_to_atlan(
            prefix=output_prefix
        )
        
        if migration_summary.failed_migrations > 0:
            logger.warning(
                f"Migration completed with {migration_summary.failed_migrations} failures"
            )
            # Handle partial failure...
        else:
            logger.info(
                f"Successfully migrated {migration_summary.migrated_files} files"
            )
        
        return migration_summary

Batch Dataset Migration

Migrate processed datasets in batches:
import logging

from application_sdk.services import AtlanStorage
from temporalio import activity

logger = logging.getLogger(__name__)

@activity.defn
async def migrate_processed_datasets(date_prefix: str) -> dict:
    """
    Migrate all processed datasets for a specific date.
    """
    # Define dataset prefixes to migrate
    dataset_types = ["customer_data", "transaction_data", "analytics_data"]
    
    migration_results = {}
    
    for dataset_type in dataset_types:
        prefix = f"processed/{date_prefix}/{dataset_type}/"
        
        logger.info(f"Migrating {dataset_type} for {date_prefix}")
        
        summary = await AtlanStorage.migrate_from_objectstore_to_atlan(
            prefix=prefix
        )
        
        migration_results[dataset_type] = {
            "total": summary.total_files,
            "migrated": summary.migrated_files,
            "failed": summary.failed_migrations
        }
        
        if summary.failed_migrations > 0:
            logger.error(
                f"Failed to migrate {summary.failed_migrations} files from {dataset_type}"
            )
            for failure in summary.failures:
                logger.error(f"  {failure['file']}: {failure['error']}")
    
    return migration_results

Selective Migration with Validation

Migrate files with pre-migration validation:
import logging

from application_sdk.services import AtlanStorage, ObjectStore
from temporalio import activity

logger = logging.getLogger(__name__)

@activity.defn
async def validate_and_migrate(prefix: str) -> "MigrationSummary":
    """
    Validate files before migration and report issues.
    """
    # List files to be migrated
    files = await ObjectStore.list_files(prefix=prefix)
    
    logger.info(f"Found {len(files)} files for migration under prefix: {prefix}")
    
    # Pre-migration validation
    validation_errors = []
    for file_path in files:
        try:
            # Check if file is accessible
            content = await ObjectStore.get_content(file_path, suppress_error=True)
            if content is None:
                validation_errors.append(f"Cannot read file: {file_path}")
        except Exception as e:
            validation_errors.append(f"Validation error for {file_path}: {str(e)}")
    
    if validation_errors:
        logger.warning(f"Found {len(validation_errors)} validation errors")
        for error in validation_errors:
            logger.warning(f"  {error}")
    
    # Proceed with migration
    logger.info("Starting migration to Atlan storage")
    summary = await AtlanStorage.migrate_from_objectstore_to_atlan(prefix=prefix)
    
    # Log detailed results
    logger.info(
        f"Migration complete: {summary.migrated_files}/{summary.total_files} succeeded"
    )
    
    if summary.failed_migrations > 0:
        logger.error("Migration failures:")
        for failure in summary.failures:
            logger.error(f"  {failure['file']}: {failure['error']}")
    
    return summary

Incremental Migration

Migrate data incrementally based on workflow progress:
import logging
from datetime import timedelta

from application_sdk.services import AtlanStorage, StateStore, StateType
from temporalio import workflow

logger = logging.getLogger(__name__)

@workflow.defn
class IncrementalDataPipeline:
    @workflow.run
    async def run(self, pipeline_id: str):
        batches = ["batch_001", "batch_002", "batch_003"]
        
        for batch_id in batches:
            # Process batch
            batch_prefix = f"processing/{pipeline_id}/{batch_id}/"
            
            await workflow.execute_activity(
                process_batch,
                batch_prefix,
                start_to_close_timeout=timedelta(minutes=30)
            )
            
            # Migrate batch results immediately
            summary = await AtlanStorage.migrate_from_objectstore_to_atlan(
                prefix=batch_prefix
            )
            
            # Track migration in state
            await StateStore.save_state(
                key=f"migration_{batch_id}",
                value={
                    "total_files": summary.total_files,
                    "migrated_files": summary.migrated_files,
                    "failed_migrations": summary.failed_migrations
                },
                id=pipeline_id,
                type=StateType.WORKFLOWS
            )
            
            # Handle failures
            if summary.failed_migrations > 0:
                logger.warning(
                    f"Batch {batch_id} migration had {summary.failed_migrations} failures"
                )
                # Optionally retry or alert
        
        return "Pipeline completed"

Error Handling

Handling Migration Failures

Gracefully handle migration failures:
import logging

from application_sdk.services import AtlanStorage
from temporalio import activity

logger = logging.getLogger(__name__)

@activity.defn
async def safe_migration(prefix: str, max_failure_rate: float = 0.05):
    """
    Migrate files with failure rate checking.
    """
    try:
        summary = await AtlanStorage.migrate_from_objectstore_to_atlan(prefix=prefix)
        
        if summary.total_files == 0:
            logger.warning(f"No files found for prefix: {prefix}")
            return {"status": "no_files", "summary": summary}
        
        # Calculate failure rate
        failure_rate = summary.failed_migrations / summary.total_files
        
        if failure_rate > max_failure_rate:
            error_msg = (
                f"Migration failure rate {failure_rate:.1%} exceeds "
                f"threshold {max_failure_rate:.1%}"
            )
            logger.error(error_msg)
            
            # Log all failures
            for failure in summary.failures:
                logger.error(f"Failed file: {failure['file']} - {failure['error']}")
            
            return {
                "status": "failure_threshold_exceeded",
                "summary": summary,
                "error": error_msg
            }
        
        return {"status": "success", "summary": summary}
        
    except Exception as e:
        logger.error(f"Migration failed critically: {str(e)}")
        raise

Best Practices

Parallel Performance: The migration runs file transfers in parallel using asyncio.gather(). For large datasets, this provides significant performance improvements over sequential transfers.
Empty Prefix: Calling migrate_from_objectstore_to_atlan() with an empty prefix will migrate ALL files in the object store. Use specific prefixes to control scope.
Error Handling: Migration continues even if individual files fail. Always check summary.failed_migrations and handle failures appropriately.
Progress Tracking: For long-running migrations, log the migration summary periodically or store it in StateStore for monitoring.
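Because migration continues past individual failures, one practical follow-up is to retry only the files listed in summary.failures. This is a sketch under two assumptions: a full file path is accepted as a (maximally narrow) prefix, and the `migrate` parameter stands in for AtlanStorage.migrate_from_objectstore_to_atlan so the helper stays testable in isolation.

```python
import asyncio
import logging
from typing import Awaitable, Callable, Dict, List

logger = logging.getLogger(__name__)

async def retry_failed_files(
    failures: List[Dict[str, str]],
    migrate: Callable[[str], Awaitable[object]],
) -> int:
    """Re-run migration for each failed file; return how many were recovered."""
    recovered = 0
    for failure in failures:
        # Assumption: a full file path works as a narrow prefix.
        retry = await migrate(failure["file"])
        if retry.failed_migrations == 0 and retry.migrated_files > 0:
            recovered += 1
        else:
            logger.error("Retry still failing for %s", failure["file"])
    return recovered
```

In real code you would pass AtlanStorage.migrate_from_objectstore_to_atlan as the `migrate` argument and cap the number of retry rounds.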

Migration Process

The migration process follows these steps:

1. List Files: Query the deployment object store for all files matching the prefix.

2. Create Tasks: Create a parallel async task for each file to be migrated.

3. Execute Migrations: Run all migrations concurrently using asyncio.gather():
  • Fetch file content from the deployment store
  • Upload to upstream Atlan storage
  • Track success/failure for each file

4. Generate Summary: Compile results into a MigrationSummary with:
  • Total file count
  • Success and failure counts
  • Detailed failure information
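The steps above can be sketched as a generic gather-based pipeline. Here fetch and upload are hypothetical stand-ins for the deployment-store read and upstream write; the real service uses the SDK's store clients, but the concurrency and summarization pattern is the same.

```python
import asyncio
from typing import Dict, List, Tuple

async def _migrate_one(path: str, fetch, upload) -> Tuple[str, str]:
    """Migrate a single file, returning (path, error); error is "" on success."""
    try:
        content = await fetch(path)      # step 3: read from the deployment store
        await upload(path, content)      # step 3: write to upstream storage
        return (path, "")
    except Exception as exc:             # step 3: record the per-file failure
        return (path, str(exc))

async def migrate_all(paths: List[str], fetch, upload) -> Dict[str, int]:
    # Steps 2-3: one task per file, executed concurrently with asyncio.gather.
    results = await asyncio.gather(
        *(_migrate_one(p, fetch, upload) for p in paths)
    )
    failures = [r for r in results if r[1]]
    # Step 4: compile the summary counts.
    return {
        "total_files": len(paths),
        "migrated_files": len(paths) - len(failures),
        "failed_migrations": len(failures),
    }
```

Note that a per-file failure is captured as data rather than raised, which is what lets the real migration continue past individual errors.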

Configuration

AtlanStorage uses these storage backends:
from application_sdk.constants import (
    DEPLOYMENT_OBJECT_STORE_NAME,  # Default: "objectstore" (source)
    UPSTREAM_OBJECT_STORE_NAME,     # Default: "upstream-objectstore" (destination)
)

Related Services

  • ObjectStore - AtlanStorage uses ObjectStore for file listing and content retrieval
  • StateStore - Often used together to track migration progress
