Skip to main content
This guide demonstrates how to process large collections of photos efficiently, with strategies for memory management, error recovery, and progress tracking.

Basic Batch Processing

Process large collections in manageable batches:
import asyncio
from pathlib import Path
from imagen_sdk import quick_edit, EditOptions, RAW_EXTENSIONS, JPG_EXTENSIONS

async def process_large_collection():
    """Process a large photo collection in batches."""
    
    # Discover all supported files
    raw_photos = []
    for ext in RAW_EXTENSIONS:
        raw_photos.extend(list(Path("photos").glob(f"*{ext}")))
    
    jpeg_photos = []
    for ext in JPG_EXTENSIONS:
        jpeg_photos.extend(list(Path("photos").glob(f"*{ext}")))
    
    # Process each file type separately (cannot mix in same project)
    for file_type, photos in [("RAW", raw_photos), ("JPEG", jpeg_photos)]:
        if not photos:
            continue
        
        print(f"Processing {len(photos)} {file_type} files...")
        batch_size = 50
        
        for i in range(0, len(photos), batch_size):
            batch = photos[i:i + batch_size]
            batch_num = i // batch_size + 1
            print(f"Processing {file_type} batch {batch_num}...")
            
            edit_options = EditOptions(crop=True, straighten=True)
            result = await quick_edit(
                api_key="your_key",
                profile_key=5700,
                image_paths=[str(p) for p in batch],
                edit_options=edit_options,
                download=True,
                download_dir=f"edited_{file_type.lower()}_batch_{batch_num}"
            )
            
            print(f"Batch {batch_num} complete: {len(result.downloaded_files)} photos")

asyncio.run(process_large_collection())

Advanced Batch Processing

For more control over large collections, use ImagenClient:
import asyncio
from pathlib import Path
from typing import List
from imagen_sdk import (
    ImagenClient,
    EditOptions,
    PhotographyType,
    RAW_EXTENSIONS,
    UploadError,
    ProjectError
)

async def advanced_batch_processing():
    """Process large collections with advanced error handling and progress tracking."""
    
    # Configuration
    API_KEY = "your_api_key"
    PROFILE_KEY = 5700
    BATCH_SIZE = 100
    MAX_CONCURRENT_UPLOADS = 3
    MAX_CONCURRENT_DOWNLOADS = 5
    
    # Find all RAW photos
    all_photos = []
    for ext in RAW_EXTENSIONS:
        all_photos.extend(list(Path("photos").glob(f"**/*{ext}")))
    
    print(f"Found {len(all_photos)} photos to process")
    
    # Split into batches
    batches = [
        all_photos[i:i + BATCH_SIZE]
        for i in range(0, len(all_photos), BATCH_SIZE)
    ]
    
    print(f"Split into {len(batches)} batches of {BATCH_SIZE} photos each")
    
    # Process each batch
    async with ImagenClient(API_KEY) as client:
        for batch_num, batch in enumerate(batches, 1):
            print(f"\n{'='*60}")
            print(f"Processing batch {batch_num}/{len(batches)}")
            print(f"{'='*60}")
            
            try:
                # Create project for this batch
                project_name = f"Batch_{batch_num}_{len(batch)}_photos"
                project_uuid = await client.create_project(project_name)
                print(f"Created project: {project_uuid}")
                
                # Upload with progress tracking
                def upload_progress(current, total, filename):
                    percent = (current / total) * 100
                    print(f"  Upload: {percent:.1f}% ({current}/{total})", end="\r")
                
                upload_result = await client.upload_images(
                    project_uuid,
                    [str(p) for p in batch],
                    max_concurrent=MAX_CONCURRENT_UPLOADS,
                    calculate_md5=True,  # Verify integrity
                    progress_callback=upload_progress
                )
                
                print(f"\n  Uploaded: {upload_result.successful}/{upload_result.total}")
                
                # Report any upload failures
                if upload_result.failed > 0:
                    print(f"  ⚠️  {upload_result.failed} files failed:")
                    for result in upload_result.results:
                        if not result.success:
                            print(f"    - {Path(result.file).name}: {result.error}")
                
                # Continue if at least some files uploaded
                if upload_result.successful == 0:
                    print(f"  ❌ No files uploaded in batch {batch_num}, skipping")
                    continue
                
                # Start editing
                print("  Starting editing...")
                edit_options = EditOptions(
                    crop=True,
                    straighten=True,
                    smooth_skin=True
                )
                
                await client.start_editing(
                    project_uuid,
                    profile_key=PROFILE_KEY,
                    photography_type=PhotographyType.PORTRAITS,
                    edit_options=edit_options
                )
                print("  ✅ Editing complete")
                
                # Download edited files
                print("  Downloading edited files...")
                
                def download_progress(current, total, message):
                    percent = (current / total) * 100
                    print(f"  Download: {percent:.1f}% ({current}/{total})", end="\r")
                
                download_links = await client.get_download_links(project_uuid)
                downloaded_files = await client.download_files(
                    download_links,
                    output_dir=f"edited/batch_{batch_num}",
                    max_concurrent=MAX_CONCURRENT_DOWNLOADS,
                    progress_callback=download_progress
                )
                
                print(f"\n  ✅ Batch {batch_num} complete: {len(downloaded_files)} files")
            
            except UploadError as e:
                print(f"  ❌ Upload error in batch {batch_num}: {e}")
                continue
            except ProjectError as e:
                print(f"  ❌ Project error in batch {batch_num}: {e}")
                continue
            except Exception as e:
                print(f"  ❌ Unexpected error in batch {batch_num}: {e}")
                continue
    
    print(f"\n{'='*60}")
    print("🎉 All batches processed!")
    print(f"{'='*60}")

