Infrahub uses Prefect as its workflow orchestration engine, enabling CI/CD-style pipelines, scheduled tasks, and event-driven automation.

Workflow Architecture

Infrahub workflows are Prefect flows defined in the backend:
# backend/infrahub/git/tasks.py
from prefect import flow, task
from prefect.cache_policies import NONE

@flow(name="git-repository-add-read-write")
async def add_git_repository(model: GitRepositoryAdd) -> None:
    # Workflow logic
    ...

@task(name="git-branch-create", cache_policy=NONE)
async def git_branch_create(...) -> None:
    # Task logic
    ...

Workflow Types

Workflows are categorized by type:
# backend/infrahub/workflows/constants.py
class WorkflowType(StrEnum):
    CORE = "core"        # Core system workflows
    USER = "user"        # User-triggered workflows
    INTERNAL = "internal" # Background/scheduled workflows

Workflow Catalogue

All workflows are registered in the catalogue:
# backend/infrahub/workflows/catalogue.py
GIT_REPOSITORY_ADD = WorkflowDefinition(
    name="git-repository-add-read-write",
    type=WorkflowType.CORE,
    module="infrahub.git.tasks",
    function="add_git_repository",
    tags=[WorkflowTag.DATABASE_CHANGE],
)

GIT_REPOSITORIES_SYNC = WorkflowDefinition(
    name="git_repositories_sync",
    type=WorkflowType.INTERNAL,
    cron="* * * * *",  # Every minute
    module="infrahub.git.tasks",
    function="sync_remote_repositories",
    concurrency_limit=1,
)

Key Workflow Definitions

Git Operations

  • GIT_REPOSITORY_ADD: Add and clone repository
  • GIT_REPOSITORIES_SYNC: Sync all repositories (scheduled)
  • GIT_REPOSITORIES_CREATE_BRANCH: Create branch across repos
  • GIT_REPOSITORIES_MERGE: Merge Git branches
  • GIT_REPOSITORY_USER_CHECKS_TRIGGER: Run user-defined checks

Artifact Generation

  • REQUEST_ARTIFACT_DEFINITION_GENERATE: Trigger artifact generation
  • REQUEST_ARTIFACT_GENERATE: Generate single artifact
  • TRIGGER_ARTIFACT_DEFINITION_GENERATE: Batch artifact generation

Branch Operations

  • BRANCH_CREATE: Create new branch
  • BRANCH_MERGE: Merge branch
  • BRANCH_REBASE: Rebase branch
  • BRANCH_DELETE: Delete branch
  • BRANCH_VALIDATE: Validate branch state

Proposed Changes

  • PROPOSED_CHANGE_MERGE: Merge proposed change
  • REQUEST_PROPOSED_CHANGE_PIPELINE: Run full validation pipeline
  • REQUEST_PROPOSED_CHANGE_DATA_INTEGRITY: Data integrity checks
  • REQUEST_PROPOSED_CHANGE_SCHEMA_INTEGRITY: Schema integrity checks

Creating Workflows

Define a Flow

from prefect import flow
from infrahub.workflows.utils import add_tags

@flow(
    name="custom-validation-workflow",
    flow_run_name="Validate {object_kind} objects"
)
async def validate_objects(
    object_kind: str,
    branch_name: str,
) -> None:
    # Add branch tag for filtering
    await add_tags(branches=[branch_name])
    
    # Your workflow logic
    client = get_client()
    objects = await client.filters(kind=object_kind, branch=branch_name)
    
    for obj in objects:
        # Validate each object
        await validate_single_object(obj)

Define Tasks

from prefect import task
from prefect.cache_policies import NONE

@task(
    name="validate-single-object",
    task_run_name="Validate {obj.display_label}",
    cache_policy=NONE,
    retries=3,
)
async def validate_single_object(obj) -> bool:
    # Task logic
    if not obj.name.value:
        return False
    return True

Register Workflow

# backend/infrahub/workflows/catalogue.py
CUSTOM_VALIDATION = WorkflowDefinition(
    name="custom-validation-workflow",
    type=WorkflowType.USER,
    module="myapp.workflows.validation",
    function="validate_objects",
    tags=[WorkflowTag.DATABASE_CHANGE],
)

# Add to WORKFLOWS list
WORKFLOWS = [
    # ... existing workflows
    CUSTOM_VALIDATION,
]

Submitting Workflows

Via Workflow Service

from infrahub.workers.dependencies import get_workflow

workflow_service = get_workflow()

# Submit workflow (fire-and-forget; returns without waiting for the run)
await workflow_service.submit_workflow(
    workflow=CUSTOM_VALIDATION,
    context=context,
    parameters={
        "object_kind": "NetworkDevice",
        "branch_name": "main",
    },
)

Execute and Wait

# Execute and wait for result
result = await workflow_service.execute_workflow(
    workflow=CUSTOM_VALIDATION,
    parameters={
        "object_kind": "NetworkDevice",
        "branch_name": "main",
    },
    expected_return=bool,
)

Batch Execution

client = get_client()
batch = await client.create_batch()

for device in devices:
    batch.add(
        task=validate_device,
        device_id=device.id,
        branch=branch_name,
    )

async for result, error in batch.execute():
    if error:
        log.error(f"Validation failed: {error}")
    else:
        log.info(f"Validation passed: {result}")

Workflow Composition

Parent-Child Flows

@flow(name="parent-workflow")
async def parent_workflow(branch: str) -> None:
    # Parent flow can spawn child flows
    await child_workflow_1(branch=branch)
    await child_workflow_2(branch=branch)

@flow(name="child-workflow-1")
async def child_workflow_1(branch: str) -> None:
    # Child flow logic
    ...

Task Dependencies

@flow(name="sequential-workflow")
async def sequential_workflow() -> None:
    # Tasks run sequentially
    result1 = await task1()
    result2 = await task2(result1)
    await task3(result2)

@flow(name="parallel-workflow")
async def parallel_workflow() -> None:
    # Tasks can run in parallel
    batch = await client.create_batch()
    batch.add(task=task1)
    batch.add(task=task2)
    batch.add(task=task3)
    
    async for result, error in batch.execute():
        # Process results
        ...

Scheduled Workflows

Cron Schedule

GIT_REPOSITORIES_SYNC = WorkflowDefinition(
    name="git_repositories_sync",
    type=WorkflowType.INTERNAL,
    cron="* * * * *",  # Every minute
    module="infrahub.git.tasks",
    function="sync_remote_repositories",
)

ANONYMOUS_TELEMETRY_SEND = WorkflowDefinition(
    name="anonymous_telemetry_send",
    type=WorkflowType.INTERNAL,
    cron=f"{random.randint(0, 59)} 2 * * *",  # Daily at 2:XX AM
    module="infrahub.telemetry.tasks",
    function="send_telemetry_push",
)

Scheduled Examples

  • Repository Sync: Every minute (* * * * *)
  • Telemetry: Daily at random time
  • Webhook Config: Daily at 3:XX AM
  • Lock Cleanup: Every minute
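The "daily at a random time" pattern used by the telemetry and webhook schedules can be sketched as a small helper that builds a cron string with a randomized minute (a hypothetical helper for illustration, not part of the catalogue):

```python
import random

def randomized_daily_cron(hour: int) -> str:
    """Build a cron expression that fires once a day at the given hour,
    at a random minute, to spread load across installations."""
    minute = random.randint(0, 59)
    return f"{minute} {hour} * * *"

# e.g. telemetry at 2:XX AM, webhook config at 3:XX AM
telemetry_cron = randomized_daily_cron(2)
webhook_cron = randomized_daily_cron(3)
```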

Concurrency Control

Limit Concurrent Runs

GIT_REPOSITORIES_SYNC = WorkflowDefinition(
    name="git_repositories_sync",
    type=WorkflowType.INTERNAL,
    cron="* * * * *",
    module="infrahub.git.tasks",
    function="sync_remote_repositories",
    concurrency_limit=1,  # Only one run at a time
    concurrency_limit_strategy=ConcurrencyLimitStrategy.CANCEL_NEW,
)

Lock Resources

from infrahub import lock

@flow(name="modify-repository")
async def modify_repository(repo_name: str) -> None:
    # Acquire lock for repository
    async with lock.registry.get(name=repo_name, namespace="repository"):
        # Only one workflow can modify this repo at a time
        await perform_repository_operation(repo_name)

Error Handling

Task Retries

@task(
    name="unreliable-task",
    retries=3,  # Retry up to 3 times
    retry_delay_seconds=10,  # Wait 10s between retries
)
async def unreliable_task() -> None:
    # Task that might fail
    ...

Exception Handling

@flow(name="resilient-workflow")
async def resilient_workflow() -> None:
    try:
        await risky_operation()
    except RepositoryError as exc:
        log.error(f"Repository error: {exc.message}")
        # Handle or re-raise
        raise
    except Exception as exc:
        log.exception("Unexpected error")
        # Cleanup
        await cleanup()
        raise

Workflow Tags

Tags help categorize and filter workflows:
# backend/infrahub/workflows/constants.py
class WorkflowTag(StrEnum):
    DATABASE_CHANGE = "database-change"
    WORKFLOWTYPE = "workflow-type"

WORKFLOW_WITH_TAGS = WorkflowDefinition(
    name="important-workflow",
    type=WorkflowType.CORE,
    tags=[WorkflowTag.DATABASE_CHANGE],
    module="...",
    function="...",
)

Branch Tags

Tag workflow runs with branches:
from infrahub.workflows.utils import add_tags, add_branch_tag

@flow(name="branch-specific-workflow")
async def branch_workflow(branch: str) -> None:
    # Tag with branch
    await add_branch_tag(branch_name=branch)
    
    # Or tag with multiple resources
    await add_tags(
        branches=[branch],
        nodes=["node-id-1", "node-id-2"],
    )

Workflow Models

WorkflowDefinition

# backend/infrahub/workflows/models.py
class WorkflowDefinition(BaseModel):
    name: str
    type: WorkflowType = WorkflowType.INTERNAL
    module: str  # Python module path
    function: str  # Function name
    cron: str | None = None  # Optional schedule
    tags: list[WorkflowTag] = Field(default_factory=list)
    concurrency_limit: int | None = None
    concurrency_limit_strategy: ConcurrencyLimitStrategy | None = None

WorkflowInfo

class WorkflowInfo(BaseModel):
    id: UUID  # Prefect flow run ID
    info: FlowRun | None = None  # Prefect flow run info

Real-World Examples

Proposed Change Pipeline

# backend/infrahub/proposed_change/tasks.py
@flow(name="proposed-changed-pipeline")
async def run_proposed_change_pipeline(
    proposed_change_id: str,
    source_branch: str,
    destination_branch: str,
) -> None:
    """Complete validation pipeline for proposed changes."""
    
    # Run data integrity checks
    await workflow.submit_workflow(
        workflow=REQUEST_PROPOSED_CHANGE_DATA_INTEGRITY,
        parameters={"proposed_change_id": proposed_change_id},
    )
    
    # Run schema integrity checks
    await workflow.submit_workflow(
        workflow=REQUEST_PROPOSED_CHANGE_SCHEMA_INTEGRITY,
        parameters={"proposed_change_id": proposed_change_id},
    )
    
    # Run repository checks
    await workflow.submit_workflow(
        workflow=REQUEST_PROPOSED_CHANGE_REPOSITORY_CHECKS,
        parameters={
            "proposed_change_id": proposed_change_id,
            "source_branch": source_branch,
            "destination_branch": destination_branch,
        },
    )
    
    # Refresh artifacts
    await workflow.submit_workflow(
        workflow=REQUEST_PROPOSED_CHANGE_REFRESH_ARTIFACTS,
        parameters={"proposed_change_id": proposed_change_id},
    )

Artifact Generation

# backend/infrahub/git/tasks.py
@flow(name="request_artifact_definitions_generate")
async def generate_request_artifact_definition(
    model: RequestArtifactDefinitionGenerate,
    context: InfrahubContext,
) -> None:
    """Generate all artifacts for a definition."""
    
    client = get_client()
    artifact_definition = await client.get(
        kind=CoreArtifactDefinition,
        id=model.artifact_definition_id,
        branch=model.branch,
    )
    
    # Get targets from group
    group = await fetch_artifact_definition_targets(...)
    
    # Submit generation task for each target
    batch = await client.create_batch()
    for member in group.members.peers:
        batch.add(
            task=workflow.submit_workflow,
            workflow=REQUEST_ARTIFACT_GENERATE,
            parameters={"model": create_artifact_model(member)},
        )
    
    async for _, _ in batch.execute():
        pass

Best Practices

Design Principles

  1. Single Responsibility: Each workflow should have one clear purpose
  2. Idempotent: Workflows should be safe to retry
  3. Composable: Build complex workflows from simple tasks
  4. Observable: Add logging and tags for monitoring

Performance

  1. Use batch operations for parallel execution
  2. Set appropriate concurrency limits
  3. Cache task results when appropriate
  4. Use task retries for transient failures

Monitoring

  1. Use descriptive flow and task run names
  2. Add tags for filtering in Prefect UI
  3. Log important state changes
  4. Track execution times for bottlenecks
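Tracking execution times (point 4) can be as simple as a timing context manager wrapped around flow steps. This is a generic sketch, not an Infrahub utility; the logger name is arbitrary:

```python
import logging
import time
from contextlib import contextmanager

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("workflow.timing")

@contextmanager
def timed(step: str):
    """Log the wall-clock duration of a workflow step."""
    start = time.monotonic()
    try:
        yield
    finally:
        elapsed = time.monotonic() - start
        log.info("%s took %.3fs", step, elapsed)

with timed("validate-objects"):
    time.sleep(0.01)  # stand-in for real work
```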

Troubleshooting

Workflow Not Running

  1. Check workflow is registered in catalogue
  2. Verify worker pool is running
  3. Check Prefect server connectivity
  4. Review workflow deployment status

Task Failures

  1. Review task logs in Prefect UI
  2. Check error messages and stack traces
  3. Verify input parameters
  4. Test task function independently
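Point 4 above, testing the task function independently, can be sketched by driving the task's logic directly outside any flow or worker. Here the earlier validate_single_object logic is re-created as a plain async function so it runs in a bare unit test (with a real Prefect task, the undecorated function is also reachable via the task's .fn attribute):

```python
import asyncio
from types import SimpleNamespace

async def validate_single_object(obj) -> bool:
    """Same logic as the decorated task: reject objects without a name."""
    if not obj.name.value:
        return False
    return True

# Drive the function directly, with stub objects, outside Prefect entirely.
good = SimpleNamespace(name=SimpleNamespace(value="leaf-01"))
bad = SimpleNamespace(name=SimpleNamespace(value=""))

assert asyncio.run(validate_single_object(good)) is True
assert asyncio.run(validate_single_object(bad)) is False
```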

Performance Issues

  1. Profile workflow execution times
  2. Identify slow tasks
  3. Parallelize independent operations
  4. Optimize database queries
