Skip to main content
The Infrahub Python SDK is built with async/await for high-performance concurrent operations.

Understanding Async/Await

Why Async?

Async programming allows concurrent I/O operations:
  • Non-blocking: Don’t wait for API responses
  • Concurrent: Process multiple requests simultaneously
  • Efficient: Better resource utilization
  • Scalable: Handle more operations in less time

Basic Async Pattern

import asyncio
from infrahub_sdk import InfrahubClient

async def main():
    """Entry point: fetch a single device and print its name."""
    client = InfrahubClient()

    # Every SDK call returns a coroutine, so each one must be awaited.
    device = await client.get(kind="InfraDevice", id="device-id")
    print(device.name.value)

if __name__ == "__main__":
    # asyncio.run() starts an event loop and drives the coroutine to completion.
    asyncio.run(main())

Concurrent Operations

Query Multiple Objects Concurrently

Fetch multiple objects at once:
import asyncio
from infrahub_sdk import InfrahubClient

async def get_multiple_devices():
    """Fetch five devices concurrently and print each device's name."""
    client = InfrahubClient()

    ids = ["id-1", "id-2", "id-3", "id-4", "id-5"]

    # Start every lookup at once; gather() awaits them all and returns the
    # results in the same order as the inputs.
    devices = await asyncio.gather(
        *(client.get(kind="InfraDevice", id=one_id) for one_id in ids)
    )

    print(f"Retrieved {len(devices)} devices concurrently")
    for device in devices:
        print(f"  - {device.name.value}")

if __name__ == "__main__":
    asyncio.run(get_multiple_devices())

Create Multiple Objects Concurrently

import asyncio
from infrahub_sdk import InfrahubClient

async def create_device(client: InfrahubClient, index: int):
    """Create and save one device named after *index* (e.g. device-042)."""
    node = await client.create(
        kind="InfraDevice",
        name=f"device-{index:03d}",
        serial_number=f"SN{index:06d}"
    )
    await node.save()
    return node

async def create_devices_concurrently():
    """Create 100 devices with every save in flight at the same time."""
    client = InfrahubClient()

    # One coroutine per device; gather() runs them all concurrently.
    created = await asyncio.gather(
        *(create_device(client, i) for i in range(100))
    )

    print(f"Created {len(created)} devices")

if __name__ == "__main__":
    asyncio.run(create_devices_concurrently())

Update Multiple Objects Concurrently

import asyncio

from infrahub_sdk import InfrahubClient  # was missing: needed for the annotations below

async def update_device(client: InfrahubClient, device_id: str):
    """Fetch one device, flag it as managed, and save the change.

    Args:
        client: Connected Infrahub client.
        device_id: ID of the device to update.

    Returns:
        The updated device node.
    """
    device = await client.get(kind="InfraDevice", id=device_id)
    device.is_managed.value = True
    await device.save()
    return device

async def update_all_devices():
    """Mark every device as managed, saving them all concurrently."""
    client = InfrahubClient()

    # Get all devices
    all_devices = await client.all(kind="InfraDevice")

    # One update coroutine per device, all awaited together.
    tasks = [
        update_device(client, device.id)
        for device in all_devices
    ]

    updated = await asyncio.gather(*tasks)
    print(f"Updated {len(updated)} devices concurrently")

if __name__ == "__main__":
    asyncio.run(update_all_devices())

Controlling Concurrency

Using Semaphore for Rate Limiting

Limit the number of concurrent operations:
import asyncio
from infrahub_sdk import InfrahubClient

async def fetch_device_with_limit(
    client: InfrahubClient,
    device_id: str,
    semaphore: asyncio.Semaphore
):
    """Fetch one device, but only while holding a semaphore slot."""
    # The semaphore caps how many of these coroutines can be inside the
    # `async with` body at the same time.
    async with semaphore:
        return await client.get(kind="InfraDevice", id=device_id)

async def limited_concurrent_fetch():
    """Fetch 100 devices with at most 10 requests in flight at once."""
    client = InfrahubClient()

    # Limit to 10 concurrent requests
    limiter = asyncio.Semaphore(10)

    pending = [
        fetch_device_with_limit(client, f"id-{i}", limiter)
        for i in range(100)
    ]

    devices = await asyncio.gather(*pending)
    print(f"Fetched {len(devices)} devices with max 10 concurrent")

if __name__ == "__main__":
    asyncio.run(limited_concurrent_fetch())

Processing in Batches

Process items in sequential batches:
import asyncio

async def process_in_batches(
    client: InfrahubClient,
    items: list,
    batch_size: int = 10,
    delay: float = 0.5
):
    """Fetch devices in sequential batches of *batch_size*.

    Each batch is fetched concurrently with asyncio.gather(); batches run
    one after another, with a pause of *delay* seconds between them so the
    API is not overwhelmed.

    Args:
        client: Connected Infrahub client used for the lookups.
        items: Device IDs to fetch.
        batch_size: Maximum number of concurrent requests per batch.
        delay: Seconds to sleep between batches (0 disables the pause).

    Returns:
        All fetched devices, in the same order as *items*.
    """
    results = []

    for start in range(0, len(items), batch_size):
        batch = items[start:start + batch_size]

        # Fan the whole batch out concurrently, then wait for all of it.
        batch_results = await asyncio.gather(*(
            client.get(kind="InfraDevice", id=item_id)
            for item_id in batch
        ))
        results.extend(batch_results)

        print(f"Processed batch {start // batch_size + 1}: {len(batch)} items")

        # Throttle between batches.
        if delay:
            await asyncio.sleep(delay)

    return results

