Basic Batch Operations
Create Multiple Objects
Create many objects in a loop:

from infrahub_sdk import InfrahubClient
client = InfrahubClient()
# Create multiple devices
devices = []
for i in range(100):
device = await client.create(
kind="InfraDevice",
name=f"device-{i:03d}",
serial_number=f"SN{i:06d}"
)
await device.save()
devices.append(device)
print(f"Created {len(devices)} devices")
Update Multiple Objects
Update all objects matching criteria:

# Get all devices
devices = await client.all(kind="InfraDevice")
# Update all devices
updated_count = 0
for device in devices:
device.is_managed.value = True
await device.save()
updated_count += 1
print(f"Updated {updated_count} devices")
Delete Multiple Objects
Delete objects in bulk:

devices = await client.all(kind="InfraDevice")
# Delete inactive devices
deleted_count = 0
for device in devices:
if not device.is_active.value:
await device.delete()
deleted_count += 1
print(f"Deleted {deleted_count} inactive devices")
Concurrent Batch Operations
Concurrent Creation
Create objects concurrently for better performance:

import asyncio
from infrahub_sdk import InfrahubClient
async def create_device(client: InfrahubClient, index: int):
    """Create and save a single device.

    Args:
        client: Connected Infrahub client used for the create call.
        index: Zero-based index used to derive the device name and serial.

    Returns:
        The saved device node.
    """
    device = await client.create(
        kind="InfraDevice",
        name=f"device-{index:03d}",
        serial_number=f"SN{index:06d}",
    )
    await device.save()
    return device
async def create_devices_concurrent():
    """Create 100 devices concurrently using asyncio.gather."""
    client = InfrahubClient()
    # Fan out one creation coroutine per device; gather awaits them all
    # and preserves input order in the result list.
    tasks = [create_device(client, i) for i in range(100)]
    devices = await asyncio.gather(*tasks)
    print(f"Created {len(devices)} devices concurrently")
if __name__ == "__main__":
asyncio.run(create_devices_concurrent())
Concurrent Updates
Update objects concurrently:

import asyncio
async def update_device(client: InfrahubClient, device_id: str):
    """Fetch one device by id, mark it as managed, and save it.

    Returns:
        The updated device node.
    """
    device = await client.get(kind="InfraDevice", id=device_id)
    device.is_managed.value = True
    await device.save()
    return device
async def update_devices_concurrent():
    """Mark every device as managed, updating all of them concurrently."""
    client = InfrahubClient()
    # Collect the ids first; each update task then fetches and saves
    # its own copy of the device.
    devices = await client.all(kind="InfraDevice")
    device_ids = [d.id for d in devices]
    tasks = [update_device(client, device_id) for device_id in device_ids]
    updated = await asyncio.gather(*tasks)
    print(f"Updated {len(updated)} devices concurrently")
Concurrent Queries
Query multiple objects concurrently:

import asyncio
async def get_device_with_relations(client: InfrahubClient, device_id: str):
    """Get a device together with its location, tags, and interfaces."""
    return await client.get(
        kind="InfraDevice",
        id=device_id,
        include=["location", "tags", "interfaces"],
    )
async def query_devices_concurrent():
    """Query a fixed set of devices concurrently."""
    client = InfrahubClient()
    device_ids = ["id-1", "id-2", "id-3", "id-4", "id-5"]
    # One query coroutine per id; gather runs them concurrently.
    tasks = [get_device_with_relations(client, device_id) for device_id in device_ids]
    devices = await asyncio.gather(*tasks)
    print(f"Queried {len(devices)} devices concurrently")
Batch Processing with Chunking
Process in Chunks
Avoid overwhelming the API by processing in chunks:

import asyncio
async def process_in_chunks(
    items: list,
    chunk_size: int,
    process_func,
):
    """Process *items* in chunks, running each chunk's items concurrently.

    Args:
        items: Full list of work items.
        chunk_size: Maximum number of items processed concurrently per chunk.
        process_func: Async callable applied to each item.

    Returns:
        Results of ``process_func`` in the same order as *items*.
    """
    results = []
    for i in range(0, len(items), chunk_size):
        chunk = items[i:i + chunk_size]
        chunk_results = await asyncio.gather(*[
            process_func(item) for item in chunk
        ])
        results.extend(chunk_results)
        # Brief pause between chunks so the API is not hammered continuously;
        # skipped after the final chunk.
        if i + chunk_size < len(items):
            await asyncio.sleep(0.5)
    return results
# Usage
async def create_device_item(data: dict):
    """Create and save one device from a dict of attribute values."""
    client = InfrahubClient()
    device = await client.create(kind="InfraDevice", **data)
    await device.save()
    return device
