Skip to main content
Batch operations allow you to perform multiple create, update, or delete operations efficiently by executing them in parallel or in optimized batches.

Why Use Batch Operations?

Batch operations provide significant performance benefits:
  • Faster execution - Operations run in parallel
  • Reduced network overhead - Fewer round trips to the API
  • Better resource utilization - Efficient use of connections
  • Progress tracking - Monitor operation completion

Creating a Batch

Basic Batch

Create a batch for multiple operations:
from infrahub_sdk import InfrahubClient

client = InfrahubClient()

# Create a batch
batch = await client.create_batch()

# Add operations to the batch
for i in range(10):
    device = await client.create(
        kind="InfraDevice",
        name=f"router-{i:02d}",
        status="active"
    )
    batch.add(task=device.save, node=device)

# Execute the batch
async for node, result in batch.execute():
    print(f"Created: {node.name.value}")

Batch Configuration

Configure batch behavior:
batch = await client.create_batch(
    return_exceptions=True,  # Continue on errors
    num_workers=5  # Concurrent workers
)

Batch Create Operations

Create Multiple Objects

Create many objects of the same kind:
async def create_devices(count: int):
    client = InfrahubClient()
    batch = await client.create_batch()
    
    for i in range(1, count + 1):
        device = await client.create(
            kind="InfraDevice",
            name=f"router-{i:02d}",
            description=f"Router #{i}",
            status="active"
        )
        batch.add(task=device.save, node=device)
    
    created_count = 0
    async for node, result in batch.execute():
        created_count += 1
        print(f"Created {node.name.value}")
    
    print(f"Total created: {created_count}")

# Create 100 devices
await create_devices(100)

Create with Relationships

Batch create objects with relationships:
async def create_site_with_devices(site_name: str, device_count: int):
    client = InfrahubClient()
    
    # Create site first
    site = await client.create(
        kind="InfraSite",
        name=site_name
    )
    await site.save()
    
    # Get role
    role = await client.get(kind="BuiltinRole", name__value="edge")
    
    # Batch create devices
    batch = await client.create_batch()
    for i in range(1, device_count + 1):
        device = await client.create(
            kind="InfraDevice",
            name=f"{site_name.lower()}-device-{i:02d}",
            site=site,
            role=role,
            status="active"
        )
        batch.add(task=device.save, node=device)
    
    async for node, result in batch.execute():
        print(f"Created: {node.name.value}")

Create Mixed Object Types

Batch create different types of objects:
async def setup_infrastructure():
    client = InfrahubClient()
    batch = await client.create_batch()
    
    # Add sites
    for site_name in ["NYC", "LAX", "ORD"]:
        site = await client.create(
            kind="InfraSite",
            name=site_name
        )
        batch.add(task=site.save, node=site)
    
    # Add roles
    for role_name in ["edge", "core", "access"]:
        role = await client.create(
            kind="BuiltinRole",
            name=role_name
        )
        batch.add(task=role.save, node=role)
    
    # Execute all
    async for node, result in batch.execute():
        print(f"Created: {node._schema.kind} - {node.name.value}")

Batch Update Operations

Update Multiple Objects

Update many objects in a batch:
async def update_device_status(status: str):
    client = InfrahubClient()
    
    # Get devices to update
    devices = await client.filters(
        kind="InfraDevice",
        status__value="provisioning"
    )
    
    # Create batch
    batch = await client.create_batch()
    
    # Add updates
    for device in devices:
        device.status.value = status
        batch.add(task=device.save, node=device)
    
    # Execute
    updated_count = 0
    async for node, result in batch.execute():
        updated_count += 1
        print(f"Updated: {node.name.value}")
    
    print(f"Total updated: {updated_count}")

# Update all provisioning devices to active
await update_device_status("active")

Conditional Updates

Update objects based on conditions:
async def update_old_devices():
    client = InfrahubClient()
    
    # Get all devices
    devices = await client.all(kind="InfraDevice")
    
    batch = await client.create_batch()
    
    for device in devices:
        # Check condition
        if device.status.value == "obsolete":
            # Update to retired
            device.status.value = "retired"
            batch.add(task=device.save, node=device)
        elif device.status.value == "maintenance":
            # Add maintenance tag
            maint_tag = await client.get(
                kind="BuiltinTag",
                name__value="maintenance"
            )
            if maint_tag:
                current_tags = await device.tags.fetch()
                device.tags.peers = current_tags + [maint_tag]
                batch.add(task=device.save, node=device)
    
    async for node, result in batch.execute():
        print(f"Updated: {node.name.value}")