# Usage — `await` is only valid inside an async function:
#
#     device_ids = [f"id-{i}" for i in range(100)]
#     devices = await process_in_batches(client, device_ids, batch_size=20)

Error Handling in Async

Handle Errors with gather

Continue processing even if some tasks fail:
import asyncio
from infrahub_sdk.exceptions import GraphQLError

async def safe_get_device(
    client: InfrahubClient,
    device_id: str
):
    """Fetch a device; on a GraphQL error, log it and return None."""
    try:
        return await client.get(kind="InfraDevice", id=device_id)
    except GraphQLError as err:
        print(f"Error fetching {device_id}: {err.message}")
        return None

async def fetch_all_safely():
    """Fetch several devices, tolerating individual failures."""
    client = InfrahubClient()

    wanted = ["id-1", "invalid-id", "id-3"]

    results = await asyncio.gather(
        *(safe_get_device(client, one_id) for one_id in wanted)
    )

    # Drop the failures — safe_get_device returned None for those.
    valid_devices = [item for item in results if item is not None]

    print(f"Successfully fetched {len(valid_devices)} devices")

Using return_exceptions

import asyncio

async def fetch_with_exceptions():
    """Fetch devices, collecting exceptions instead of failing fast."""
    client = InfrahubClient()

    wanted = ["id-1", "invalid-id", "id-3"]

    lookups = [
        client.get(kind="InfraDevice", id=one_id)
        for one_id in wanted
    ]

    # return_exceptions=True makes gather() hand raised exceptions back as
    # ordinary result items instead of propagating the first one.
    outcomes = await asyncio.gather(*lookups, return_exceptions=True)

    # Split successes from failures.
    devices, errors = [], []
    for outcome in outcomes:
        (errors if isinstance(outcome, Exception) else devices).append(outcome)

    print(f"Devices: {len(devices)}, Errors: {len(errors)}")

Async Context Managers

Using Client as Context Manager

import asyncio
from infrahub_sdk import InfrahubClient

async def use_context_manager():
    """List devices using the client as an async context manager."""
    # `async with` guarantees the client's resources are released on exit,
    # even if the body raises.
    async with InfrahubClient() as client:
        devices = await client.all(kind="InfraDevice")
        print(f"Found {len(devices)} devices")

if __name__ == "__main__":
    asyncio.run(use_context_manager())

Custom Async Context Manager

from contextlib import asynccontextmanager

@asynccontextmanager
async def device_transaction(client: InfrahubClient, branch_name: str):
    """Run a set of operations on a throwaway branch, transaction-style.

    Creates *branch_name*, yields ``(client, branch name)`` to the caller,
    and deletes the branch again if the managed block raises. On success
    the branch is left in place.

    Raises:
        Re-raises whatever the managed block raised, after the branch has
        been cleaned up.
    """
    # Setup: create an isolated branch for the changes.
    branch = await client.branch.create(branch_name=branch_name)

    try:
        yield client, branch.name
        print(f"Transaction succeeded on {branch.name}")

    except Exception as e:
        # Rollback: discard the branch along with any partial changes.
        await client.branch.delete(branch_name=branch.name)
        print(f"Transaction rolled back: {e}")
        raise

# Usage — `async with` at module level is a syntax error; run this inside
# an async function:
#
#     async with device_transaction(client, "tx-branch") as (client, branch):
#         device = await client.create(
#             kind="InfraDevice",
#             name="new-device",
#             serial_number="SN123456",
#             branch=branch,
#         )
#         await device.save()

Async Generators

Stream Results

Process items as they’re retrieved:
import asyncio
from typing import AsyncGenerator

async def stream_devices(
    client: InfrahubClient,
    batch_size: int = 10
) -> AsyncGenerator:
    """Yield device nodes one at a time, paging through the GraphQL API.

    Fetches *batch_size* devices per query and stops as soon as a page
    comes back empty.
    """
    # The query is loop-invariant, so build it once up front.
    query = """
        query GetDevices($offset: Int!, $limit: Int!) {
          InfraDevice(offset: $offset, limit: $limit) {
            edges {
              node {
                id
                name { value }
              }
            }
          }
        }
        """

    offset = 0
    while True:
        page = await client.execute_graphql(
            query=query,
            variables={"offset": offset, "limit": batch_size}
        )

        edges = page["InfraDevice"]["edges"]
        if not edges:
            # An empty page means everything has been streamed.
            break

        for edge in edges:
            yield edge["node"]

        offset += batch_size

# Usage
async def process_stream():
    """Consume the device stream and count what comes through."""
    client = InfrahubClient()

    count = 0
    async for node in stream_devices(client):
        print(f"Processing: {node['name']['value']}")
        count += 1

    print(f"Processed {count} devices")