device_data = [
{"name": f"device-{i}", "serial_number": f"SN{i:06d}"}
for i in range(1000)
]
devices = await process_in_chunks(
items=device_data,
chunk_size=50,
process_func=create_device_item
)
print(f"Created {len(devices)} devices in chunks")
Chunked Updates
async def update_in_chunks(client: InfrahubClient, chunk_size: int = 50):
    """Mark every device as managed, saving *chunk_size* devices at a time.

    Returns:
        Total number of devices updated.
    """
    all_devices = await client.all(kind="InfraDevice")
    total_updated = 0
    for i in range(0, len(all_devices), chunk_size):
        chunk = all_devices[i:i + chunk_size]
        # Mutate first, then persist the whole chunk concurrently.
        tasks = []
        for device in chunk:
            device.is_managed.value = True
            tasks.append(device.save())
        await asyncio.gather(*tasks)
        total_updated += len(chunk)
        print(f"Updated {total_updated}/{len(all_devices)} devices")
    return total_updated
# Use the function
total = await update_in_chunks(client, chunk_size=100)
print(f"Updated {total} devices total")
Batch Operations with Error Handling
Continue on Error
Process all items even if some fail:

import asyncio
from infrahub_sdk.exceptions import GraphQLError
async def create_with_error_handling(
    client: InfrahubClient,
    devices_data: list[dict],
):
    """Create devices one by one, recording successes and failures.

    Args:
        client: Connected Infrahub client.
        devices_data: One dict of device attributes per device to create.

    Returns:
        Dict with "success" (saved device nodes) and "failed"
        (the failing input data plus the error message).
    """
    results = {"success": [], "failed": []}
    for data in devices_data:
        try:
            device = await client.create(
                kind="InfraDevice",
                **data,
            )
            await device.save()
            results["success"].append(device)
        except GraphQLError as e:
            # Keep going: record the failing payload instead of aborting
            # the whole batch.
            results["failed"].append({
                "data": data,
                "error": str(e),
            })
    print(f"Created: {len(results['success'])}")
    print(f"Failed: {len(results['failed'])}")
    return results
# Usage
devices_data = [
{"name": "device-1", "serial_number": "SN000001"},
{"name": "device-2", "serial_number": "SN000002"},
{"name": "", "serial_number": "SN000003"}, # Invalid
]
results = await create_with_error_handling(client, devices_data)
Retry Failed Operations
import asyncio
from infrahub_sdk.exceptions import GraphQLError
async def batch_with_retry(
    client: InfrahubClient,
    items: list,
    max_retries: int = 3,
):
    """Create one device per item, retrying failures with exponential backoff.

    Args:
        client: Connected Infrahub client.
        items: One dict of device attributes per device to create.
        max_retries: Attempts per item before giving up.

    Returns:
        Items that still failed after *max_retries* attempts, with error text.
    """
    failed = []
    for item in items:
        for attempt in range(max_retries):
            try:
                device = await client.create(
                    kind="InfraDevice",
                    **item,
                )
                await device.save()
                break
            except GraphQLError as e:
                if attempt < max_retries - 1:
                    # Back off 1s, 2s, 4s, ... before the next attempt.
                    await asyncio.sleep(2 ** attempt)
                    continue
                # Final attempt failed: record and move to the next item.
                failed.append({"item": item, "error": str(e)})
    return failed
Transactional Batches
Rollback all changes if any operation fails:async def transactional_batch(
client: InfrahubClient,
branch_name: str,
operations: list
) -> bool:
"""Execute batch operations transactionally using a branch."""
# Create temporary branch for transaction
branch = await client.branch.create(branch_name=branch_name)
try:
# Execute all operations on the branch
for operation in operations:
await operation(client, branch.name)
# If all succeed, merge (or return success for manual merge)
print(f"All operations succeeded on branch {branch.name}")
return True
except Exception as e:
# If any fail, delete the branch (rollback)
print(f"Operation failed: {e}")
await client.branch.delete(branch_name=branch.name)
print(f"Rolled back by deleting branch {branch.name}")
return False
# Usage
async def create_device_op(client, branch):
    """Example operation: create one device on the given branch."""
    device = await client.create(
        kind="InfraDevice",
        name="device-1",
        serial_number="SN000001",
        branch=branch,
    )
    await device.save()
operations = [create_device_op, create_device_op]
success = await transactional_batch(
client,
"transaction-batch",
operations
)
Progress Tracking
Simple Progress Bar
import asyncio
async def batch_with_progress(
    client: InfrahubClient,
    items: list,
    chunk_size: int = 10,
):
    """Create devices sequentially, printing progress after each chunk.

    Args:
        client: Connected Infrahub client.
        items: One dict of device attributes per device to create.
        chunk_size: How many items to process between progress reports.
    """
    total = len(items)
    processed = 0
    for i in range(0, total, chunk_size):
        chunk = items[i:i + chunk_size]
        # Process the chunk sequentially.
        for item in chunk:
            device = await client.create(
                kind="InfraDevice",
                **item,
            )
            await device.save()
            processed += 1
        # Show progress once per chunk, not per item.
        percent = (processed / total) * 100
        print(f"Progress: {processed}/{total} ({percent:.1f}%)")
    print("Batch operation complete!")
