Basic Batch Processing
Process large collections in manageable batches:

import asyncio
from pathlib import Path
from imagen_sdk import quick_edit, EditOptions, RAW_EXTENSIONS, JPG_EXTENSIONS
async def process_large_collection():
    """Process a large photo collection in batches.

    RAW and JPEG files cannot be mixed in the same project, so each
    group is discovered and processed independently, 50 files at a time.
    """
    def _collect(extensions):
        # Gather every file directly under photos/ matching one of the extensions.
        found = []
        for suffix in extensions:
            found.extend(Path("photos").glob(f"*{suffix}"))
        return found

    groups = [("RAW", _collect(RAW_EXTENSIONS)), ("JPEG", _collect(JPG_EXTENSIONS))]

    for file_type, photos in groups:
        if not photos:
            continue
        print(f"Processing {len(photos)} {file_type} files...")
        batch_size = 50
        # enumerate(..., start=1) yields the same 1-based batch numbering
        # as computing i // batch_size + 1 by hand.
        for batch_num, start in enumerate(range(0, len(photos), batch_size), start=1):
            batch = photos[start:start + batch_size]
            print(f"Processing {file_type} batch {batch_num}...")
            result = await quick_edit(
                api_key="your_key",
                profile_key=5700,
                image_paths=[str(path) for path in batch],
                edit_options=EditOptions(crop=True, straighten=True),
                download=True,
                download_dir=f"edited_{file_type.lower()}_batch_{batch_num}"
            )
            print(f"Batch {batch_num} complete: {len(result.downloaded_files)} photos")

asyncio.run(process_large_collection())
Advanced Batch Processing
For more control over large collections, use ImagenClient:
import asyncio
from pathlib import Path
from typing import List
from imagen_sdk import (
ImagenClient,
EditOptions,
PhotographyType,
RAW_EXTENSIONS,
UploadError,
ProjectError
)
async def advanced_batch_processing():
    """Process large collections with advanced error handling and progress tracking.

    Splits all discovered RAW files into fixed-size batches, then for each
    batch: creates a project, uploads with MD5 verification, starts editing,
    and downloads the results. Per-batch errors are reported and skipped so
    one failing batch never aborts the whole run.
    """
    # Configuration
    API_KEY = "your_api_key"
    PROFILE_KEY = 5700
    BATCH_SIZE = 100
    MAX_CONCURRENT_UPLOADS = 3
    MAX_CONCURRENT_DOWNLOADS = 5
    # Find all RAW photos (recursive glob covers nested session folders)
    all_photos = []
    for ext in RAW_EXTENSIONS:
        all_photos.extend(list(Path("photos").glob(f"**/*{ext}")))
    print(f"Found {len(all_photos)} photos to process")
    # Split into batches
    batches = [
        all_photos[i:i + BATCH_SIZE]
        for i in range(0, len(all_photos), BATCH_SIZE)
    ]
    print(f"Split into {len(batches)} batches of {BATCH_SIZE} photos each")
    # Process each batch
    async with ImagenClient(API_KEY) as client:
        for batch_num, batch in enumerate(batches, 1):
            print(f"\n{'='*60}")
            print(f"Processing batch {batch_num}/{len(batches)}")
            print(f"{'='*60}")
            try:
                # Create project for this batch
                project_name = f"Batch_{batch_num}_{len(batch)}_photos"
                project_uuid = await client.create_project(project_name)
                print(f"Created project: {project_uuid}")
                # Upload with progress tracking (overwrites the same console
                # line via the trailing carriage return)
                def upload_progress(current, total, filename):
                    percent = (current / total) * 100
                    print(f" Upload: {percent:.1f}% ({current}/{total})", end="\r")
                upload_result = await client.upload_images(
                    project_uuid,
                    [str(p) for p in batch],
                    max_concurrent=MAX_CONCURRENT_UPLOADS,
                    calculate_md5=True,  # Verify integrity
                    progress_callback=upload_progress
                )
                print(f"\n Uploaded: {upload_result.successful}/{upload_result.total}")
                # Report any upload failures
                if upload_result.failed > 0:
                    print(f" ⚠️ {upload_result.failed} files failed:")
                    for result in upload_result.results:
                        if not result.success:
                            print(f" - {Path(result.file).name}: {result.error}")
                # Continue if at least some files uploaded
                if upload_result.successful == 0:
                    print(f" ❌ No files uploaded in batch {batch_num}, skipping")
                    continue
                # Start editing
                print(" Starting editing...")
                edit_options = EditOptions(
                    crop=True,
                    straighten=True,
                    smooth_skin=True
                )
                await client.start_editing(
                    project_uuid,
                    profile_key=PROFILE_KEY,
                    photography_type=PhotographyType.PORTRAITS,
                    edit_options=edit_options
                )
                print(" ✅ Editing complete")
                # Download edited files
                print(" Downloading edited files...")
                def download_progress(current, total, message):
                    percent = (current / total) * 100
                    print(f" Download: {percent:.1f}% ({current}/{total})", end="\r")
                download_links = await client.get_download_links(project_uuid)
                downloaded_files = await client.download_files(
                    download_links,
                    output_dir=f"edited/batch_{batch_num}",
                    max_concurrent=MAX_CONCURRENT_DOWNLOADS,
                    progress_callback=download_progress
                )
                print(f"\n ✅ Batch {batch_num} complete: {len(downloaded_files)} files")
            except UploadError as e:
                print(f" ❌ Upload error in batch {batch_num}: {e}")
                continue
            except ProjectError as e:
                print(f" ❌ Project error in batch {batch_num}: {e}")
                continue
            except Exception as e:
                # Catch-all so an unexpected failure skips only this batch
                print(f" ❌ Unexpected error in batch {batch_num}: {e}")
                continue
    print(f"\n{'='*60}")
    print("🎉 All batches processed!")
    print(f"{'='*60}")

