Batch Operations
Batch operations allow you to process multiple objects efficiently, reducing API calls and improving performance.

Basic Batch Operations

Create Multiple Objects

Create many objects in a loop:
# NOTE: docs fragment — the top-level `await` assumes an enclosing async context.
from infrahub_sdk import InfrahubClient

client = InfrahubClient()

# Create multiple devices
# Sequential creation: each save() is one API round-trip, so this makes 100 calls.
devices = []
for i in range(100):
    device = await client.create(
        kind="InfraDevice",
        name=f"device-{i:03d}",
        serial_number=f"SN{i:06d}"
    )
    await device.save()
    devices.append(device)

print(f"Created {len(devices)} devices")

Update Multiple Objects

Update all objects matching criteria:
# NOTE: docs fragment — `client` is created in an earlier snippet; the
# top-level `await` assumes an enclosing async context.
# Get all devices
devices = await client.all(kind="InfraDevice")

# Update all devices
updated_count = 0
for device in devices:
    device.is_managed.value = True  # attribute values are set via .value
    await device.save()             # one API call per device (sequential)
    updated_count += 1

print(f"Updated {updated_count} devices")

Delete Multiple Objects

Delete objects in bulk:
# NOTE: docs fragment — `client` comes from an earlier snippet; top-level
# `await` assumes an enclosing async context.
devices = await client.all(kind="InfraDevice")

# Delete inactive devices
deleted_count = 0
for device in devices:
    if not device.is_active.value:  # only devices flagged inactive are removed
        await device.delete()
        deleted_count += 1

print(f"Deleted {deleted_count} inactive devices")

Concurrent Batch Operations

Concurrent Creation

Create objects concurrently for better performance:
import asyncio
from infrahub_sdk import InfrahubClient

async def create_device(client: InfrahubClient, index: int):
    """Create and persist one device, returning the saved node."""
    new_device = await client.create(
        kind="InfraDevice",
        name=f"device-{index:03d}",
        serial_number=f"SN{index:06d}"
    )
    await new_device.save()
    return new_device

async def create_devices_concurrent():
    """Fan out 100 create_device() coroutines and await them together."""
    client = InfrahubClient()

    # asyncio.gather schedules all coroutines concurrently and returns
    # their results in submission order.
    devices = await asyncio.gather(
        *(create_device(client, index) for index in range(100))
    )
    print(f"Created {len(devices)} devices concurrently")

if __name__ == "__main__":
    asyncio.run(create_devices_concurrent())

Concurrent Updates

Update objects concurrently:
import asyncio

async def update_device(client: InfrahubClient, device_id: str):
    """Fetch one device by id, flag it as managed, and save it."""
    node = await client.get(kind="InfraDevice", id=device_id)
    node.is_managed.value = True
    await node.save()
    return node

async def update_devices_concurrent():
    """Mark every device as managed, running all updates concurrently."""
    client = InfrahubClient()

    # Fetch the full device list first, then fan out one update per id.
    all_devices = await client.all(kind="InfraDevice")

    updated = await asyncio.gather(
        *(update_device(client, node.id) for node in all_devices)
    )
    print(f"Updated {len(updated)} devices concurrently")

Concurrent Queries

Query multiple objects concurrently:
import asyncio

async def get_device_with_relations(client: InfrahubClient, device_id: str):
    """Fetch a device together with its location, tags and interfaces."""
    return await client.get(
        kind="InfraDevice",
        id=device_id,
        include=["location", "tags", "interfaces"]
    )

async def query_devices_concurrent():
    """Issue one relationship-loaded query per id, all at the same time."""
    client = InfrahubClient()

    ids_to_fetch = ["id-1", "id-2", "id-3", "id-4", "id-5"]

    # One concurrent GET per id; results come back in input order.
    devices = await asyncio.gather(
        *(get_device_with_relations(client, one_id) for one_id in ids_to_fetch)
    )
    print(f"Queried {len(devices)} devices concurrently")

Batch Processing with Chunking

Process in Chunks

Avoid overwhelming the API by processing in chunks:
import asyncio

async def process_in_chunks(
    items: list,
    chunk_size: int,
    process_func
):
    """Run *process_func* over *items*, chunk_size entries at a time.

    Each chunk is awaited concurrently via asyncio.gather; a short pause
    separates consecutive chunks so the API is not flooded. Results are
    returned in the original item order.
    """
    collected = []
    total = len(items)

    for start in range(0, total, chunk_size):
        batch = items[start:start + chunk_size]
        batch_results = await asyncio.gather(
            *(process_func(entry) for entry in batch)
        )
        collected.extend(batch_results)

        # Optional breather between chunks (skipped after the final chunk).
        if start + chunk_size < total:
            await asyncio.sleep(0.5)

    return collected

# Usage
async def create_device_item(data: dict):
    """Create and save one InfraDevice from a dict of attribute values."""
    # NOTE(review): builds a fresh InfrahubClient per item — keeps the example
    # self-contained, but a shared client would avoid repeated setup; confirm.
    client = InfrahubClient()
    device = await client.create(kind="InfraDevice", **data)
    await device.save()
    return device