Batch Delete Operations

Delete Multiple Objects

Delete many objects:
async def cleanup_obsolete_devices():
    client = InfrahubClient()
    
    # Get devices to delete
    obsolete_devices = await client.filters(
        kind="InfraDevice",
        status__value="obsolete"
    )
    
    print(f"Found {len(obsolete_devices)} obsolete devices")
    
    # Delete each
    for device in obsolete_devices:
        await client.delete(node=device)
        print(f"Deleted: {device.name.value}")
    
    print("Cleanup complete")
Delete operations cannot be batched using create_batch(). They must be executed individually.

Error Handling

Continue on Errors

Handle errors while continuing batch execution:
from infrahub_sdk.exceptions import GraphQLError

async def batch_with_error_handling():
    client = InfrahubClient()
    batch = await client.create_batch(return_exceptions=True)
    
    for i in range(10):
        device = await client.create(
            kind="InfraDevice",
            name=f"router-{i:02d}",
            status="active"
        )
        batch.add(task=device.save, node=device)
    
    success_count = 0
    error_count = 0
    
    async for node, result in batch.execute():
        if isinstance(result, Exception):
            error_count += 1
            print(f"Error creating {node.name.value}: {result}")
        else:
            success_count += 1
            print(f"Created: {node.name.value}")
    
    print(f"Success: {success_count}, Errors: {error_count}")

Retry Failed Operations

Retry operations that failed:
async def batch_with_retry():
    client = InfrahubClient()
    max_retries = 3
    
    for attempt in range(max_retries):
        batch = await client.create_batch(return_exceptions=True)
        failed_nodes = []
        
        # Add all operations (or retry failed ones)
        nodes_to_create = failed_nodes if attempt > 0 else range(10)
        
        for i in nodes_to_create:
            device = await client.create(
                kind="InfraDevice",
                name=f"router-{i:02d}",
                status="active"
            )
            batch.add(task=device.save, node=device)
        
        # Execute and track failures
        failed_nodes = []
        async for node, result in batch.execute():
            if isinstance(result, Exception):
                failed_nodes.append(node)
                print(f"Failed: {node.name.value}")
        
        if not failed_nodes:
            print("All operations successful")
            break
        
        print(f"Attempt {attempt + 1}: {len(failed_nodes)} failed")

Progress Tracking

Track Batch Progress

Monitor batch execution progress:
async def batch_with_progress():
    client = InfrahubClient()
    batch = await client.create_batch()
    
    total = 100
    
    # Add operations
    for i in range(total):
        device = await client.create(
            kind="InfraDevice",
            name=f"router-{i:02d}",
            status="active"
        )
        batch.add(task=device.save, node=device)
    
    # Execute with progress
    completed = 0
    async for node, result in batch.execute():
        completed += 1
        percent = (completed / total) * 100
        print(f"Progress: {percent:.1f}% ({completed}/{total})")

Progress Bar

Use rich for a progress bar:
from rich.progress import Progress

async def batch_with_progress_bar():
    client = InfrahubClient()
    batch = await client.create_batch()
    
    total = 100
    
    # Add operations
    for i in range(total):
        device = await client.create(
            kind="InfraDevice",
            name=f"router-{i:02d}",
            status="active"
        )
        batch.add(task=device.save, node=device)
    
    # Execute with progress bar
    with Progress() as progress:
        task = progress.add_task("Creating devices...", total=total)
        
        async for node, result in batch.execute():
            progress.update(task, advance=1)

Performance Optimization

Tune Worker Count

Adjust concurrent workers for optimal performance:
# More workers = more parallelism
batch = await client.create_batch(num_workers=10)

# Fewer workers = less load on server
batch = await client.create_batch(num_workers=2)

Batch Size

Split large batches into smaller chunks:
async def chunked_batch_create(items: list, chunk_size: int = 100):
    client = InfrahubClient()
    
    for i in range(0, len(items), chunk_size):
        chunk = items[i:i + chunk_size]
        batch = await client.create_batch()
        
        for item in chunk:
            device = await client.create(
                kind="InfraDevice",
                **item
            )
            batch.add(task=device.save, node=device)
        
        print(f"Processing chunk {i//chunk_size + 1}...")
        async for node, result in batch.execute():
            pass