asyncio.run(advanced_batch_processing())

Processing by Directory Structure

Organize batch processing based on your folder structure:
import asyncio
from pathlib import Path
from imagen_sdk import ImagenClient, EditOptions, RAW_EXTENSIONS

async def process_by_directory():
    """Process photos organized in directories, one project per directory."""
    
    root_dir = Path("photo_sessions")
    
    # Find all subdirectories
    session_dirs = [d for d in root_dir.iterdir() if d.is_dir()]
    
    print(f"Found {len(session_dirs)} session directories")
    
    async with ImagenClient("your_api_key") as client:
        for session_dir in session_dirs:
            # Find all RAW photos in this directory
            photos = []
            for ext in RAW_EXTENSIONS:
                photos.extend(list(session_dir.glob(f"*{ext}")))
            
            if not photos:
                print(f"⏭️  Skipping {session_dir.name} (no photos)")
                continue
            
            print(f"\n📸 Processing {session_dir.name}: {len(photos)} photos")
            
            # Create project named after directory
            project_uuid = await client.create_project(session_dir.name)
            
            # Upload and edit
            upload_result = await client.upload_images(
                project_uuid,
                [str(p) for p in photos]
            )
            
            print(f"  Uploaded: {upload_result.successful}/{upload_result.total}")
            
            if upload_result.successful > 0:
                edit_options = EditOptions(crop=True, straighten=True)
                await client.start_editing(
                    project_uuid,
                    profile_key=5700,
                    edit_options=edit_options
                )
                
                # Download to matching output directory
                download_links = await client.get_download_links(project_uuid)
                await client.download_files(
                    download_links,
                    output_dir=f"edited/{session_dir.name}"
                )
                
                print(f"  ✅ {session_dir.name} complete")

asyncio.run(process_by_directory())

Parallel Batch Processing

Process multiple batches in parallel (use with caution):
import asyncio
from pathlib import Path
from typing import List
from imagen_sdk import ImagenClient, EditOptions, RAW_EXTENSIONS

async def process_batch(client: ImagenClient, batch_num: int, photos: List[Path], profile_key: int):
    """Process a single batch of photos."""
    try:
        project_name = f"Batch_{batch_num}"
        project_uuid = await client.create_project(project_name)
        
        upload_result = await client.upload_images(
            project_uuid,
            [str(p) for p in photos],
            max_concurrent=2  # Lower concurrency when running parallel batches
        )
        
        if upload_result.successful > 0:
            edit_options = EditOptions(crop=True, straighten=True)
            await client.start_editing(
                project_uuid,
                profile_key=profile_key,
                edit_options=edit_options
            )
            
            download_links = await client.get_download_links(project_uuid)
            await client.download_files(
                download_links,
                output_dir=f"edited/batch_{batch_num}",
                max_concurrent=3
            )
            
            print(f"✅ Batch {batch_num} complete: {upload_result.successful} photos")
            return True
        else:
            print(f"❌ Batch {batch_num} failed: no files uploaded")
            return False
    
    except Exception as e:
        print(f"❌ Batch {batch_num} error: {e}")
        return False