device_data = [
    {"name": f"device-{i}", "serial_number": f"SN{i:06d}"}
    for i in range(1000)
]

# Top-level await: docs fragment assumes an enclosing async context.
devices = await process_in_chunks(
    items=device_data,
    chunk_size=50,
    process_func=create_device_item
)

print(f"Created {len(devices)} devices in chunks")

Chunked Updates

async def update_in_chunks(client: InfrahubClient, chunk_size: int = 50):
    """Update devices in chunks."""
    all_devices = await client.all(kind="InfraDevice")
    
    total_updated = 0
    
    for i in range(0, len(all_devices), chunk_size):
        chunk = all_devices[i:i + chunk_size]
        
        # Process chunk concurrently
        tasks = []
        for device in chunk:
            device.is_managed.value = True
            tasks.append(device.save())
        
        await asyncio.gather(*tasks)
        total_updated += len(chunk)
        
        print(f"Updated {total_updated}/{len(all_devices)} devices")
    
    return total_updated

# Use the function
total = await update_in_chunks(client, chunk_size=100)
print(f"Updated {total} devices total")

Batch Operations with Error Handling

Continue on Error

Process all items even if some fail:
import asyncio
from infrahub_sdk.exceptions import GraphQLError

async def create_with_error_handling(
    client: InfrahubClient,
    devices_data: list[dict]
):
    """Create devices with error handling."""
    results = {"success": [], "failed": []}
    
    for data in devices_data:
        try:
            device = await client.create(
                kind="InfraDevice",
                **data
            )
            await device.save()
            results["success"].append(device)
        
        except GraphQLError as e:
            results["failed"].append({
                "data": data,
                "error": str(e)
            })
    
    print(f"Created: {len(results['success'])}")
    print(f"Failed: {len(results['failed'])}")
    
    return results

# Usage
devices_data = [
    {"name": "device-1", "serial_number": "SN000001"},
    {"name": "device-2", "serial_number": "SN000002"},
    {"name": "", "serial_number": "SN000003"},  # Invalid
]

results = await create_with_error_handling(client, devices_data)

Retry Failed Operations

import asyncio
from infrahub_sdk.exceptions import GraphQLError

async def batch_with_retry(
    client: InfrahubClient,
    items: list,
    max_retries: int = 3
):
    """Create one device per item, retrying failures with exponential backoff.

    Returns the list of items (with their error text) that still failed
    after max_retries attempts.
    """
    failed = []

    for item in items:
        for attempt in range(max_retries):
            try:
                node = await client.create(
                    kind="InfraDevice",
                    **item
                )
                await node.save()
            except GraphQLError as e:
                if attempt == max_retries - 1:
                    # Out of attempts: record the failure and move on.
                    failed.append({"item": item, "error": str(e)})
                else:
                    # Back off 1s, 2s, 4s, ... before the next attempt.
                    await asyncio.sleep(2 ** attempt)
            else:
                break

    return failed

Transactional Batches

Rollback all changes if any operation fails:
async def transactional_batch(
    client: InfrahubClient,
    branch_name: str,
    operations: list
) -> bool:
    """Execute batch operations transactionally using a branch."""
    # Create temporary branch for transaction
    branch = await client.branch.create(branch_name=branch_name)
    
    try:
        # Execute all operations on the branch
        for operation in operations:
            await operation(client, branch.name)
        
        # If all succeed, merge (or return success for manual merge)
        print(f"All operations succeeded on branch {branch.name}")
        return True
    
    except Exception as e:
        # If any fail, delete the branch (rollback)
        print(f"Operation failed: {e}")
        await client.branch.delete(branch_name=branch.name)
        print(f"Rolled back by deleting branch {branch.name}")
        return False

# Usage
async def create_device_op(client, branch):
    device = await client.create(
        kind="InfraDevice",
        name="device-1",
        serial_number="SN000001",
        branch=branch
    )
    await device.save()

operations = [create_device_op, create_device_op]
success = await transactional_batch(
    client,
    "transaction-batch",
    operations
)

Progress Tracking

Simple Progress Bar

import asyncio

async def batch_with_progress(
    client: InfrahubClient,
    items: list,
    chunk_size: int = 10
):
    """Batch operation with progress tracking."""
    total = len(items)
    processed = 0
    
    for i in range(0, total, chunk_size):
        chunk = items[i:i + chunk_size]
        
        # Process chunk
        for item in chunk:
            device = await client.create(
                kind="InfraDevice",
                **item
            )
            await device.save()
            processed += 1
        
        # Show progress
        percent = (processed / total) * 100
        print(f"Progress: {processed}/{total} ({percent:.1f}%)")
    
    print("Batch operation complete!")

# Usage
device_data = [
    {"name": f"device-{i}", "serial_number": f"SN{i:06d}"}
    for i in range(100)
]

await batch_with_progress(client, device_data)

Rich Progress Bar

from rich.progress import Progress, TaskID
import asyncio