asyncio.run(advanced_batch_processing())
Processing by Directory Structure
Organize batch processing based on your folder structure:

import asyncio
from pathlib import Path
from imagen_sdk import ImagenClient, EditOptions, RAW_EXTENSIONS
async def process_by_directory():
    """Process photos organized in directories, one project per directory."""
    root_dir = Path("photo_sessions")
    # Every immediate subdirectory is treated as one photo session.
    session_dirs = [entry for entry in root_dir.iterdir() if entry.is_dir()]
    print(f"Found {len(session_dirs)} session directories")
    async with ImagenClient("your_api_key") as client:
        for session_dir in session_dirs:
            # Collect the RAW files that live directly in this session folder.
            photos = [
                photo
                for ext in RAW_EXTENSIONS
                for photo in session_dir.glob(f"*{ext}")
            ]
            if not photos:
                print(f"⏭️ Skipping {session_dir.name} (no photos)")
                continue
            print(f"\n📸 Processing {session_dir.name}: {len(photos)} photos")
            # One project per directory, named after the directory itself.
            project_uuid = await client.create_project(session_dir.name)
            upload_result = await client.upload_images(
                project_uuid,
                [str(photo) for photo in photos]
            )
            print(f" Uploaded: {upload_result.successful}/{upload_result.total}")
            if upload_result.successful > 0:
                await client.start_editing(
                    project_uuid,
                    profile_key=5700,
                    edit_options=EditOptions(crop=True, straighten=True)
                )
                # Mirror the session layout under edited/.
                download_links = await client.get_download_links(project_uuid)
                await client.download_files(
                    download_links,
                    output_dir=f"edited/{session_dir.name}"
                )
                print(f" ✅ {session_dir.name} complete")

asyncio.run(process_by_directory())
Parallel Batch Processing
Process multiple batches in parallel (use with caution):