async def parallel_batch_processing():
    """Process multiple batches in parallel."""
    
    # Find all photos
    all_photos = []
    for ext in RAW_EXTENSIONS:
        all_photos.extend(list(Path("photos").glob(f"*{ext}")))
    
    # Split into batches
    BATCH_SIZE = 50
    batches = [
        all_photos[i:i + BATCH_SIZE]
        for i in range(0, len(all_photos), BATCH_SIZE)
    ]
    
    print(f"Processing {len(batches)} batches in parallel")
    
    async with ImagenClient("your_api_key") as client:
        # Process batches in parallel (limit concurrency to avoid overwhelming the API)
        MAX_PARALLEL_BATCHES = 3
        
        for i in range(0, len(batches), MAX_PARALLEL_BATCHES):
            batch_group = batches[i:i + MAX_PARALLEL_BATCHES]
            
            tasks = [
                process_batch(client, i + j + 1, batch, 5700)
                for j, batch in enumerate(batch_group)
            ]
            
            results = await asyncio.gather(*tasks, return_exceptions=True)
            
            successful = sum(1 for r in results if r is True)
            print(f"Completed {successful}/{len(batch_group)} batches in this group")
    
    print("🎉 All batches processed!")

asyncio.run(parallel_batch_processing())
Processing multiple batches in parallel can overwhelm the API and your network connection. Start with sequential processing and only use parallel processing if needed.

Performance Optimization Tips

Optimal Batch Sizes

# Recommended batch sizes based on file types and sizes
BATCH_SIZES = {
    "small_raw": 100,    # RAW files < 30MB
    "large_raw": 50,     # RAW files > 30MB
    "jpeg": 200,         # JPEG files
}

# Adjust based on your files
def get_batch_size(photos: List[Path]) -> int:
    if not photos:
        return 50
    
    avg_size = sum(p.stat().st_size for p in photos[:10]) / 10
    avg_size_mb = avg_size / (1024 * 1024)
    
    if avg_size_mb < 30:
        return 100
    else:
        return 50

Concurrent Operations

# Adjust concurrency based on:
# - Network speed
# - File sizes
# - Whether processing batches in parallel

# Conservative (stable, slower)
upload_result = await client.upload_images(
    project_uuid,
    image_paths,
    max_concurrent=2
)

# Balanced (recommended)
upload_result = await client.upload_images(
    project_uuid,
    image_paths,
    max_concurrent=3
)

# Aggressive (faster, more network load)
upload_result = await client.upload_images(
    project_uuid,
    image_paths,
    max_concurrent=5
)

Progress Reporting

Provide detailed progress for large batches:
import time
from datetime import timedelta

class ProgressTracker:
    def __init__(self, total_files: int):
        self.total_files = total_files
        self.start_time = time.time()
        self.files_processed = 0
    
    def upload_progress(self, current: int, total: int, filename: str):
        percent = (current / total) * 100
        elapsed = time.time() - self.start_time
        
        if current > 0:
            eta = (elapsed / current) * (total - current)
            eta_str = str(timedelta(seconds=int(eta)))
        else:
            eta_str = "calculating..."
        
        print(f"Upload: {percent:.1f}% ({current}/{total}) | ETA: {eta_str}", end="\r")
    
    def download_progress(self, current: int, total: int, message: str):
        percent = (current / total) * 100
        print(f"Download: {percent:.1f}% ({current}/{total})", end="\r")

# Usage
tracker = ProgressTracker(len(all_photos))

await client.upload_images(
    project_uuid,
    image_paths,
    progress_callback=tracker.upload_progress
)

Error Recovery

Implement robust error handling for large batches:
import asyncio
import json
from pathlib import Path
from imagen_sdk import ImagenClient, UploadError, ProjectError, DownloadError