# Usage
device_data = [
{"name": f"device-{i}", "serial_number": f"SN{i:06d}"}
for i in range(100)
]
await batch_with_progress(client, device_data)
Rich Progress Bar
from rich.progress import Progress, TaskID
import asyncio
async def batch_with_rich_progress(
    client: InfrahubClient,
    items: list,
):
    """Create devices sequentially, advancing a rich progress bar per item."""
    with Progress() as progress:
        task = progress.add_task(
            "[cyan]Creating devices...",
            total=len(items),
        )
        for item in items:
            device = await client.create(
                kind="InfraDevice",
                **item,
            )
            await device.save()
            progress.update(task, advance=1)
    print("\nBatch complete!")
Advanced Patterns
Parallel Batch Processing
import asyncio
from typing import Callable, TypeVar

T = TypeVar('T')

async def parallel_batch(
    items: list[T],
    process_func: Callable,
    max_concurrent: int = 10,
) -> list:
    """Process *items* concurrently, never exceeding *max_concurrent* at once.

    Args:
        items: Work items to process.
        process_func: Async callable applied to each item.
        max_concurrent: Upper bound on simultaneously running calls.

    Returns:
        Results of ``process_func`` in the same order as *items*.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_with_semaphore(item):
        # The semaphore caps how many process_func calls run at the same time.
        async with semaphore:
            return await process_func(item)

    tasks = [process_with_semaphore(item) for item in items]
    return await asyncio.gather(*tasks)
# Usage
async def create_device_from_data(data: dict):
    """Create and save one device from an attribute dict."""
    client = InfrahubClient()
    device = await client.create(kind="InfraDevice", **data)
    await device.save()
    return device
device_data = [
{"name": f"device-{i}", "serial_number": f"SN{i:06d}"}
for i in range(1000)
]
devices = await parallel_batch(
items=device_data,
process_func=create_device_from_data,
max_concurrent=20
)
print(f"Created {len(devices)} devices with controlled concurrency")
Conditional Batch Operations
async def conditional_batch_update(
    client: "InfrahubClient",
    condition: Callable,
    update_func: Callable,
):
    """Update only the devices for which ``condition(device)`` is true.

    Args:
        client: Connected Infrahub client.
        condition: Sync predicate selecting devices to update.
        update_func: Callable applied to each selected device. May be a
            plain function (e.g. a lambda) or an async function — the
            documented usage passes a sync lambda, which the original
            ``await update_func(device)`` would have rejected.
    """
    import inspect

    all_devices = await client.all(kind="InfraDevice")
    to_update = [d for d in all_devices if condition(d)]
    print(f"Found {len(to_update)} devices to update")
    for device in to_update:
        result = update_func(device)
        # Accept sync callables as well as coroutines.
        if inspect.isawaitable(result):
            await result
        await device.save()
    print(f"Updated {len(to_update)} devices")
# Usage
await conditional_batch_update(
client=client,
condition=lambda d: not d.is_active.value,
update_func=lambda d: setattr(d.is_active, 'value', True)
)
Bulk Data Import
import asyncio
import csv
from pathlib import Path
async def import_from_csv(
    client: InfrahubClient,
    csv_path: Path,
    chunk_size: int = 100,
):
    """Import devices from a CSV file with 'name' and 'serial_number' columns.

    Creates the devices chunk by chunk, saving each chunk concurrently.

    Args:
        client: Connected Infrahub client.
        csv_path: Path to the CSV file.
        chunk_size: Number of rows saved concurrently per chunk.

    Returns:
        Number of devices imported.
    """
    # newline="" is the csv-module-recommended way to open CSV files.
    with open(csv_path, 'r', newline="") as f:
        reader = csv.DictReader(f)
        rows = list(reader)
    total = len(rows)
    created = 0
    for i in range(0, total, chunk_size):
        chunk = rows[i:i + chunk_size]
        tasks = []
        for row in chunk:
            device = await client.create(
                kind="InfraDevice",
                name=row['name'],
                serial_number=row['serial_number'],
            )
            tasks.append(device.save())
        # Persist the whole chunk concurrently.
        await asyncio.gather(*tasks)
        created += len(chunk)
        print(f"Imported {created}/{total} devices")
    return created
# Usage
total_imported = await import_from_csv(
client=client,
csv_path=Path("devices.csv"),
chunk_size=50
)
Performance Tips
Use concurrent operations
Leverage asyncio.gather() to process multiple items concurrently.
Process in chunks
Break large batches into smaller chunks to avoid overwhelming the API.
Limit concurrent requests
Use asyncio.Semaphore to control the number of concurrent operations.
Handle errors gracefully
Continue processing even if some operations fail, then retry or report failures.
Use branches for transactions
Create a branch for batch operations to enable rollback if needed.
Next Steps
Async Operations
Master async programming patterns
Error Handling
Handle batch operation errors
Pagination
Work with large datasets
Branches
Use branches for batch operations