The State class manages the state of a pyinfra deployment, tracking hosts, operations, configuration, and execution progress.
Creating a State
Create a state object with an inventory and configuration:
from pyinfra import Inventory, Config, State
# Create inventory
inventory = Inventory((["host1", "host2"], {}))
# Create config
config = Config(SUDO=True, PARALLEL=5)
# Create and initialize state
state = State(inventory, config)
state.init(inventory, config)
Constructor
def __init__(
self,
inventory: Inventory = None,
config: Config = None,
check_for_changes: bool = True,
**kwargs
):
"""Initialize the state.
Args:
inventory: The inventory of hosts
config: Configuration object
check_for_changes: Whether to check for changes during prepare
"""
inventory - The inventory containing target hosts.
config - Configuration object. If None, uses defaults.
check_for_changes - Whether to check if operations will make changes during the prepare phase.
Properties
inventory
Inventory
The inventory containing all hosts.
config
Config
The configuration object for this deployment.
pool
gevent.pool.Pool
Gevent pool for parallel execution.
current_stage
StateStage
Current execution stage (Setup, Connect, Prepare, Execute, Disconnect).
is_executing
bool
Whether operations are currently being executed.
activated_hosts
set[Host]
Set of all hosts that have been activated.
active_hosts
set[Host]
Set of currently active hosts (excludes failed hosts).
failed_hosts
set[Host]
Set of hosts that have failed.
op_meta
dict[str, StateOperationMeta]
Dictionary mapping operation hashes to operation metadata.
ops
dict[Host, dict[str, StateOperationHostData]]
Dictionary mapping hosts to their operation data.
State Stages
The state progresses through these stages:
from pyinfra.api.state import StateStage
# Stage enumeration
StateStage.Setup # 1 - Collect inventory & data
StateStage.Connect # 2 - Connect to inventory
StateStage.Prepare # 3 - Detect operation changes
StateStage.Execute # 4 - Execute operations
StateStage.Disconnect # 5 - Disconnect from inventory
Set Stage
from pyinfra.api.state import StateStage
state.set_stage(StateStage.Connect)
state.set_stage(StateStage.Execute)
Host Management
Activate Host
Mark a host as active:
host = inventory.get_host("web1")
state.activate_host(host)
Fail Hosts
Mark hosts as failed:
# Fail a single host
failed_hosts = {host}
state.fail_hosts(failed_hosts)
# Fail multiple hosts
failed_hosts = {host1, host2}
state.fail_hosts(failed_hosts)
This automatically:
- Removes hosts from active_hosts
- Adds hosts to failed_hosts
- Checks against the FAIL_PERCENT threshold
- Raises PyinfraError if the threshold is exceeded
Check Host Limit
Check if a host is within the current limit:
if state.is_host_in_limit(host):
# Host is included in current operation
pass
Operation Management
Get Operation Order
Get the topologically sorted order of operations:
op_order = state.get_op_order()
for op_hash in op_order:
op_meta = state.get_op_meta(op_hash)
print(f"Operation: {op_meta.names}")
op_hash = "abc123"
op_meta = state.get_op_meta(op_hash)
print(f"Names: {op_meta.names}")
print(f"Order: {op_meta.op_order}")
print(f"Arguments: {op_meta.global_arguments}")
Get Operation Data for Host
op_data = state.get_op_data_for_host(host, op_hash)
# Access operation data
command_gen = op_data.command_generator
global_args = op_data.global_arguments
op_meta = op_data.operation_meta
Set Operation Data for Host
from pyinfra.api.state import StateOperationHostData
op_data = StateOperationHostData(
command_generator=lambda: iter(["echo test"]),
global_arguments={},
operation_meta=op_meta,
)
state.set_op_data_for_host(host, op_hash, op_data)
Results Tracking
Get Host Metadata
meta = state.get_meta_for_host(host)
print(f"Total ops: {meta.ops}")
print(f"Ops with changes: {meta.ops_change}")
print(f"Ops without changes: {meta.ops_no_change}")
print(f"Op hashes: {meta.op_hashes}")
Get Host Results
results = state.get_results_for_host(host)
print(f"Total ops: {results.ops}")
print(f"Successful ops: {results.success_ops}")
print(f"Failed ops: {results.error_ops}")
print(f"Ignored errors: {results.ignored_error_ops}")
print(f"Partial ops: {results.partial_ops}")
Callbacks
Register callback handlers to respond to events:
from pyinfra.api.state import BaseStateCallback
class MyCallbacks(BaseStateCallback):
@staticmethod
def host_connect(state, host):
print(f"Connected to {host.name}")
@staticmethod
def operation_start(state, op_hash):
op_meta = state.get_op_meta(op_hash)
print(f"Starting: {op_meta.names}")
@staticmethod
def operation_host_success(state, host, op_hash, retry_count=0):
print(f"Success on {host.name}")
state.add_callback_handler(MyCallbacks())
Available Callbacks
Host Callbacks:
host_before_connect(state, host) - Before connecting
host_connect(state, host) - After successful connection
host_connect_error(state, host, error) - On connection error
host_disconnect(state, host) - After disconnection
Operation Callbacks:
operation_start(state, op_hash) - Operation starting
operation_host_start(state, host, op_hash) - Operation starting on host
operation_host_success(state, host, op_hash, retry_count) - Operation succeeded
operation_host_error(state, host, op_hash, retry_count, max_retries) - Operation failed
operation_host_retry(state, host, op_hash, retry_num, max_retries) - Operation retrying
operation_end(state, op_hash) - Operation completed
Trigger Callbacks
# Manually trigger callbacks
state.trigger_callbacks("host_connect", host)
state.trigger_callbacks("operation_start", op_hash)
Warning Counter
Track warnings per stage:
# Increment warning counter
state.increment_warning_counter()
# Get warning count for current stage
warning_count = state.get_warning_counter()
print(f"Warnings in current stage: {warning_count}")
Change Detection
# Check if change detection is enabled
if state.should_check_for_changes():
print("Will check for changes during prepare phase")
Complete Example
Here’s a complete example using the State API:
from pyinfra import Inventory, Config, State
from pyinfra.api import add_op
from pyinfra.api.operations import run_ops
from pyinfra.api.state import BaseStateCallback, StateStage
from pyinfra.operations import server, apt
import click
# Custom callback handler
class DeployCallbacks(BaseStateCallback):
@staticmethod
def host_connect(state, host):
click.echo(f"✓ Connected to {host.name}", err=True)
@staticmethod
def operation_host_success(state, host, op_hash, retry_count=0):
op_meta = state.get_op_meta(op_hash)
retry_info = f" (retry {retry_count})" if retry_count > 0 else ""
click.echo(f" ✓ {host.name}: {list(op_meta.names)[0]}{retry_info}", err=True)
@staticmethod
def operation_host_error(state, host, op_hash, retry_count=0, max_retries=0):
op_meta = state.get_op_meta(op_hash)
click.echo(f" ✗ {host.name}: {list(op_meta.names)[0]} FAILED", err=True)
# Create inventory
inventory = Inventory(
([
"web1.example.com",
"web2.example.com",
], {
"ssh_user": "ubuntu",
})
)
# Create config
config = Config(
SUDO=True,
PARALLEL=2,
FAIL_PERCENT=50, # Allow 50% failure
)
# Initialize state
state = State(inventory, config)
state.init(inventory, config)
# Add callback handler
state.add_callback_handler(DeployCallbacks())
# Connect to hosts
state.set_stage(StateStage.Connect)
for host in inventory:
host.connect()
state.activate_host(host)
click.echo(f"\nConnected to {len(state.active_hosts)} hosts", err=True)
# Prepare operations
state.set_stage(StateStage.Prepare)
add_op(
state,
server.shell,
name="Check system",
commands=["uname -a"],
)
add_op(
state,
apt.packages,
name="Install packages",
packages=["curl", "wget", "vim"],
update=True,
)
# Get operation order
op_order = state.get_op_order()
click.echo(f"\nPrepared {len(op_order)} operations", err=True)
# Execute operations
state.set_stage(StateStage.Execute)
click.echo("\nExecuting operations...", err=True)
run_ops(state)
# Print results
click.echo("\nResults:", err=True)
for host in inventory:
results = state.get_results_for_host(host)
meta = state.get_meta_for_host(host)
status = "✓" if host in state.active_hosts else "✗"
click.echo(f"{status} {host.name}:", err=True)
click.echo(f" Operations: {results.success_ops}/{meta.ops}", err=True)
click.echo(f" Changes: {meta.ops_change}", err=True)
if results.error_ops > 0:
click.echo(f" Errors: {results.error_ops}", err=True)
# Disconnect
state.set_stage(StateStage.Disconnect)
for host in inventory:
host.disconnect()
click.echo("\n✓ Deploy complete!", err=True)
Source Reference
Location: src/pyinfra/api/state.py:145
Key Classes
State - Main state class (line 145)
StateStage - Execution stages enum (line 93)
StateOperationMeta - Operation metadata (line 106)
StateOperationHostData - Per-host operation data (line 119)
StateHostMeta - Per-host metadata (line 127)
StateHostResults - Per-host results (line 137)
BaseStateCallback - Callback base class (line 41)
Key Methods
__init__() - Initialize state (line 187)
init() - Complete initialization (line 206)
set_stage() - Set execution stage (line 284)
activate_host() - Activate host (line 370)
fail_hosts() - Mark hosts as failed (line 382)
get_op_order() - Get operation order (line 310)
add_callback_handler() - Add callbacks (line 298)