import asyncio
from pathlib import Path
from typing import List
from imagen_sdk import ImagenClient, EditOptions, RAW_EXTENSIONS
async def process_batch(client: ImagenClient, batch_num: int, photos: List[Path], profile_key: int):
    """Process a single batch of photos.

    Returns True when the batch uploaded, edited, and downloaded
    successfully; False otherwise (including on any exception).
    """
    try:
        project_uuid = await client.create_project(f"Batch_{batch_num}")
        upload_result = await client.upload_images(
            project_uuid,
            [str(photo) for photo in photos],
            max_concurrent=2  # Lower concurrency when running parallel batches
        )
        # Guard clause: nothing made it to the server, so there is nothing to edit.
        if upload_result.successful == 0:
            print(f"❌ Batch {batch_num} failed: no files uploaded")
            return False
        await client.start_editing(
            project_uuid,
            profile_key=profile_key,
            edit_options=EditOptions(crop=True, straighten=True)
        )
        links = await client.get_download_links(project_uuid)
        await client.download_files(
            links,
            output_dir=f"edited/batch_{batch_num}",
            max_concurrent=3
        )
        print(f"✅ Batch {batch_num} complete: {upload_result.successful} photos")
        return True
    except Exception as e:
        # Report and swallow so one batch's failure never kills its siblings.
        print(f"❌ Batch {batch_num} error: {e}")
        return False
async def parallel_batch_processing():
    """Process multiple batches in parallel, a few at a time."""
    # Discover every RAW file directly under photos/.
    all_photos = []
    for ext in RAW_EXTENSIONS:
        all_photos.extend(Path("photos").glob(f"*{ext}"))
    # Carve the collection into fixed-size batches.
    BATCH_SIZE = 50
    batches = [
        all_photos[start:start + BATCH_SIZE]
        for start in range(0, len(all_photos), BATCH_SIZE)
    ]
    print(f"Processing {len(batches)} batches in parallel")
    async with ImagenClient("your_api_key") as client:
        # Cap in-flight batches so the API and network are not overwhelmed.
        MAX_PARALLEL_BATCHES = 3
        for group_start in range(0, len(batches), MAX_PARALLEL_BATCHES):
            batch_group = batches[group_start:group_start + MAX_PARALLEL_BATCHES]
            tasks = [
                process_batch(client, group_start + offset + 1, batch, 5700)
                for offset, batch in enumerate(batch_group)
            ]
            # return_exceptions keeps one failed task from cancelling the rest.
            results = await asyncio.gather(*tasks, return_exceptions=True)
            successful = len([outcome for outcome in results if outcome is True])
            print(f"Completed {successful}/{len(batch_group)} batches in this group")
    print("🎉 All batches processed!")

asyncio.run(parallel_batch_processing())
Processing multiple batches in parallel can overwhelm the API and your network connection. Start with sequential processing and only use parallel processing if needed.
Performance Optimization Tips
Optimal Batch Sizes
# Recommended batch sizes based on file types and sizes
BATCH_SIZES = {
    "small_raw": 100,  # RAW files < 30MB each
    "large_raw": 50,   # RAW files > 30MB each
    "jpeg": 200,       # JPEG files are typically far smaller than RAW
}
# Adjust based on your files
def get_batch_size(photos: List[Path]) -> int:
    """Return a recommended upload batch size for *photos*.

    Samples up to the first 10 files and averages their on-disk size:
    collections averaging under 30 MB per file can use larger batches
    (100); heavier files get smaller batches (50). An empty list falls
    back to the conservative default of 50.
    """
    if not photos:
        return 50
    # Average over the actual sample length — dividing by a hard-coded 10
    # (as the original did) understates the average whenever fewer than
    # 10 photos are supplied.
    sample = photos[:10]
    avg_size = sum(p.stat().st_size for p in sample) / len(sample)
    avg_size_mb = avg_size / (1024 * 1024)
    return 100 if avg_size_mb < 30 else 50
