Operations are the core building blocks of pyinfra. They define what should be done on target hosts, comparing desired state against current state and generating the necessary commands to achieve that state.
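Operations in pyinfra are idempotent by default: running the same operation multiple times only makes changes when they are needed. An operation that cannot guarantee this can be declared with `is_idempotent=False` in the `@operation` decorator.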
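The idea can be illustrated without pyinfra at all. The following is a minimal sketch, assuming a plain dictionary stands in for the host's state; `ensure_content` and `apply` are hypothetical names for illustration and are not part of pyinfra.

```python
from typing import Dict, Iterator, List


def ensure_content(state: Dict[str, str], path: str, content: str) -> Iterator[str]:
    """Yield a command only when the current state differs from the desired state."""
    if state.get(path) != content:
        yield f"write {path}"


def apply(state: Dict[str, str], path: str, content: str) -> List[str]:
    """Collect the commands and pretend to run them by updating the fake state."""
    commands = list(ensure_content(state, path, content))
    if commands:
        state[path] = content
    return commands


state: Dict[str, str] = {}
first_run = apply(state, "/etc/motd", "hello")   # state differs, one command
second_run = apply(state, "/etc/motd", "hello")  # state matches, no commands
```

The second call produces no commands because the desired state has already been reached, which is the property the built-in operations aim for.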
The @operation decorator (defined in src/pyinfra/api/operation.py) transforms a generator function into an operation that integrates with pyinfra’s two-phase execution model:
```python
# From src/pyinfra/api/operation.py:240-261
def operation(
    is_idempotent: bool = True,
    idempotent_notice: Optional[str] = None,
    is_deprecated: bool = False,
    deprecated_for: Optional[str] = None,
    _set_in_op: bool = True,
) -> Callable[[Callable[P, Generator]], PyinfraOperation[P]]:
    """
    Decorator that takes a simple module function and turn it into the internal
    operation representation that consists of a list of commands + options
    (sudo, (sudo|su)_user, env).
    """

    def decorator(f: Callable[P, Generator]) -> PyinfraOperation[P]:
        f.is_idempotent = is_idempotent
        f.idempotent_notice = idempotent_notice
        f.is_deprecated = is_deprecated
        f.deprecated_for = deprecated_for
        return _wrap_operation(f, _set_in_op=_set_in_op)

    return decorator
```
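The decorator-factory pattern used here can be shown in isolation. This is a simplified stand-in, not pyinfra's implementation: `toy_operation` is a hypothetical name, and its wrapper simply collects the yielded commands into a list, whereas the real `_wrap_operation` integrates with state, hosts, and the two-phase model.

```python
from functools import wraps
from typing import Callable, Generator, List, Optional


def toy_operation(
    is_idempotent: bool = True,
    idempotent_notice: Optional[str] = None,
) -> Callable[[Callable[..., Generator]], Callable[..., List[str]]]:
    """Hypothetical decorator factory mirroring the shape of @operation."""

    def decorator(f: Callable[..., Generator]) -> Callable[..., List[str]]:
        @wraps(f)
        def wrapper(*args, **kwargs) -> List[str]:
            # Assumption for this sketch: run the generator eagerly.
            return list(f(*args, **kwargs))

        # Attach metadata, as the real decorator does on the wrapped function.
        wrapper.is_idempotent = is_idempotent
        wrapper.idempotent_notice = idempotent_notice
        return wrapper

    return decorator


@toy_operation(is_idempotent=False, idempotent_notice="always runs the command")
def run_shell(command: str):
    yield command


commands = run_shell("uptime")
```

Calling `@toy_operation(...)` with parentheses returns the inner `decorator`, which is why the built-in operations are written as `@operation()` rather than `@operation`.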
Here’s an example from the built-in operations (src/pyinfra/operations/files.py):
```python
# From src/pyinfra/operations/files.py:75-227
@operation()
def download(
    src: str,
    dest: str,
    user: str | None = None,
    group: str | None = None,
    mode: str | None = None,
    cache_time: int | None = None,
    force=False,
    sha256sum: str | None = None,
):
    """
    Download files from remote locations using curl or wget.

    + src: source URL of the file
    + dest: where to save the file
    + force: always download the file, even if it already exists
    + sha256sum: sha256 hash to checksum the downloaded file against
    """

    info = host.get_fact(File, path=dest)

    # Destination is a directory?
    if info is False:
        raise OperationError(
            f"Destination {dest} already exists and is not a file",
        )

    # Do we download the file? Force by default
    download = force

    # Doesn't exist, let's download it
    if info is None:
        download = True

    # File exists & cache_time: check when last modified
    else:
        if cache_time:
            ctime = host.get_fact(Date).replace(tzinfo=None) - timedelta(seconds=cache_time)
            if info["mtime"] and info["mtime"] < ctime:
                download = True

        if sha256sum:
            if sha256sum != host.get_fact(Sha256File, path=dest):
                download = True

    # Only download if needed
    if download:
        temp_file = host.get_temp_filename(dest)

        # Try curl first
        yield f"curl -sSLf {src} -o {temp_file}"

        # Move to final location
        yield f"mv {temp_file} {dest}"

        # Set ownership and permissions if specified
        if user or group:
            yield f"chown {user}:{group} {dest}"
        if mode:
            yield f"chmod {mode} {dest}"
```
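This operation demonstrates idempotency: it checks whether the file exists, how old it is (`cache_time`), and whether its checksum matches (`sha256sum`), and it downloads only when one of those checks requires it or when `force` is set.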
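The decision logic can be pulled out into a pure function to make the cases easier to see. This is a sketch under assumptions: `should_download` is a hypothetical helper, the fact lookups are replaced by plain arguments (`info`, `now`, `current_sha256`), and the "destination is not a file" error case is left out.

```python
from datetime import datetime, timedelta
from typing import Optional


def should_download(
    info: Optional[dict],
    now: datetime,
    force: bool = False,
    cache_time: Optional[int] = None,
    sha256sum: Optional[str] = None,
    current_sha256: Optional[str] = None,
) -> bool:
    """Mirror the download decision: force, missing file, stale file, bad checksum."""
    # Missing file: always download.
    if info is None:
        return True

    download = force

    # File older than the cache window: download again.
    if cache_time:
        cutoff = now - timedelta(seconds=cache_time)
        if info.get("mtime") and info["mtime"] < cutoff:
            download = True

    # Checksum requested and not matching: download again.
    if sha256sum and sha256sum != current_sha256:
        download = True

    return download


now = datetime(2024, 1, 2, 12, 0, 0)
fresh = {"mtime": now - timedelta(minutes=5)}
stale = {"mtime": now - timedelta(days=2)}

missing_file = should_download(None, now)
fresh_cached = should_download(fresh, now, cache_time=3600)
stale_cached = should_download(stale, now, cache_time=3600)
bad_checksum = should_download(fresh, now, sha256sum="abc", current_sha256="def")
```

Only `fresh_cached` is `False` here; every other case has a reason to fetch the file again.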
During the Execute phase, the command generator is run again:
```python
# From src/pyinfra/api/operations.py:88-189
for command in op_data.command_generator():
    commands.append(command)
    status = False

    if isinstance(command, FunctionCommand):
        # Execute Python function
        status = command.execute(state, host, connector_arguments)
    elif isinstance(command, StringCommand):
        # Execute shell command
        status, output_lines = command.execute(state, host, connector_arguments)
    else:
        # Execute file transfer or other command type
        status = command.execute(state, host, connector_arguments)

    # Break on failure
    if status is False:
        did_error = True
        break

    executed_commands += 1
```
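The key behaviour of this loop is that it stops at the first failing command and counts only the commands that succeeded. A minimal sketch of that control flow, assuming a hypothetical `execute` callback in place of pyinfra's command objects and connectors:

```python
from typing import Callable, Iterable, List, Tuple


def run_commands(
    commands: Iterable[str],
    execute: Callable[[str], bool],
) -> Tuple[int, bool]:
    """Run commands in order, stopping at the first one that reports failure."""
    executed = 0
    did_error = False

    for command in commands:
        if execute(command) is False:
            did_error = True
            break
        executed += 1

    return executed, did_error


attempted: List[str] = []


def fake_execute(command: str) -> bool:
    """Stand-in executor: records the command and fails only for 'false'."""
    attempted.append(command)
    return command != "false"


result = run_commands(iter(["echo one", "false", "echo two"]), fake_execute)
```

Because the commands come from an iterator, the third command is never even pulled once the second one fails, which matches how a generator-based loop behaves.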
Operations return an OperationMeta object that tracks status:
```python
from pyinfra.operations import apt

# Call operation
op = apt.update()

# Check if changes will be made (Prepare phase)
if op.will_change:
    print("Operation will make changes")

# After execution (Execute phase)
if op.did_succeed():
    print("Operation succeeded")
    if op.did_change():
        print("Operation made changes")
else:
    print("Operation failed")

# Access output
print(op.stdout)
print(op.stderr)
print(op.stdout_lines)

# Check retry information
if op.was_retried:
    print(f"Operation was retried {op.retry_attempts} times")
    print(f"Retry succeeded: {op.retry_succeeded}")
```
Operations cannot call other operations directly. Use _inner to access the generator:
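```python
from pyinfra.api import operation
from pyinfra.operations import apt


@operation()
def custom_operation():
    # WRONG: This will raise an error
    # apt.update()

    # CORRECT: Use _inner to access the generator function
    # and pass its commands through to the caller
    yield from apt.update._inner()
```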
Nested operations are discouraged. Instead, create a deploy function that calls multiple operations.
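```python
from pyinfra import host
from pyinfra.api import operation
from pyinfra.facts.server import Users


@operation()
def ensure_user(username: str):
    # The Users fact returns the existing users keyed by username
    # (the singular User fact reports the current user and takes no arguments)
    users = host.get_fact(Users)

    if username not in users:
        yield f"useradd {username}"
```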
```python
from pyinfra.operations import server


def check_service_ready(output):
    """Return True to keep retrying, i.e. while the service is not ready yet."""
    return "Ready" not in output["stdout_lines"]


server.shell(
    name="Wait for service",
    commands=["curl http://localhost:8080/health"],
    _retries=10,
    _retry_delay=5,
    _retry_until=check_service_ready,
)
```
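The callback above returns `True` while another attempt is still wanted. A minimal sketch of a retry loop with that contract follows; `run_with_retries` is a hypothetical helper, and treating `retries` as the number of extra attempts after the first one is an assumption of this sketch, not a statement about pyinfra's internals.

```python
from typing import Callable, Dict, List, Tuple

Output = Dict[str, List[str]]


def run_with_retries(
    run: Callable[[], Output],
    retries: int,
    retry_until: Callable[[Output], bool],
    delay: float = 0,
    sleep: Callable[[float], None] = lambda seconds: None,
) -> Tuple[Output, int]:
    """Call run() until retry_until(output) is False or retries are exhausted."""
    attempts = 0
    while True:
        output = run()
        attempts += 1
        # Stop when the callback no longer asks for a retry,
        # or when the first attempt plus all retries have been used.
        if not retry_until(output) or attempts > retries:
            return output, attempts
        sleep(delay)


# Fake health check: not ready twice, then ready.
responses = iter([["Starting"], ["Starting"], ["Ready"]])


def fake_health_check() -> Output:
    return {"stdout_lines": next(responses)}


def not_ready(output: Output) -> bool:
    return "Ready" not in output["stdout_lines"]


final_output, attempts = run_with_retries(fake_health_check, retries=10, retry_until=not_ready)
```

The `sleep` parameter defaults to a no-op so the sketch runs instantly; passing `time.sleep` would give a real delay between attempts.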
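```python
from pyinfra import host
from pyinfra.api import operation
from pyinfra.facts.server import Users


# WRONG (not idempotent)
@operation()
def bad_user(username: str):
    yield f"useradd {username}"  # Fails if user exists!


# CORRECT
@operation()
def good_user(username: str):
    # Only create the user when it is missing from the Users fact
    if username not in host.get_fact(Users):
        yield f"useradd {username}"
```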