Real-World Examples

Data Migration

Migrate data from external source:
import csv
from infrahub_sdk import InfrahubClient
from infrahub_sdk.store import NodeStore

async def migrate_from_csv(csv_file: str):
    client = InfrahubClient()
    store = NodeStore()
    
    # Read CSV
    devices_data = []
    with open(csv_file) as f:
        reader = csv.DictReader(f)
        devices_data = list(reader)
    
    # Create sites first
    print("Creating sites...")
    site_batch = await client.create_batch()
    sites = set(row["site"] for row in devices_data)
    
    for site_name in sites:
        site = await client.create(
            kind="InfraSite",
            name=site_name
        )
        site_batch.add(task=site.save, node=site)
        store.set(key=site_name, node=site)
    
    async for node, result in site_batch.execute():
        print(f"Created site: {node.name.value}")
    
    # Create devices
    print(f"Creating {len(devices_data)} devices...")
    device_batch = await client.create_batch()
    
    for row in devices_data:
        site = store.get(key=row["site"], kind="InfraSite")
        device = await client.create(
            kind="InfraDevice",
            name=row["name"],
            site=site,
            status=row.get("status", "active")
        )
        device_batch.add(task=device.save, node=device)
    
    created = 0
    async for node, result in device_batch.execute():
        created += 1
        if created % 10 == 0:
            print(f"Created {created} devices...")
    
    print(f"Migration complete: {created} devices created")

Bulk Update from API

Update devices from external API:
import httpx

async def sync_device_status():
    client = InfrahubClient()
    
    # Get external data
    async with httpx.AsyncClient() as http_client:
        response = await http_client.get(
            "https://api.example.com/devices/status"
        )
        external_devices = response.json()
    
    # Get Infrahub devices
    infrahub_devices = await client.all(kind="InfraDevice")
    devices_by_name = {d.name.value: d for d in infrahub_devices}
    
    # Update batch
    batch = await client.create_batch()
    
    for ext_device in external_devices:
        device = devices_by_name.get(ext_device["name"])
        if device:
            device.status.value = ext_device["status"]
            batch.add(task=device.save, node=device)
    
    # Execute updates
    updated = 0
    async for node, result in batch.execute():
        updated += 1
    
    print(f"Updated {updated} devices")

Cleanup Orphaned Objects

Find and delete orphaned objects:
async def cleanup_orphaned_interfaces():
    client = InfrahubClient()
    
    # Get all interfaces
    interfaces = await client.all(kind="InfraInterface")
    
    # Find orphaned (no device)
    orphaned = []
    for interface in interfaces:
        device = await interface.device.fetch()
        if not device:
            orphaned.append(interface)
    
    print(f"Found {len(orphaned)} orphaned interfaces")
    
    # Delete orphaned
    for interface in orphaned:
        await client.delete(node=interface)
        print(f"Deleted: {interface.name.value}")

Best Practices

1. Use Appropriate Batch Sizes

# Good: Reasonable batch size
batch_size = 100

# Avoid: Too large (memory issues)
batch_size = 10000

# Avoid: Too small (inefficient)
batch_size = 5

2. Handle Errors Gracefully

batch = await client.create_batch(return_exceptions=True)

async for node, result in batch.execute():
    if isinstance(result, Exception):
        # Log error but continue
        logger.error(f"Failed to create {node.name.value}: {result}")
    else:
        logger.info(f"Created {node.name.value}")

3. Track Progress for Long Operations

total = len(items)
completed = 0

async for node, result in batch.execute():
    completed += 1
    if completed % 10 == 0:
        print(f"Progress: {completed}/{total}")

4. Use NodeStore for Relationships

from infrahub_sdk.store import NodeStore

store = NodeStore()

# Store created nodes
site = await client.create(kind="InfraSite", name="NYC")
await site.save()
store.set(key="NYC", node=site)

# Retrieve for relationships
nyc_site = store.get(key="NYC", kind="InfraSite")
device = await client.create(
    kind="InfraDevice",
    name="router-01",
    site=nyc_site
)

Next Steps

Introduction

Learn more about the SDK

Queries

Advanced querying techniques

Mutations

Creating and updating data

Build docs developers (and LLMs) love