Advanced Patterns

Task Coordination

Coordinate multiple async tasks:
import asyncio

async def coordinator():
    """Kick off three workflows at once and wait for all of them."""
    client = InfrahubClient()

    # create_task() schedules each workflow on the loop immediately;
    # gather() then waits for every one of them to finish.
    workflows = [
        asyncio.create_task(create_devices(client, 10)),
        asyncio.create_task(update_devices(client)),
        asyncio.create_task(query_devices(client)),
    ]

    results = await asyncio.gather(*workflows)

    print("All workflows completed")
    return results

async def create_devices(client, count):
    """Placeholder workflow: create *count* devices."""
    pass

async def update_devices(client):
    """Placeholder workflow: update existing devices."""
    pass

async def query_devices(client):
    """Placeholder workflow: run read-only queries."""
    pass

Queue-Based Processing

Use async queues for producer-consumer pattern:
import asyncio

async def producer(
    queue: asyncio.Queue,
    client: InfrahubClient
):
    """Feed every device ID into the queue, then a None sentinel."""
    for device in await client.all(kind="InfraDevice"):
        await queue.put(device.id)

    # Signal completion
    await queue.put(None)

async def consumer(
    queue: asyncio.Queue,
    client: InfrahubClient,
    worker_id: int
):
    """Pull device IDs off the queue and process them until the sentinel."""
    while True:
        device_id = await queue.get()

        if device_id is None:
            # Put the sentinel back so sibling workers also shut down.
            await queue.put(None)
            break

        # Process one device.
        device = await client.get(kind="InfraDevice", id=device_id)
        print(f"Worker {worker_id}: {device.name.value}")

        queue.task_done()

async def queue_processing():
    """Run one producer against five competing consumers."""
    client = InfrahubClient()
    queue = asyncio.Queue()

    # Producer fills the queue while consumers drain it.
    feeder = asyncio.create_task(
        producer(queue, client)
    )

    workers = [
        asyncio.create_task(consumer(queue, client, i))
        for i in range(5)
    ]

    await feeder
    await asyncio.gather(*workers)

Async Retry Logic

import asyncio
from functools import wraps

def async_retry(max_attempts: int = 3, delay: float = 1.0):
    """Decorator retrying an async function with exponential backoff.

    Calls the wrapped coroutine up to *max_attempts* times; before retry
    k+1 it sleeps ``delay * 2**k`` seconds. The final failure's exception
    propagates unchanged.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    # Out of retries: let the exception propagate.
                    if attempt + 1 == max_attempts:
                        raise
                    # Exponential backoff before the next try.
                    await asyncio.sleep(delay * (2 ** attempt))
        return wrapper
    return decorator

# Usage
@async_retry(max_attempts=3, delay=1.0)
async def fetch_device(client: InfrahubClient, device_id: str):
    """Fetch one device, retrying transient failures."""
    return await client.get(kind="InfraDevice", id=device_id)

Timeout Management

import asyncio

async def fetch_with_timeout(
    client: InfrahubClient,
    device_id: str,
    timeout: float = 5.0
):
    """Fetch a device, giving up after *timeout* seconds.

    Returns:
        The device, or None if the request did not finish in time.
    """
    try:
        # wait_for() cancels the inner coroutine once the deadline hits.
        return await asyncio.wait_for(
            client.get(kind="InfraDevice", id=device_id),
            timeout=timeout
        )
    except asyncio.TimeoutError:
        print(f"Timeout fetching device {device_id}")
        return None

Synchronous Client

When to Use Sync Client

For synchronous contexts or legacy code:
from infrahub_sdk import InfrahubClientSync

# Synchronous client (no await needed)
client = InfrahubClientSync()

# Synchronous operations
device = client.get(kind="InfrahubDevice", id="device-id")
devices = client.all(kind="InfraDevice")

device.name.value = "updated-name"
device.save()

Mixed Async/Sync

Use sync client in async code when needed:
import asyncio
from infrahub_sdk import InfrahubClient, InfrahubClientSync

async def mixed_example():
    """Use the async client for bulk work and the sync client for one-offs."""
    # Async client for most operations
    async_client = InfrahubClient()

    devices = await async_client.all(kind="InfraDevice")

    # Sync client for specific operations
    sync_client = InfrahubClientSync()

    # WARNING: a sync call blocks the whole event loop while it runs —
    # keep these rare and short.
    tag = sync_client.get(kind="BuiltinTag", id="tag-id")

Performance Optimization

  • Concurrency: leverage asyncio.gather() to run multiple operations concurrently.
  • Rate limiting: prevent overwhelming the API with too many concurrent requests (for example, with an asyncio.Semaphore).
  • Batching: break large operations into manageable batches for better control.
  • Error isolation: use return_exceptions=True or try/except to prevent one failure from stopping all operations.
  • Resource cleanup: ensure proper cleanup of resources with async context managers.

Next Steps

Batch Operations

Combine async with batch operations

Error Handling

Handle async errors properly

Pagination

Use async patterns for pagination

Branches

Async branch operations

Build docs developers (and LLMs) love