Why Use Batch Operations?
Batch operations provide significant performance benefits:
- Faster execution - Operations run in parallel
- Reduced network overhead - Fewer round trips to the API
- Better resource utilization - Efficient use of connections
- Progress tracking - Monitor operation completion
Creating a Batch
Basic Batch
Create a batch for multiple operations:from infrahub_sdk import InfrahubClient
client = InfrahubClient()
# Create a batch
batch = await client.create_batch()
# Add operations to the batch
for i in range(10):
device = await client.create(
kind="InfraDevice",
name=f"router-{i:02d}",
status="active"
)
batch.add(task=device.save, node=device)
# Execute the batch
async for node, result in batch.execute():
print(f"Created: {node.name.value}")
Batch Configuration
Configure batch behavior:
batch = await client.create_batch(
return_exceptions=True, # Continue on errors
num_workers=5 # Concurrent workers
)
Batch Create Operations
Create Multiple Objects
Create many objects of the same kind:async def create_devices(count: int):
client = InfrahubClient()
batch = await client.create_batch()
for i in range(1, count + 1):
device = await client.create(
kind="InfraDevice",
name=f"router-{i:02d}",
description=f"Router #{i}",
status="active"
)
batch.add(task=device.save, node=device)
created_count = 0
async for node, result in batch.execute():
created_count += 1
print(f"Created {node.name.value}")
print(f"Total created: {created_count}")
# Create 100 devices
await create_devices(100)
Create with Relationships
Batch create objects with relationships:async def create_site_with_devices(site_name: str, device_count: int):
client = InfrahubClient()
# Create site first
site = await client.create(
kind="InfraSite",
name=site_name
)
await site.save()
# Get role
role = await client.get(kind="BuiltinRole", name__value="edge")
# Batch create devices
batch = await client.create_batch()
for i in range(1, device_count + 1):
device = await client.create(
kind="InfraDevice",
name=f"{site_name.lower()}-device-{i:02d}",
site=site,
role=role,
status="active"
)
batch.add(task=device.save, node=device)
async for node, result in batch.execute():
print(f"Created: {node.name.value}")
Create Mixed Object Types
Batch create different types of objects:async def setup_infrastructure():
client = InfrahubClient()
batch = await client.create_batch()
# Add sites
for site_name in ["NYC", "LAX", "ORD"]:
site = await client.create(
kind="InfraSite",
name=site_name
)
batch.add(task=site.save, node=site)
# Add roles
for role_name in ["edge", "core", "access"]:
role = await client.create(
kind="BuiltinRole",
name=role_name
)
batch.add(task=role.save, node=role)
# Execute all
async for node, result in batch.execute():
print(f"Created: {node._schema.kind} - {node.name.value}")
Batch Update Operations
Update Multiple Objects
Update many objects in a batch:async def update_device_status(status: str):
client = InfrahubClient()
# Get devices to update
devices = await client.filters(
kind="InfraDevice",
status__value="provisioning"
)
# Create batch
batch = await client.create_batch()
# Add updates
for device in devices:
device.status.value = status
batch.add(task=device.save, node=device)
# Execute
updated_count = 0
async for node, result in batch.execute():
updated_count += 1
print(f"Updated: {node.name.value}")
print(f"Total updated: {updated_count}")
# Update all provisioning devices to active
await update_device_status("active")
Conditional Updates
Update objects based on conditions:async def update_old_devices():
client = InfrahubClient()
# Get all devices
devices = await client.all(kind="InfraDevice")
batch = await client.create_batch()
for device in devices:
# Check condition
if device.status.value == "obsolete":
# Update to retired
device.status.value = "retired"
batch.add(task=device.save, node=device)
elif device.status.value == "maintenance":
# Add maintenance tag
maint_tag = await client.get(
kind="BuiltinTag",
name__value="maintenance"
)
if maint_tag:
current_tags = await device.tags.fetch()
device.tags.peers = current_tags + [maint_tag]
batch.add(task=device.save, node=device)
async for node, result in batch.execute():
print(f"Updated: {node.name.value}")
Batch Delete Operations
Delete Multiple Objects
Delete many objects:async def cleanup_obsolete_devices():
client = InfrahubClient()
# Get devices to delete
obsolete_devices = await client.filters(
kind="InfraDevice",
status__value="obsolete"
)
print(f"Found {len(obsolete_devices)} obsolete devices")
# Delete each
for device in obsolete_devices:
await client.delete(node=device)
print(f"Deleted: {device.name.value}")
print("Cleanup complete")
Delete operations cannot be batched using `create_batch()`. They must be executed individually.
Error Handling
Continue on Errors
Handle errors while continuing batch execution:from infrahub_sdk.exceptions import GraphQLError
async def batch_with_error_handling():
client = InfrahubClient()
batch = await client.create_batch(return_exceptions=True)
for i in range(10):
device = await client.create(
kind="InfraDevice",
name=f"router-{i:02d}",
status="active"
)
batch.add(task=device.save, node=device)
success_count = 0
error_count = 0
async for node, result in batch.execute():
if isinstance(result, Exception):
error_count += 1
print(f"Error creating {node.name.value}: {result}")
else:
success_count += 1
print(f"Created: {node.name.value}")
print(f"Success: {success_count}, Errors: {error_count}")
Retry Failed Operations
Retry operations that failed:async def batch_with_retry():
client = InfrahubClient()
max_retries = 3
for attempt in range(max_retries):
batch = await client.create_batch(return_exceptions=True)
failed_nodes = []
# Add all operations (or retry failed ones)
nodes_to_create = failed_nodes if attempt > 0 else range(10)
for i in nodes_to_create:
device = await client.create(
kind="InfraDevice",
name=f"router-{i:02d}",
status="active"
)
batch.add(task=device.save, node=device)
# Execute and track failures
failed_nodes = []
async for node, result in batch.execute():
if isinstance(result, Exception):
failed_nodes.append(node)
print(f"Failed: {node.name.value}")
if not failed_nodes:
print("All operations successful")
break
print(f"Attempt {attempt + 1}: {len(failed_nodes)} failed")
Progress Tracking
Track Batch Progress
Monitor batch execution progress:async def batch_with_progress():
client = InfrahubClient()
batch = await client.create_batch()
total = 100
# Add operations
for i in range(total):
device = await client.create(
kind="InfraDevice",
name=f"router-{i:02d}",
status="active"
)
batch.add(task=device.save, node=device)
# Execute with progress
completed = 0
async for node, result in batch.execute():
completed += 1
percent = (completed / total) * 100
print(f"Progress: {percent:.1f}% ({completed}/{total})")
Progress Bar
Use rich for a progress bar:from rich.progress import Progress
async def batch_with_progress_bar():
client = InfrahubClient()
batch = await client.create_batch()
total = 100
# Add operations
for i in range(total):
device = await client.create(
kind="InfraDevice",
name=f"router-{i:02d}",
status="active"
)
batch.add(task=device.save, node=device)
# Execute with progress bar
with Progress() as progress:
task = progress.add_task("Creating devices...", total=total)
async for node, result in batch.execute():
progress.update(task, advance=1)
Performance Optimization
Tune Worker Count
Adjust concurrent workers for optimal performance:
# More workers = more parallelism
batch = await client.create_batch(num_workers=10)
# Fewer workers = less load on server
batch = await client.create_batch(num_workers=2)
Batch Size
Split large batches into smaller chunks:async def chunked_batch_create(items: list, chunk_size: int = 100):
client = InfrahubClient()
for i in range(0, len(items), chunk_size):
chunk = items[i:i + chunk_size]
batch = await client.create_batch()
for item in chunk:
device = await client.create(
kind="InfraDevice",
**item
)
batch.add(task=device.save, node=device)
print(f"Processing chunk {i//chunk_size + 1}...")
async for node, result in batch.execute():
pass
Real-World Examples
Data Migration
Migrate data from external source:import csv
from infrahub_sdk import InfrahubClient
from infrahub_sdk.store import NodeStore
async def migrate_from_csv(csv_file: str):
client = InfrahubClient()
store = NodeStore()
# Read CSV
devices_data = []
with open(csv_file) as f:
reader = csv.DictReader(f)
devices_data = list(reader)
# Create sites first
print("Creating sites...")
site_batch = await client.create_batch()
sites = set(row["site"] for row in devices_data)
for site_name in sites:
site = await client.create(
kind="InfraSite",
name=site_name
)
site_batch.add(task=site.save, node=site)
store.set(key=site_name, node=site)
async for node, result in site_batch.execute():
print(f"Created site: {node.name.value}")
# Create devices
print(f"Creating {len(devices_data)} devices...")
device_batch = await client.create_batch()
for row in devices_data:
site = store.get(key=row["site"], kind="InfraSite")
device = await client.create(
kind="InfraDevice",
name=row["name"],
site=site,
status=row.get("status", "active")
)
device_batch.add(task=device.save, node=device)
created = 0
async for node, result in device_batch.execute():
created += 1
if created % 10 == 0:
print(f"Created {created} devices...")
print(f"Migration complete: {created} devices created")
Bulk Update from API
Update devices from external API:import httpx
async def sync_device_status():
client = InfrahubClient()
# Get external data
async with httpx.AsyncClient() as http_client:
response = await http_client.get(
"https://api.example.com/devices/status"
)
external_devices = response.json()
# Get Infrahub devices
infrahub_devices = await client.all(kind="InfraDevice")
devices_by_name = {d.name.value: d for d in infrahub_devices}
# Update batch
batch = await client.create_batch()
for ext_device in external_devices:
device = devices_by_name.get(ext_device["name"])
if device:
device.status.value = ext_device["status"]
batch.add(task=device.save, node=device)
# Execute updates
updated = 0
async for node, result in batch.execute():
updated += 1
print(f"Updated {updated} devices")
Cleanup Orphaned Objects
Find and delete orphaned objects:async def cleanup_orphaned_interfaces():
client = InfrahubClient()
# Get all interfaces
interfaces = await client.all(kind="InfraInterface")
# Find orphaned (no device)
orphaned = []
for interface in interfaces:
device = await interface.device.fetch()
if not device:
orphaned.append(interface)
print(f"Found {len(orphaned)} orphaned interfaces")
# Delete orphaned
for interface in orphaned:
await client.delete(node=interface)
print(f"Deleted: {interface.name.value}")
Best Practices
1. Use Appropriate Batch Sizes
# Good: Reasonable batch size
batch_size = 100
# Avoid: Too large (memory issues)
batch_size = 10000
# Avoid: Too small (inefficient)
batch_size = 5
2. Handle Errors Gracefully
batch = await client.create_batch(return_exceptions=True)
async for node, result in batch.execute():
if isinstance(result, Exception):
# Log error but continue
logger.error(f"Failed to create {node.name.value}: {result}")
else:
logger.info(f"Created {node.name.value}")
3. Track Progress for Long Operations
total = len(items)
completed = 0
async for node, result in batch.execute():
completed += 1
if completed % 10 == 0:
print(f"Progress: {completed}/{total}")
4. Use NodeStore for Relationships
from infrahub_sdk.store import NodeStore
store = NodeStore()
# Store created nodes
site = await client.create(kind="InfraSite", name="NYC")
await site.save()
store.set(key="NYC", node=site)
# Retrieve for relationships
nyc_site = store.get(key="NYC", kind="InfraSite")
device = await client.create(
kind="InfraDevice",
name="router-01",
site=nyc_site
)
Next Steps
Introduction
Learn more about the SDK
Queries
Advanced querying techniques
Mutations
Creating and updating data