class BatchProcessor:
    def __init__(self, api_key: str, profile_key: int):
        self.api_key = api_key
        self.profile_key = profile_key
        self.failed_batches = []
        self.completed_batches = []
    
    async def process_batch_with_retry(self, client: ImagenClient, batch_num: int, photos: List[Path], max_retries: int = 3):
        """Process a batch with retry logic."""
        for attempt in range(max_retries):
            try:
                project_name = f"Batch_{batch_num}_attempt_{attempt + 1}"
                project_uuid = await client.create_project(project_name)
                
                upload_result = await client.upload_images(
                    project_uuid,
                    [str(p) for p in photos]
                )
                
                if upload_result.successful == 0:
                    raise UploadError("No files uploaded successfully")
                
                edit_options = EditOptions(crop=True, straighten=True)
                await client.start_editing(
                    project_uuid,
                    profile_key=self.profile_key,
                    edit_options=edit_options
                )
                
                download_links = await client.get_download_links(project_uuid)
                await client.download_files(
                    download_links,
                    output_dir=f"edited/batch_{batch_num}"
                )
                
                self.completed_batches.append(batch_num)
                print(f"✅ Batch {batch_num} complete")
                return True
            
            except (UploadError, ProjectError, DownloadError) as e:
                print(f"⚠️  Batch {batch_num} attempt {attempt + 1} failed: {e}")
                if attempt < max_retries - 1:
                    print(f"  Retrying in 5 seconds...")
                    await asyncio.sleep(5)
                else:
                    print(f"  ❌ Batch {batch_num} failed after {max_retries} attempts")
                    self.failed_batches.append({
                        "batch_num": batch_num,
                        "photos": [str(p) for p in photos],
                        "error": str(e)
                    })
                    return False
    
    def save_failed_batches(self, filepath: str = "failed_batches.json"):
        """Save failed batches to JSON for retry."""
        with open(filepath, "w") as f:
            json.dump(self.failed_batches, f, indent=2)
        print(f"Saved {len(self.failed_batches)} failed batches to {filepath}")

# Usage
async def robust_batch_processing():
    processor = BatchProcessor("your_api_key", 5700)
    
    # Process all batches
    async with ImagenClient(processor.api_key) as client:
        for batch_num, batch in enumerate(batches, 1):
            await processor.process_batch_with_retry(client, batch_num, batch)
    
    # Save failed batches for later retry
    if processor.failed_batches:
        processor.save_failed_batches()
        print(f"\n⚠️  {len(processor.failed_batches)} batches failed")
    
    print(f"\n{len(processor.completed_batches)} batches completed successfully")

Memory Management

For very large collections, manage memory carefully:
import gc
from pathlib import Path
from imagen_sdk import ImagenClient, EditOptions

async def memory_efficient_processing():
    """Process large collections with memory management."""
    
    async with ImagenClient("your_api_key") as client:
        # Process one batch at a time, clearing memory between batches
        batch_size = 50
        photo_dir = Path("photos")
        
        for i, batch_start in enumerate(range(0, 1000, batch_size)):  # 1000 total photos
            # Load only files for this batch
            batch_files = list(photo_dir.glob("*.cr2"))[batch_start:batch_start + batch_size]
            
            if not batch_files:
                break
            
            print(f"Processing batch {i + 1}")
            
            # Process batch
            project_uuid = await client.create_project(f"Batch_{i + 1}")
            await client.upload_images(project_uuid, [str(p) for p in batch_files])
            
            edit_options = EditOptions(crop=True, straighten=True)
            await client.start_editing(
                project_uuid,
                profile_key=5700,
                edit_options=edit_options
            )
            
            download_links = await client.get_download_links(project_uuid)
            await client.download_files(download_links, output_dir=f"edited/batch_{i + 1}")
            
            # Clear memory
            del batch_files
            del download_links
            gc.collect()
            
            print(f"✅ Batch {i + 1} complete and memory cleared")

Next Steps

Error Handling

Comprehensive error handling patterns

Advanced Workflow

Step-by-step control with ImagenClient

Wedding Photography

Specialized workflow examples

API Reference

Complete ImagenClient documentation

Build docs developers (and LLMs) love