Concurrent Operations
# Adjust concurrency based on:
# - Network speed
# - File sizes
# - Whether processing batches in parallel

# Conservative (stable, slower)
upload_result = await client.upload_images(
    project_uuid,
    image_paths,
    max_concurrent=2
)

# Balanced (recommended)
upload_result = await client.upload_images(
    project_uuid,
    image_paths,
    max_concurrent=3
)

# Aggressive (faster, more network load)
upload_result = await client.upload_images(
    project_uuid,
    image_paths,
    max_concurrent=5
)
Progress Reporting
Provide detailed progress for large batches:

import time
from datetime import timedelta
class ProgressTracker:
    """Reports upload/download progress on a single console line, with an ETA."""

    def __init__(self, total_files: int):
        # Expected number of files, kept for reference by callers.
        self.total_files = total_files
        # Wall-clock start, the basis for the ETA estimate.
        self.start_time = time.time()
        self.files_processed = 0

    def upload_progress(self, current: int, total: int, filename: str):
        """Print upload progress plus an estimated time remaining."""
        percent = (current / total) * 100
        elapsed_s = time.time() - self.start_time
        if current > 0:
            # Linear extrapolation: average time per file times files left.
            remaining_s = (elapsed_s / current) * (total - current)
            eta_text = str(timedelta(seconds=int(remaining_s)))
        else:
            # Nothing finished yet — no data to extrapolate from.
            eta_text = "calculating..."
        print(f"Upload: {percent:.1f}% ({current}/{total}) | ETA: {eta_text}", end="\r")

    def download_progress(self, current: int, total: int, message: str):
        """Print download progress as a percentage."""
        percent = (current / total) * 100
        print(f"Download: {percent:.1f}% ({current}/{total})", end="\r")
# Usage
# NOTE(review): assumes `all_photos`, `client`, `project_uuid`, and
# `image_paths` are prepared as in the earlier examples — they are not
# defined in this snippet.
tracker = ProgressTracker(len(all_photos))
await client.upload_images(
    project_uuid,
    image_paths,
    progress_callback=tracker.upload_progress
)
Error Recovery
Implement robust error handling for large batches:

import asyncio
import json
from pathlib import Path
from imagen_sdk import ImagenClient, UploadError, ProjectError, DownloadError
class BatchProcessor:
    """Processes photo batches with retry logic and failure bookkeeping.

    Failed batches (after all retries) are recorded with enough detail to
    re-run them later via ``save_failed_batches``.
    """

    def __init__(self, api_key: str, profile_key: int):
        self.api_key = api_key
        self.profile_key = profile_key
        # Batches that exhausted every retry; each entry records the batch
        # number, the photo paths, and the last error message.
        self.failed_batches = []
        # Batch numbers that completed successfully.
        self.completed_batches = []

    async def process_batch_with_retry(self, client: ImagenClient, batch_num: int, photos: List[Path], max_retries: int = 3):
        """Process a batch with retry logic.

        Returns True on success, False once all ``max_retries`` attempts
        have failed with an SDK error.
        NOTE(review): ``EditOptions`` and ``List`` are referenced here but
        not imported in this snippet — confirm the surrounding example
        imports them.
        """
        for attempt in range(max_retries):
            try:
                project_name = f"Batch_{batch_num}_attempt_{attempt + 1}"
                project_uuid = await client.create_project(project_name)
                upload_result = await client.upload_images(
                    project_uuid,
                    [str(p) for p in photos]
                )
                if upload_result.successful == 0:
                    # Treat an entirely failed upload as a retryable error.
                    raise UploadError("No files uploaded successfully")
                edit_options = EditOptions(crop=True, straighten=True)
                await client.start_editing(
                    project_uuid,
                    profile_key=self.profile_key,
                    edit_options=edit_options
                )
                download_links = await client.get_download_links(project_uuid)
                await client.download_files(
                    download_links,
                    output_dir=f"edited/batch_{batch_num}"
                )
                self.completed_batches.append(batch_num)
                print(f"✅ Batch {batch_num} complete")
                return True
            except (UploadError, ProjectError, DownloadError) as e:
                print(f"⚠️ Batch {batch_num} attempt {attempt + 1} failed: {e}")
                if attempt < max_retries - 1:
                    print(f" Retrying in 5 seconds...")
                    await asyncio.sleep(5)
                else:
                    # Out of retries: record everything needed for a later re-run.
                    print(f" ❌ Batch {batch_num} failed after {max_retries} attempts")
                    self.failed_batches.append({
                        "batch_num": batch_num,
                        "photos": [str(p) for p in photos],
                        "error": str(e)
                    })
                    return False

    def save_failed_batches(self, filepath: str = "failed_batches.json"):
        """Save failed batches to JSON for retry."""
        with open(filepath, "w") as f:
            json.dump(self.failed_batches, f, indent=2)
        print(f"Saved {len(self.failed_batches)} failed batches to {filepath}")