async def batch_with_rich_progress(
    client: InfrahubClient,
    items: list
):
    """Create devices while driving a rich progress bar."""
    with Progress() as progress:
        bar = progress.add_task(
            "[cyan]Creating devices...",
            total=len(items)
        )

        for payload in items:
            node = await client.create(
                kind="InfraDevice",
                **payload
            )
            await node.save()
            # One tick per saved device.
            progress.update(bar, advance=1)

        print("\nBatch complete!")

Advanced Patterns

Parallel Batch Processing

import asyncio
from typing import Callable, TypeVar

T = TypeVar('T')

async def parallel_batch(
    items: list[T],
    process_func: Callable,
    max_concurrent: int = 10
) -> list:
    """Apply *process_func* to every item, at most max_concurrent at once.

    A semaphore throttles concurrency; results are returned in input order.
    """
    gate = asyncio.Semaphore(max_concurrent)

    async def throttled(entry):
        # Each coroutine must acquire a slot before doing real work.
        async with gate:
            return await process_func(entry)

    return await asyncio.gather(*(throttled(entry) for entry in items))

# Usage
async def create_device_from_data(data: dict):
    """Create and save one InfraDevice from a dict of attribute values."""
    # NOTE(review): a fresh client per item keeps the example self-contained;
    # sharing one client across calls would be cheaper — confirm intent.
    client = InfrahubClient()
    device = await client.create(kind="InfraDevice", **data)
    await device.save()
    return device

device_data = [
    {"name": f"device-{i}", "serial_number": f"SN{i:06d}"}
    for i in range(1000)
]

# Top-level await: docs fragment assumes an enclosing async context.
devices = await parallel_batch(
    items=device_data,
    process_func=create_device_from_data,
    max_concurrent=20
)

print(f"Created {len(devices)} devices with controlled concurrency")

Conditional Batch Operations

async def conditional_batch_update(
    client: "InfrahubClient",
    condition: Callable,
    update_func: Callable
):
    """Update only the devices for which *condition* is true.

    Args:
        client: Infrahub client used to fetch and persist devices.
        condition: Synchronous predicate taking a device, returning bool.
        update_func: Callable that mutates a device in place. May be either
            a plain function (e.g. a lambda calling setattr) or an async
            function — a plain callable's return value is not awaited.

    Bug fixed: the original unconditionally did `await update_func(device)`,
    which raises TypeError when update_func is a sync lambda (awaiting None),
    exactly what the documented usage passes.
    """
    import inspect  # local import keeps this docs snippet self-contained

    all_devices = await client.all(kind="InfraDevice")

    to_update = [d for d in all_devices if condition(d)]

    print(f"Found {len(to_update)} devices to update")

    for device in to_update:
        # Support both sync and async update callables: only await a result
        # that is actually awaitable.
        result = update_func(device)
        if inspect.isawaitable(result):
            await result
        await device.save()

    print(f"Updated {len(to_update)} devices")

# Usage
# NOTE(review): update_func here is a plain (non-async) lambda, so its return
# value is None — conditional_batch_update must await only awaitable results
# for this call to work; confirm the function supports sync callables.
await conditional_batch_update(
    client=client,
    condition=lambda d: not d.is_active.value,
    update_func=lambda d: setattr(d.is_active, 'value', True)
)

Bulk Data Import

import asyncio
import csv
from pathlib import Path

async def import_from_csv(
    client: "InfrahubClient",
    csv_path: Path,
    chunk_size: int = 100
):
    """Import devices from a CSV file with `name` and `serial_number` columns.

    Rows are processed chunk_size at a time: within each chunk, nodes are
    created sequentially and then all save() calls run concurrently.

    Returns the number of devices imported.
    """
    # newline="" is required by the csv module so it can handle line endings
    # (including quoted embedded newlines) itself; the encoding is pinned so
    # imports behave the same on every platform.
    with open(csv_path, "r", newline="", encoding="utf-8") as f:
        rows = list(csv.DictReader(f))

    total = len(rows)
    created = 0

    for i in range(0, total, chunk_size):
        chunk = rows[i:i + chunk_size]

        tasks = []
        for row in chunk:
            device = await client.create(
                kind="InfraDevice",
                name=row['name'],
                serial_number=row['serial_number']
            )
            tasks.append(device.save())

        # Persist the whole chunk concurrently.
        await asyncio.gather(*tasks)
        created += len(chunk)

        print(f"Imported {created}/{total} devices")

    return created

# Usage
# Expects devices.csv to have header columns: name, serial_number.
# Top-level await: docs fragment assumes an enclosing async context.
total_imported = await import_from_csv(
    client=client,
    csv_path=Path("devices.csv"),
    chunk_size=50
)

Performance Tips

Leverage asyncio.gather() to process multiple items concurrently.
Break large batches into smaller chunks to avoid overwhelming the API.
Use asyncio.Semaphore to control the number of concurrent operations.
Continue processing even if some operations fail, then retry or report failures.
Create a branch for batch operations to enable rollback if needed.

Next Steps

Async Operations

Master async programming patterns

Error Handling

Handle batch operation errors

Pagination

Work with large datasets

Branches

Use branches for batch operations

Build docs developers (and LLMs) love