pyinfra is built around a unique two-phase execution model that separates operation discovery from execution. This architecture enables predictable, idempotent infrastructure management with efficient parallel execution.
The two-phase model is fundamental to how pyinfra works. Understanding this concept is key to mastering the framework.
During the first phase (Prepare), pyinfra:

1. **Discovers operations** — executes your deploy code, recording each operation call
2. **Collects facts** — gathers current state from target hosts
3. **Determines changes** — compares desired state against current state
4. **Builds the operation DAG** — creates a dependency graph of operations
5. **Generates the command list** — produces the exact commands needed

In the second phase (Execute), the generated commands are actually run against each host.
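The split can be illustrated with a self-contained toy model (hypothetical names, not pyinfra's API): phase one records what each operation would do, and phase two replays the recorded list.

```python
# Toy sketch of a two-phase model (hypothetical names, not pyinfra's API).
queued = []    # phase 1 output: commands discovered, not yet run
executed = []  # phase 2 output: commands actually "run"

def operation(command):
    # Phase 1 (Prepare): record the command instead of running it
    queued.append(command)

def execute_all():
    # Phase 2 (Execute): run everything discovered in phase 1, in order
    for command in queued:
        executed.append(command)  # stand-in for actually running it

# The "deploy script" only queues work...
operation("apt-get install -y nginx")
operation("systemctl enable nginx")
assert executed == []  # nothing has run yet

execute_all()          # ...execution happens later, as a separate pass
assert executed == ["apt-get install -y nginx", "systemctl enable nginx"]
```

Because discovery is decoupled from execution, the full command list exists before any host is touched — which is what makes dry runs and change prediction possible.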
```python
from pyinfra.api.state import StateStage

# From src/pyinfra/api/state.py:93-104
class StateStage(IntEnum):
    # Setup - collect inventory & data
    Setup = 1
    # Connect - connect to the inventory
    Connect = 2
    # Prepare - detect operation changes
    Prepare = 3
    # Execute - execute operations
    Execute = 4
    # Disconnect - disconnect from the inventory
    Disconnect = 5
```
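Because `StateStage` is an `IntEnum`, stage checks reduce to plain integer comparisons. A small standalone illustration (re-declaring the enum locally so it runs on its own):

```python
from enum import IntEnum

# Local re-declaration of pyinfra's StateStage, for illustration only
class StateStage(IntEnum):
    Setup = 1
    Connect = 2
    Prepare = 3
    Execute = 4
    Disconnect = 5

current_stage = StateStage.Connect

# The guard used by the @operation decorator: operation calls are only
# legal during the Prepare and Execute stages
outside = current_stage < StateStage.Prepare or current_stage > StateStage.Execute
print(outside)  # True: you cannot call operations while still connecting
```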
During the Prepare phase, operations are called but not executed: the `@operation` decorator intercepts the call and returns an `OperationMeta` object instead:
```python
# From src/pyinfra/api/operation.py:264-380
@wraps(func)
def decorated_func(*args, **kwargs) -> OperationMeta:
    state = context.state
    host = context.host

    # Check we're in the correct stage
    if pyinfra.is_cli and (
        state.current_stage < StateStage.Prepare
        or state.current_stage > StateStage.Execute
    ):
        raise Exception("Cannot call operations outside of Prepare/Execute stages")

    # Generate operation metadata
    names, add_args = generate_operation_name(func, host, kwargs, global_arguments)
    op_order, op_hash = solve_operation_consistency(names, state, host)

    # Create a generator that will yield commands later
    def command_generator() -> Iterator[PyinfraCommand]:
        # ... operation logic runs here during execution
        for command in func(*args, **kwargs):
            if isinstance(command, str):
                command = StringCommand(command.strip())
            yield command

    # Determine if this operation will make changes
    op_is_change = None
    if state.should_check_for_changes():
        op_is_change = False
        for _ in command_generator():  # Iterate once to check
            op_is_change = True
            break

    return OperationMeta(op_hash, op_is_change)
```
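Note how the change check iterates the command generator at most once: a single yielded command is enough to mark the operation as a change, without materializing the full command list. A standalone sketch of that pattern:

```python
def command_generator():
    # Stand-in for an operation body: pretend fact-checking found
    # one difference that needs correcting
    for command in ["mkdir -p /opt/app"]:
        yield command

# Pull at most one item: any yielded command means "this op will change things"
op_is_change = False
for _ in command_generator():
    op_is_change = True
    break

print(op_is_change)  # True: the generator yielded at least one command
```

An operation whose generator yields nothing (current state already matches desired state) is reported as a no-op.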
The Prepare phase is what enables pyinfra's "dry-run" mode (`pyinfra ... --dry`). Since operations are discovered but not executed, you can see exactly what would change.
The `State` class (defined in `src/pyinfra/api/state.py`) is the central coordinator for a pyinfra deployment:
```python
# From src/pyinfra/api/state.py:145-283
class State:
    """
    Manages state for a pyinfra deploy.
    """

    # A pyinfra.api.Inventory which stores all our pyinfra.api.Host's
    inventory: "Inventory"

    # A pyinfra.api.Config
    config: "Config"

    # Main gevent pool for parallel execution
    pool: "Pool"

    # Current stage this state is in
    current_stage: StateStage = StateStage.Setup

    # Whether we are executing operations (ie hosts are all ready)
    is_executing: bool = False

    # Whether we should check for operation changes
    check_for_changes: bool = True

    # Op basics
    op_meta: dict[str, StateOperationMeta] = {}  # operation hash -> metadata

    # Op dict for each host
    ops: dict[Host, dict[str, StateOperationHostData]] = {}

    # Meta dict for each host
    meta: dict[Host, StateHostMeta] = {}

    # Results dict for each host
    results: dict[Host, StateHostResults] = {}
```
Operations are ordered using a Directed Acyclic Graph (DAG) to handle dependencies:
```python
# From src/pyinfra/api/state.py:310-344
def get_op_order(self):
    ts: TopologicalSorter = TopologicalSorter()

    # Build the DAG from each host's operation order
    for host in self.inventory:
        for i, op_hash in enumerate(host.op_hash_order):
            if not i:
                ts.add(op_hash)  # First operation has no dependencies
            else:
                ts.add(op_hash, host.op_hash_order[i - 1])  # Depends on previous

    final_op_order = []

    try:
        ts.prepare()
    except CycleError as e:
        raise PyinfraError(
            "Cycle detected in operation ordering DAG.\n"
            f"    Error: {e}\n\n"
            "    This can happen when using loops in operation code"
        )

    while ts.is_active():
        # Sort operations that can run in parallel by line number
        node_group = sorted(
            ts.get_ready(),
            key=lambda op_hash: self.op_meta[op_hash].op_order,
        )
        ts.done(*node_group)
        final_op_order.extend(node_group)

    return final_op_order
```
The DAG ensures operations run in the correct order while maximizing parallelism. Operations with no dependencies can run simultaneously across different hosts.
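The batching loop above is built on the standard library's `graphlib.TopologicalSorter`. The same ready-set pattern can be reproduced in isolation, here with illustrative operation names standing in for operation hashes:

```python
from graphlib import TopologicalSorter

# Per-host operation chains: each op depends on the previous one on that host
host_a = ["install", "configure", "restart"]
host_b = ["install", "restart"]

ts = TopologicalSorter()
for order in (host_a, host_b):
    for i, op in enumerate(order):
        if i == 0:
            ts.add(op)                # first op: no dependencies
        else:
            ts.add(op, order[i - 1])  # depends on the previous op

ts.prepare()
final_order = []
while ts.is_active():
    ready = sorted(ts.get_ready())    # deterministic tie-break, like op_order
    ts.done(*ready)
    final_order.extend(ready)

print(final_order)  # ['install', 'configure', 'restart']
```

Merging both hosts' chains into one graph means `restart` is ordered after `configure` even though host B never runs `configure` — a shared, globally consistent ordering.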
pyinfra uses context variables to track the current host and state:
```python
# From src/pyinfra/context.py
from pyinfra.context import ctx_host, ctx_state

# Access current host
current_host = ctx_host.get()

# Access current state
current_state = ctx_state.get()

# Set context for execution
with ctx_state.use(state):
    with ctx_host.use(host):
        # Operations run here with this context
        operation(*args, **kwargs)
```
This allows operations and facts to access the current execution context without explicit parameter passing.
Besides shell commands, pyinfra can run Python callables on the control machine via `FunctionCommand`:

```python
from pyinfra.api import FunctionCommand

def my_function(state, host, *args):
    # Python code runs here on the control machine
    return True

args = ("example",)  # positional arguments passed through to my_function
command = FunctionCommand(my_function, args)
```
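To see where such a command fits in, here is a hypothetical runner (not pyinfra's executor) dispatching a mixed command list: callables execute locally on the control machine, while strings would be shipped to the target host's shell.

```python
# Sketch: executing a mixed command list (hypothetical runner, not pyinfra's)
def run_commands(commands, run_shell):
    results = []
    for command in commands:
        if callable(command):
            # FunctionCommand-style: runs locally on the control machine
            results.append(command())
        else:
            # StringCommand-style: would be sent to the target host
            results.append(run_shell(command))
    return results

sent_to_host = []
out = run_commands(
    ["echo hello", lambda: "local result"],
    run_shell=lambda cmd: sent_to_host.append(cmd) or "remote ok",
)
print(out)           # ['remote ok', 'local result']
print(sent_to_host)  # ['echo hello']
```

The ordering guarantee from the DAG still applies: local function commands and remote shell commands execute in the single, predetermined sequence.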