# Usage
async def robust_batch_processing():
    """Run every batch through BatchProcessor, saving failures for retry."""
    processor = BatchProcessor("your_api_key", 5700)
    # Process all batches
    # NOTE(review): `batches` is assumed to be prepared beforehand (e.g. by
    # splitting a discovered photo list as in the earlier examples) — it is
    # not defined in this snippet.
    async with ImagenClient(processor.api_key) as client:
        for batch_num, batch in enumerate(batches, 1):
            await processor.process_batch_with_retry(client, batch_num, batch)
    # Save failed batches for later retry
    if processor.failed_batches:
        processor.save_failed_batches()
        print(f"\n⚠️ {len(processor.failed_batches)} batches failed")
    print(f"\n✅ {len(processor.completed_batches)} batches completed successfully")
Memory Management
For very large collections, manage memory carefully:

import gc
from pathlib import Path
from imagen_sdk import ImagenClient, EditOptions
async def memory_efficient_processing():
    """Process large collections with memory management.

    Streams the directory listing instead of materializing the full file
    list, so memory use stays proportional to one batch rather than the
    whole collection.
    """
    from itertools import islice  # local import: only this example needs it

    async with ImagenClient("your_api_key") as client:
        # Process one batch at a time, clearing memory between batches.
        batch_size = 50
        photo_dir = Path("photos")
        # Path.glob returns a lazy generator; pulling batch_size entries at a
        # time walks the directory exactly once. (The original re-globbed and
        # materialized the entire listing on every batch and was hard-capped
        # at 1000 photos.)
        photo_iter = photo_dir.glob("*.cr2")
        batch_num = 0
        while True:
            batch_files = list(islice(photo_iter, batch_size))
            if not batch_files:
                break  # directory exhausted
            batch_num += 1
            print(f"Processing batch {batch_num}")
            # Process batch: one project per batch, then edit and download.
            project_uuid = await client.create_project(f"Batch_{batch_num}")
            await client.upload_images(project_uuid, [str(p) for p in batch_files])
            edit_options = EditOptions(crop=True, straighten=True)
            await client.start_editing(
                project_uuid,
                profile_key=5700,
                edit_options=edit_options
            )
            download_links = await client.get_download_links(project_uuid)
            await client.download_files(download_links, output_dir=f"edited/batch_{batch_num}")
            # Drop per-batch references before the next iteration so the
            # file and link lists can be reclaimed promptly.
            del batch_files
            del download_links
            gc.collect()
            print(f"✅ Batch {batch_num} complete and memory cleared")
Next Steps
Error Handling
Comprehensive error handling patterns
Advanced Workflow
Step-by-step control with ImagenClient
Wedding Photography
Specialized workflow examples
API Reference
Complete ImagenClient documentation