The AtlanStorage service migrates data from the deployment's local object store to Atlan’s upstream storage system.

AtlanStorage

application_sdk.services.atlan_storage.AtlanStorage

Handles upload operations to Atlan storage and migration from the object store.

Class Methods

migrate_from_objectstore_to_atlan

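Migrates all files under a given prefix from the object store to Atlan storage. The method is a coroutine and must be awaited.
from application_sdk.services.atlan_storage import AtlanStorage

# Signature: migrate_from_objectstore_to_atlan(prefix: str = "") -> MigrationSummary
summary = await AtlanStorage.migrate_from_objectstore_to_atlan(prefix="")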
prefix (str, default: "")
The prefix that selects which files to migrate. An empty string migrates all files.

Returns: MigrationSummary
A summary of the migration, including:
  • total_files: Number of files found for migration
  • migrated_files: Number successfully migrated
  • failed_migrations: Number that failed to migrate
  • failures: List of failure details with file paths and errors
  • prefix: The prefix used for filtering
  • source/destination: Storage system identifiers

Raises: Exception
If a critical error occurs during the migration process.
Files are migrated in parallel for better performance. Failures on individual files are collected in the returned summary; only a critical error raises an exception.
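This page does not say how the parallelism is implemented. As a rough mental model only, the sketch below fans out one coroutine per file with `asyncio.gather(..., return_exceptions=True)` and folds the outcomes into the same counters that `MigrationSummary` exposes, so one failed file does not stop the others. `migrate_all` and `fake_copy` are hypothetical stand-ins, not SDK code; the `file`/`error` keys follow the examples on this page.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


async def migrate_all(
    files: List[str], copy_file: Callable[[str], Awaitable[None]]
) -> Dict[str, Any]:
    """Hypothetical sketch: copy every file concurrently and summarise the outcome."""
    # return_exceptions=True returns errors as results instead of cancelling
    # the remaining copies when one of them fails.
    results = await asyncio.gather(
        *(copy_file(path) for path in files), return_exceptions=True
    )

    failures = [
        {"file": path, "error": str(result)}
        for path, result in zip(files, results)
        if isinstance(result, BaseException)
    ]
    return {
        "total_files": len(files),
        "migrated_files": len(files) - len(failures),
        "failed_migrations": len(failures),
        "failures": failures,
    }


async def fake_copy(path: str) -> None:
    # Stand-in for a real upload; fails for one file to show failure capture.
    await asyncio.sleep(0)
    if path.endswith("bad.json"):
        raise IOError(f"upload failed for {path}")


summary = asyncio.run(migrate_all(["a.json", "bad.json", "c.json"], fake_copy))
print(summary["migrated_files"], summary["failed_migrations"])  # 2 1
```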

MigrationSummary

application_sdk.services.atlan_storage.MigrationSummary

Summary of a migration operation from the object store to Atlan storage.

Attributes

  • total_files (int, default: 0): Total number of files found for migration
  • migrated_files (int, default: 0): Number of files successfully migrated
  • failed_migrations (int, default: 0): Number of files that failed to migrate
  • failures (List[Dict[str, str]], default: []): List of failure details with file paths and error messages
  • prefix (str, default: ""): The prefix used to filter files for migration
  • source (str, default: DEPLOYMENT_OBJECT_STORE_NAME): Source storage system (e.g., “objectstore”)
  • destination (str, default: UPSTREAM_OBJECT_STORE_NAME): Destination storage system (e.g., “upstream-objectstore”)
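Because `total_files`, `migrated_files`, and `failed_migrations` are plain counters, derived figures are easy to compute. The helpers below are a sketch, not SDK functions: `SummaryStandIn` is a hypothetical dataclass mirroring the attributes above so the code runs without the SDK, and the consistency rule (every discovered file either migrated or failed) is an assumption that this page does not state explicitly.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class SummaryStandIn:
    """Hypothetical stand-in with the same counters as MigrationSummary."""

    total_files: int = 0
    migrated_files: int = 0
    failed_migrations: int = 0
    failures: List[Dict[str, str]] = field(default_factory=list)
    prefix: str = ""


def success_rate(summary: Any) -> float:
    """Percentage of discovered files that migrated; 0.0 when nothing was found."""
    if summary.total_files == 0:
        return 0.0
    return summary.migrated_files / summary.total_files * 100


def is_consistent(summary: Any) -> bool:
    """Assumed invariant: counters add up and each failure has a detail entry."""
    return (
        summary.migrated_files + summary.failed_migrations == summary.total_files
        and len(summary.failures) == summary.failed_migrations
    )


s = SummaryStandIn(
    total_files=4,
    migrated_files=3,
    failed_migrations=1,
    failures=[{"file": "x.json", "error": "timeout"}],
)
print(f"{success_rate(s):.1f}%", is_consistent(s))  # 75.0% True
```

Both functions accept a real `MigrationSummary` as well, since they only read its attributes.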

Example Usage

Basic Migration

import asyncio

from application_sdk.services.atlan_storage import AtlanStorage


async def main() -> None:
    # Migrate all files (empty prefix)
    summary = await AtlanStorage.migrate_from_objectstore_to_atlan()

    print(f"Total files: {summary.total_files}")
    print(f"Migrated: {summary.migrated_files}")
    print(f"Failed: {summary.failed_migrations}")

    if summary.failed_migrations > 0:
        print("\nFailures:")
        for failure in summary.failures:
            print(f"  {failure['file']}: {failure['error']}")


asyncio.run(main())

Prefix-Based Migration

from application_sdk.services.atlan_storage import AtlanStorage

# Migrate specific dataset
summary = await AtlanStorage.migrate_from_objectstore_to_atlan(
    prefix="processed_data/2024/"
)

if summary.total_files == 0:
    print("No files found with the specified prefix")
elif summary.failed_migrations == 0:
    print(f"Successfully migrated all {summary.total_files} files")
else:
    print(f"Migration completed with {summary.failed_migrations} failures")
    # Handle failures...
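How the prefix is matched depends on the underlying object store; many object stores compare keys as plain strings rather than at directory boundaries. Assuming that behaviour, the hypothetical helper below shows why ending a prefix with `/` matters: without it, `processed_data/2024` also picks up `processed_data/2024-backup/`.

```python
from typing import Iterable, List


def files_matching_prefix(keys: Iterable[str], prefix: str = "") -> List[str]:
    """Hypothetical illustration of plain string-prefix filtering.

    An empty prefix matches every key, mirroring the documented behaviour
    that an empty string migrates all files.
    """
    return [key for key in keys if key.startswith(prefix)]


keys = [
    "processed_data/2024/part-0.parquet",
    "processed_data/2024-backup/part-0.parquet",
    "raw/2024/part-0.json",
]

print(files_matching_prefix(keys, "processed_data/2024"))   # both processed_data keys
print(files_matching_prefix(keys, "processed_data/2024/"))  # only the first key
```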

Activity Integration

from application_sdk.activities import ActivitiesInterface
from application_sdk.services.atlan_storage import AtlanStorage
from temporalio import activity

class MigrationActivities(ActivitiesInterface):
    
    @activity.defn
    async def migrate_workflow_outputs(self, workflow_args):
        """Migrate workflow outputs to Atlan storage."""
        output_prefix = workflow_args["output_path"]
        
        # Perform migration
        summary = await AtlanStorage.migrate_from_objectstore_to_atlan(
            prefix=output_prefix
        )
        
        # Check results
        if summary.failed_migrations > 0:
            # Log failures; stdlib-style loggers take custom fields via `extra`
            activity.logger.warning(
                f"Migration completed with {summary.failed_migrations} failures",
                extra={"failures": summary.failures},
            )
        
        return {
            "total_files": summary.total_files,
            "migrated_files": summary.migrated_files,
            "failed_migrations": summary.failed_migrations
        }

Error Handling

from application_sdk.services.atlan_storage import AtlanStorage
from application_sdk.observability.logger_adaptor import get_logger

logger = get_logger(__name__)

async def safe_migration(prefix: str) -> dict:
    """Migrate files with error handling."""
    try:
        summary = await AtlanStorage.migrate_from_objectstore_to_atlan(
            prefix=prefix
        )
        
        # Check for failures
        if summary.failed_migrations > 0:
            logger.warning(
                f"Migration had {summary.failed_migrations} failures",
                failures=summary.failures
            )
        
        success_rate = (
            summary.migrated_files / summary.total_files * 100
            if summary.total_files > 0
            else 0
        )
        
        return {
            "success": True,
            "success_rate": success_rate,
            "summary": summary
        }
        
    except Exception as e:
        logger.error(f"Migration failed: {e}", exc_info=True)
        return {
            "success": False,
            "error": str(e)
        }

Workflow Integration

from datetime import timedelta

from temporalio import workflow

from application_sdk.workflows import WorkflowInterface


@workflow.defn
class DataProcessingWorkflow(WorkflowInterface):

    @workflow.run
    async def run(self, workflow_config):
        # ... workflow execution ...
        # Assumption: workflow_config already carries the resolved arguments,
        # including "output_path".
        workflow_args = workflow_config

        # Process data and save to object store
        # (process_and_save and migrate_to_atlan are assumed to be activities
        # defined on activities_cls)
        result = await workflow.execute_activity_method(
            self.activities_cls.process_and_save,
            workflow_args,
            start_to_close_timeout=timedelta(hours=1)
        )

        # Migrate results to Atlan storage
        output_path = workflow_args["output_path"]

        # Migration performs network and storage I/O, which does not belong in
        # deterministic workflow code, so it runs inside an activity
        migration_result = await workflow.execute_activity_method(
            self.activities_cls.migrate_to_atlan,
            {"prefix": output_path},
            start_to_close_timeout=timedelta(hours=2)
        )

        return {
            "processing": result,
            "migration": migration_result
        }

Batch Migration

from application_sdk.services.atlan_storage import AtlanStorage
import asyncio

async def migrate_multiple_prefixes(prefixes: list) -> list:
    """Migrate multiple prefixes in parallel."""
    tasks = [
        AtlanStorage.migrate_from_objectstore_to_atlan(prefix=prefix)
        for prefix in prefixes
    ]
    
    summaries = await asyncio.gather(*tasks, return_exceptions=True)
    
    results = []
    for prefix, summary in zip(prefixes, summaries):
        if isinstance(summary, Exception):
            results.append({
                "prefix": prefix,
                "success": False,
                "error": str(summary)
            })
        else:
            results.append({
                "prefix": prefix,
                "success": summary.failed_migrations == 0,
                "total_files": summary.total_files,
                "migrated_files": summary.migrated_files
            })
    
    return results

Migration Verification

from application_sdk.services.atlan_storage import AtlanStorage
from application_sdk.services.objectstore import ObjectStore
from application_sdk.constants import DEPLOYMENT_OBJECT_STORE_NAME

async def verify_migration(prefix: str) -> dict:
    """Migrate files under a prefix, then check the summary against the source listing."""
    
    # Get source file list
    source_files = await ObjectStore.list_files(
        prefix=prefix,
        store_name=DEPLOYMENT_OBJECT_STORE_NAME
    )
    
    # Perform migration
    summary = await AtlanStorage.migrate_from_objectstore_to_atlan(
        prefix=prefix
    )
    
    # Verify counts match
    if len(source_files) != summary.total_files:
        return {
            "verified": False,
            "error": "Source file count mismatch"
        }
    
    if summary.failed_migrations > 0:
        return {
            "verified": False,
            "error": f"{summary.failed_migrations} files failed to migrate",
            "failures": summary.failures
        }
    
    return {
        "verified": True,
        "files_migrated": summary.migrated_files
    }

Reporting

from application_sdk.services.atlan_storage import AtlanStorage

async def generate_migration_report(prefix: str) -> str:
    """Generate detailed migration report."""
    summary = await AtlanStorage.migrate_from_objectstore_to_atlan(
        prefix=prefix
    )
    
    # Avoid dividing by zero when no files matched the prefix
    success_rate = (
        summary.migrated_files / summary.total_files * 100
        if summary.total_files > 0
        else 0.0
    )

    report = f"""Migration Report
================
Prefix: {summary.prefix}
Source: {summary.source}
Destination: {summary.destination}

Results:
--------
Total Files: {summary.total_files}
Migrated: {summary.migrated_files}
Failed: {summary.failed_migrations}
Success Rate: {success_rate:.1f}%
"""
    
    if summary.failed_migrations > 0:
        report += "\nFailures:\n---------\n"
        for failure in summary.failures:
            report += f"  - {failure['file']}: {failure['error']}\n"
    
    return report

Best Practices

Migration Planning

  • Test migrations with small prefixes first
  • Verify source data before migration
  • Monitor progress: the method returns its summary only when the whole migration finishes, so track long jobs per prefix
  • Plan how to retry failed files
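One way to retry failed files is to re-run the migration with each failed file's path as the prefix. This is a sketch under two assumptions: each failure dict carries the full object path under the `file` key (as in the examples above), and a full path used as a prefix selects that file (it may also select other keys that start with the same string). `retry_failures` is a hypothetical helper; `migrate` is any coroutine function that accepts `prefix=` and returns an object with a `failures` list, such as `AtlanStorage.migrate_from_objectstore_to_atlan`.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

# Any coroutine function callable as migrate(prefix=...) that returns an object
# with a `failures` list, e.g. AtlanStorage.migrate_from_objectstore_to_atlan.
MigrateFn = Callable[..., Awaitable[Any]]


async def retry_failures(
    failures: List[Dict[str, str]],
    migrate: MigrateFn,
    max_attempts: int = 3,
    base_delay: float = 1.0,
) -> List[Dict[str, str]]:
    """Hypothetical helper: re-migrate failed files one path at a time.

    Assumes each failure dict stores the full object path under "file" and that
    passing that path as `prefix` selects the file. Returns whatever is still
    failing after `max_attempts` rounds.
    """
    pending = list(failures)
    for attempt in range(1, max_attempts + 1):
        if not pending:
            break
        still_failing: List[Dict[str, str]] = []
        for failure in pending:
            try:
                retry = await migrate(prefix=failure["file"])
            except Exception as exc:
                # A raised error counts as another failure for this file.
                still_failing.append({"file": failure["file"], "error": str(exc)})
                continue
            still_failing.extend(retry.failures)
        pending = still_failing
        if pending and attempt < max_attempts:
            # Exponential backoff between rounds: base_delay, 2x, 4x, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    return pending
```

For example, `remaining = await retry_failures(summary.failures, AtlanStorage.migrate_from_objectstore_to_atlan)` leaves only the files that still failed after three rounds.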

Performance

  • Migrations are performed in parallel for efficiency
  • Consider splitting very large migrations into several narrower prefixes
  • Monitor resource usage during migration
  • Schedule large migrations during off-peak hours
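One way to chunk a large migration is to split it into narrower prefixes and cap how many run at once. The Batch Migration example above starts every prefix simultaneously; the hypothetical helper below uses an `asyncio.Semaphore` instead. Since each call already migrates its own files in parallel, a small cap keeps the total load bounded.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


async def migrate_in_chunks(
    prefixes: List[str],
    migrate: Callable[..., Awaitable[Any]],
    max_concurrent: int = 2,
) -> Dict[str, Any]:
    """Hypothetical helper: migrate several prefixes with bounded concurrency.

    Returns a mapping of prefix -> summary, or prefix -> exception if that
    prefix's migration raised.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run_one(prefix: str) -> Any:
        # At most `max_concurrent` migrations hold the semaphore at once.
        async with semaphore:
            return await migrate(prefix=prefix)

    results = await asyncio.gather(
        *(run_one(prefix) for prefix in prefixes), return_exceptions=True
    )
    return dict(zip(prefixes, results))
```

For example, if data were laid out by month (an assumption about your layout), `await migrate_in_chunks([f"processed_data/2024/{m:02d}/" for m in range(1, 13)], AtlanStorage.migrate_from_objectstore_to_atlan)` would migrate a year two months at a time.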

Error Handling

  • Always check migration summary for failures
  • Log failure details for debugging
  • Implement retry logic for failed files
  • Alert on high failure rates
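Alerting on high failure rates needs a threshold. The sketch below is hypothetical: `failure_alert` takes the two counters from a `MigrationSummary`, logs through the standard `logging` module, and returns the alert text so it can be forwarded to another channel. The 5% default is illustrative only, not an SDK recommendation.

```python
import logging
from typing import Optional

logger = logging.getLogger(__name__)


def failure_alert(failed: int, total: int, max_failure_rate: float = 0.05) -> Optional[str]:
    """Hypothetical check: return an alert message when too many files failed."""
    if total == 0:
        return None  # nothing was found, so there is no rate to judge
    rate = failed / total
    if rate <= max_failure_rate:
        return None
    message = (
        f"Migration failure rate {rate:.1%} exceeds {max_failure_rate:.1%} "
        f"({failed}/{total} files)"
    )
    logger.error(message)
    return message
```

Call it as `failure_alert(summary.failed_migrations, summary.total_files)` after each migration.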

Verification

  • Verify file counts after migration
  • Spot-check migrated files
  • Keep source files until verification is complete
  • Document migration results
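A matching file count does not prove that the same files arrived. Assuming you can list keys in both stores (for example with `ObjectStore.list_files`, as in the Migration Verification example, pointed at each store) and that migrated files keep their keys, a set difference names the missing files and a seeded random sample gives a reproducible spot-check list. Both helpers are hypothetical.

```python
import random
from typing import Iterable, List


def missing_in_destination(source: Iterable[str], destination: Iterable[str]) -> List[str]:
    """Keys in the source listing that are absent from the destination listing.

    Assumes a migrated file keeps the same key in both stores.
    """
    return sorted(set(source) - set(destination))


def spot_check_sample(keys: Iterable[str], k: int = 5, seed: int = 0) -> List[str]:
    """Reproducible random sample of up to k keys to inspect by hand."""
    # Sorting first makes the sample independent of listing order.
    pool = sorted(keys)
    return random.Random(seed).sample(pool, min(k, len(pool)))
```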
