The AtlanStorage service handles data migration between local object storage and Atlan’s upstream storage system.
AtlanStorage
application_sdk.services.atlan_storage.AtlanStorage
Handles upload operations to Atlan storage and migration from objectstore.
Class Methods
migrate_from_objectstore_to_atlan
Migrate all files from object store to Atlan storage under a given prefix.
from application_sdk.services.atlan_storage import AtlanStorage
summary = await AtlanStorage.migrate_from_objectstore_to_atlan(
prefix: str = ""
)
The prefix to filter which files to migrate. Empty string migrates all files.
Comprehensive migration summary including:
total_files: Number of files found for migration
migrated_files: Number successfully migrated
failed_migrations: Number that failed to migrate
failures: List of failure details with file paths and errors
prefix: The prefix used for filtering
source/destination: Storage system identifiers
Raises an exception if a critical error occurs during the migration process.
This method performs parallel migration of files for better performance.
MigrationSummary
application_sdk.services.atlan_storage.MigrationSummary
Summary of migration operation from objectstore to Atlan storage.
Attributes
total_files: Total number of files found for migration
migrated_files: Number of files successfully migrated
failed_migrations: Number of files that failed to migrate
failures
List[Dict[str, str]]
default:"[]"
List of failure details with file paths and error messages
prefix: The prefix used to filter files for migration
source
str
default:"DEPLOYMENT_OBJECT_STORE_NAME"
Source storage system (e.g., “objectstore”)
destination
str
default:"UPSTREAM_OBJECT_STORE_NAME"
Destination storage system (e.g., “upstream-objectstore”)
Example Usage
Basic Migration
from application_sdk.services.atlan_storage import AtlanStorage
# Migrate all files
summary = await AtlanStorage.migrate_from_objectstore_to_atlan()
print(f"Total files: {summary.total_files}")
print(f"Migrated: {summary.migrated_files}")
print(f"Failed: {summary.failed_migrations}")
if summary.failed_migrations > 0:
print("\nFailures:")
for failure in summary.failures:
print(f" {failure['file']}: {failure['error']}")
Prefix-Based Migration
from application_sdk.services.atlan_storage import AtlanStorage
# Migrate specific dataset
summary = await AtlanStorage.migrate_from_objectstore_to_atlan(
prefix="processed_data/2024/"
)
if summary.total_files == 0:
print("No files found with the specified prefix")
elif summary.failed_migrations == 0:
print(f"Successfully migrated all {summary.total_files} files")
else:
print(f"Migration completed with {summary.failed_migrations} failures")
# Handle failures...
Activity Integration
from application_sdk.activities import ActivitiesInterface
from application_sdk.services.atlan_storage import AtlanStorage
from temporalio import activity
class MigrationActivities(ActivitiesInterface):
    """Activities that migrate workflow outputs to Atlan storage."""

    @activity.defn
    async def migrate_workflow_outputs(self, workflow_args):
        """Migrate workflow outputs to Atlan storage.

        Args:
            workflow_args: Mapping of workflow arguments. Must contain the
                key ``"output_path"``, which is used as the migration prefix.

        Returns:
            A dict with the ``total_files``, ``migrated_files`` and
            ``failed_migrations`` counts taken from the migration summary.
        """
        output_prefix = workflow_args["output_path"]

        # Perform migration
        summary = await AtlanStorage.migrate_from_objectstore_to_atlan(
            prefix=output_prefix
        )

        # Check results
        if summary.failed_migrations > 0:
            # Log failures. activity.logger follows the standard logging call
            # signature, which rejects arbitrary keyword arguments, so the
            # structured failure details are passed through `extra`.
            activity.logger.warning(
                f"Migration completed with {summary.failed_migrations} failures",
                extra={"failures": summary.failures},
            )

        return {
            "total_files": summary.total_files,
            "migrated_files": summary.migrated_files,
            "failed_migrations": summary.failed_migrations,
        }
Error Handling
from application_sdk.services.atlan_storage import AtlanStorage
from application_sdk.observability.logger_adaptor import get_logger
# Module-level logger for this example, obtained from the SDK's get_logger.
logger = get_logger(__name__)
async def safe_migration(prefix: str) -> dict:
    """Run a migration for ``prefix`` and report the outcome without raising.

    Args:
        prefix: Prefix selecting which files to migrate.

    Returns:
        On completion, a dict with ``success`` set to True, the percentage
        ``success_rate`` (0 when no files were found) and the ``summary``
        object. If the migration raises, a dict with ``success`` set to
        False and the ``error`` message.
    """
    try:
        summary = await AtlanStorage.migrate_from_objectstore_to_atlan(
            prefix=prefix,
        )

        # Partial failures are logged but still count as a completed run
        if summary.failed_migrations > 0:
            logger.warning(
                f"Migration had {summary.failed_migrations} failures",
                failures=summary.failures,
            )

        # Guard against dividing by zero when nothing matched the prefix
        if summary.total_files > 0:
            success_rate = summary.migrated_files / summary.total_files * 100
        else:
            success_rate = 0

        return {
            "success": True,
            "success_rate": success_rate,
            "summary": summary,
        }
    except Exception as e:
        logger.error(f"Migration failed: {e}", exc_info=True)
        return {"success": False, "error": str(e)}
Workflow Integration
from application_sdk.workflows import WorkflowInterface
from application_sdk.services.atlan_storage import AtlanStorage
from temporalio import workflow
from datetime import timedelta
class DataProcessingWorkflow(WorkflowInterface):
    """Example workflow that processes data and then migrates the results."""

    async def run(self, workflow_config):
        """Process data, then migrate the produced files to Atlan storage.

        Args:
            workflow_config: Mapping of workflow arguments. Must contain the
                key ``"output_path"``, which is used as the migration prefix.

        Returns:
            A dict holding the ``processing`` and ``migration`` activity
            results.
        """
        # ... workflow execution ...

        # Process data and save to object store
        result = await workflow.execute_activity_method(
            self.activities_cls.process_and_save,
            workflow_config,
            start_to_close_timeout=timedelta(hours=1),
        )

        # Migrate results to Atlan storage
        output_path = workflow_config["output_path"]

        # Note: Migration is CPU/IO bound, use activity
        migration_result = await workflow.execute_activity_method(
            self.activities_cls.migrate_to_atlan,
            {"prefix": output_path},
            start_to_close_timeout=timedelta(hours=2),
        )

        return {
            "processing": result,
            "migration": migration_result,
        }
Batch Migration
from application_sdk.services.atlan_storage import AtlanStorage
import asyncio
async def migrate_multiple_prefixes(prefixes: list) -> list:
    """Run one migration per prefix concurrently and report each outcome.

    Args:
        prefixes: Prefixes to migrate; one migration call is issued per entry.

    Returns:
        A list with one dict per prefix, in input order. A migration that
        raised yields ``prefix``, ``success`` set to False and ``error``;
        otherwise the dict holds ``prefix``, ``success`` (True only when no
        file failed), ``total_files`` and ``migrated_files``.
    """

    def _describe(prefix, outcome):
        # gather(return_exceptions=True) hands back the exception object itself
        if isinstance(outcome, Exception):
            return {
                "prefix": prefix,
                "success": False,
                "error": str(outcome),
            }
        return {
            "prefix": prefix,
            "success": outcome.failed_migrations == 0,
            "total_files": outcome.total_files,
            "migrated_files": outcome.migrated_files,
        }

    outcomes = await asyncio.gather(
        *(
            AtlanStorage.migrate_from_objectstore_to_atlan(prefix=item)
            for item in prefixes
        ),
        return_exceptions=True,
    )
    return [_describe(prefix, outcome) for prefix, outcome in zip(prefixes, outcomes)]
Migration Verification
from application_sdk.services.atlan_storage import AtlanStorage
from application_sdk.services.objectstore import ObjectStore
from application_sdk.constants import DEPLOYMENT_OBJECT_STORE_NAME
async def verify_migration(prefix: str) -> dict:
    """Migrate files under ``prefix`` and check the result against the source listing.

    Args:
        prefix: Prefix selecting which files to list and migrate.

    Returns:
        A dict whose ``verified`` flag is True together with
        ``files_migrated`` when the listed file count matches the summary and
        nothing failed; otherwise ``verified`` is False with an ``error``
        message (and ``failures`` when individual files failed).
    """
    # List the source files before migrating so the counts can be compared
    files_before = await ObjectStore.list_files(
        prefix=prefix,
        store_name=DEPLOYMENT_OBJECT_STORE_NAME,
    )

    summary = await AtlanStorage.migrate_from_objectstore_to_atlan(
        prefix=prefix,
    )

    # The migration must have seen exactly the files that were listed
    counts_match = len(files_before) == summary.total_files
    if not counts_match:
        return {"verified": False, "error": "Source file count mismatch"}

    if summary.failed_migrations > 0:
        return {
            "verified": False,
            "error": f"{summary.failed_migrations} files failed to migrate",
            "failures": summary.failures,
        }

    return {"verified": True, "files_migrated": summary.migrated_files}
Reporting
from application_sdk.services.atlan_storage import AtlanStorage
async def generate_migration_report(prefix: str) -> str:
    """Migrate files under ``prefix`` and render the summary as a text report.

    Args:
        prefix: Prefix selecting which files to migrate.

    Returns:
        A multi-line report with the prefix, source, destination, counts and
        success rate, followed by one line per failure when any file failed.
    """
    summary = await AtlanStorage.migrate_from_objectstore_to_atlan(
        prefix=prefix,
    )

    # The success rate falls back to 0 when no files were found
    report = f"""Migration Report
================
Prefix: {summary.prefix}
Source: {summary.source}
Destination: {summary.destination}
Results:
--------
Total Files: {summary.total_files}
Migrated: {summary.migrated_files}
Failed: {summary.failed_migrations}
Success Rate: {(summary.migrated_files / summary.total_files * 100) if summary.total_files > 0 else 0:.1f}%
"""

    if summary.failed_migrations > 0:
        failure_lines = [
            f" - {failure['file']}: {failure['error']}\n"
            for failure in summary.failures
        ]
        report += "\nFailures:\n---------\n" + "".join(failure_lines)

    return report
Best Practices
Migration Planning
- Test migrations with small prefixes first
- Verify source data before migration
- Monitor migration progress
- Plan for retry of failed files
Performance
- Migrations are performed in parallel for efficiency
- Consider chunking very large migrations
- Monitor resource usage during migration
- Schedule large migrations during off-peak hours
Error Handling
- Always check migration summary for failures
- Log failure details for debugging
- Implement retry logic for failed files
- Alert on high failure rates
Verification
- Verify file counts after migration
- Spot-check migrated files
- Keep source files until verification complete
- Document migration results