Overview

Operations are the core building blocks of pyinfra. They define what should be done on target hosts by comparing the desired state against the current state and generating the commands needed to close the gap.
Operations are idempotent by default: running the same operation multiple times only makes changes when they are needed. Operations that cannot guarantee this are declared with is_idempotent=False.

The @operation Decorator

The @operation decorator (defined in src/pyinfra/api/operation.py) transforms a generator function into an operation that integrates with pyinfra’s two-phase execution model:
# From src/pyinfra/api/operation.py:240-261
def operation(
    is_idempotent: bool = True,
    idempotent_notice: Optional[str] = None,
    is_deprecated: bool = False,
    deprecated_for: Optional[str] = None,
    _set_in_op: bool = True,
) -> Callable[[Callable[P, Generator]], PyinfraOperation[P]]:
    """
    Decorator that takes a simple module function and turn it into the internal
    operation representation that consists of a list of commands + options
    (sudo, (sudo|su)_user, env).
    """

    def decorator(f: Callable[P, Generator]) -> PyinfraOperation[P]:
        f.is_idempotent = is_idempotent
        f.idempotent_notice = idempotent_notice
        f.is_deprecated = is_deprecated
        f.deprecated_for = deprecated_for
        return _wrap_operation(f, _set_in_op=_set_in_op)

    return decorator

Decorator Parameters

  • is_idempotent (default: True): Whether the operation can be safely re-run
  • idempotent_notice: Custom message about idempotency behavior
  • is_deprecated: Mark operation as deprecated
  • deprecated_for: Suggest alternative operation
  • _set_in_op: Internal flag for operation context tracking

Creating an Operation

Basic Structure

An operation is a generator function that yields shell commands or command objects:
from pyinfra.api import operation

@operation()
def hello_world():
    """
    Says hello to the world.
    """
    yield "echo 'Hello, World!'"

Using Facts for Idempotency

Operations should check current state using facts before yielding commands:
import shlex

from pyinfra import host
from pyinfra.api import operation
from pyinfra.facts.files import File, FileContents

@operation()
def ensure_file(path: str, content: str):
    """
    Ensures a file exists with specific content.
    """
    # Quote both values so spaces and shell metacharacters are handled safely
    write_command = f"echo {shlex.quote(content)} > {shlex.quote(path)}"

    # Check current state
    file_info = host.get_fact(File, path=path)

    if file_info is None:
        # File doesn't exist, create it
        yield write_command
    else:
        # File exists, check if content needs updating.
        # Assumption: FileContents returns the file as a list of lines.
        current_lines = host.get_fact(FileContents, path=path)

        if current_lines != content.splitlines():
            yield write_command

Real-World Example

Here’s an abridged example from the built-in operations (src/pyinfra/operations/files.py):
# From src/pyinfra/operations/files.py:75-227
@operation()
def download(
    src: str,
    dest: str,
    user: str | None = None,
    group: str | None = None,
    mode: str | None = None,
    cache_time: int | None = None,
    force=False,
    sha256sum: str | None = None,
):
    """
    Download files from remote locations using curl or wget.

    + src: source URL of the file
    + dest: where to save the file
    + force: always download the file, even if it already exists
    + sha256sum: sha256 hash to checksum the downloaded file against
    """

    info = host.get_fact(File, path=dest)

    # Destination is a directory?
    if info is False:
        raise OperationError(
            f"Destination {dest} already exists and is not a file",
        )

    # Do we download the file? Force by default
    download = force

    # Doesn't exist, let's download it
    if info is None:
        download = True

    # File exists & cache_time: check when last modified
    else:
        if cache_time:
            ctime = host.get_fact(Date).replace(tzinfo=None) - timedelta(seconds=cache_time)
            if info["mtime"] and info["mtime"] < ctime:
                download = True

        if sha256sum:
            if sha256sum != host.get_fact(Sha256File, path=dest):
                download = True

    # Only download if needed
    if download:
        temp_file = host.get_temp_filename(dest)
        
        # Try curl first
        yield f"curl -sSLf {src} -o {temp_file}"
        # Move to final location
        yield f"mv {temp_file} {dest}"
        
        # Set ownership and permissions if specified
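        if user or group:
            # Avoid emitting "None" when only one of user/group is given
            owner = f"{user or ''}:{group}" if group else user
            yield f"chown {owner} {dest}"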
        if mode:
            yield f"chmod {mode} {dest}"
This operation demonstrates idempotency by checking if the file exists, validating checksums, and only downloading when necessary.

Operation Lifecycle

1. Call Time (Prepare Phase)

When an operation is called during the Prepare phase:
# From src/pyinfra/api/operation.py:264-380
@wraps(func)
def decorated_func(*args, **kwargs) -> OperationMeta:
    state = context.state
    host = context.host

    # Verify we're not calling ops within ops
    if host.in_op:
        raise Exception(
            "Operation called within another operation, this is not allowed! "
            "Use the `_inner` function to call the underlying operation."
        )

    # Generate operation metadata
    global_arguments, global_argument_keys = pop_global_arguments(state, host, kwargs)
    names, add_args = generate_operation_name(func, host, kwargs, global_arguments)
    op_order, op_hash = solve_operation_consistency(names, state, host)

    # Create shared operation meta
    op_meta = ensure_shared_op_meta(state, op_hash, op_order, global_arguments, names)

    # Create command generator (not executed yet!)
    def command_generator() -> Iterator[PyinfraCommand]:
        host.in_op = _set_in_op
        host.current_op_hash = op_hash
        host.current_op_global_arguments = global_arguments

        try:
            for command in func(*args, **kwargs):
                if isinstance(command, str):
                    command = StringCommand(command.strip())
                yield command
        finally:
            host.in_op = False
            host.current_op_hash = None
            host.current_op_global_arguments = None

    # Check for changes if in diff mode
    op_is_change = None
    if state.should_check_for_changes():
        op_is_change = False
        for _ in command_generator():  # Run once to see if changes
            op_is_change = True
            break

    # Store operation data in state
    operation_meta = OperationMeta(op_hash, op_is_change)
    op_data = StateOperationHostData(command_generator, global_arguments, operation_meta)
    state.set_op_data_for_host(host, op_hash, op_data)

    return operation_meta
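
The key idea in the excerpt above is that the command generator is only a recipe until something iterates it, and that pulling a single item is enough to know whether a change is pending. A minimal stdlib-only sketch of that idea follows; will_change and make_generator are hypothetical names, and the file_exists flag stands in for a fact lookup.

```python
from typing import Callable, Iterator


def will_change(command_generator: Callable[[], Iterator[str]]) -> bool:
    """Return True if the generator would yield at least one command."""
    for _ in command_generator():
        return True
    return False


def make_generator(file_exists: bool) -> Callable[[], Iterator[str]]:
    """Build a command generator; nothing runs until it is iterated."""
    def command_generator() -> Iterator[str]:
        if not file_exists:
            yield "touch /tmp/example"
    return command_generator


needs_change = will_change(make_generator(file_exists=False))
already_ok = will_change(make_generator(file_exists=True))
```

Storing the generator function rather than its output is what lets the same logic be evaluated once for change detection and again at execution time.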

2. Execution Time (Execute Phase)

During the Execute phase, the command generator is run again:
# From src/pyinfra/api/operations.py:88-189
for command in op_data.command_generator():
    commands.append(command)
    status = False
    
    if isinstance(command, FunctionCommand):
        # Execute Python function
        status = command.execute(state, host, connector_arguments)
    
    elif isinstance(command, StringCommand):
        # Execute shell command
        status, output_lines = command.execute(state, host, connector_arguments)
    
    else:
        # Execute file transfer or other command type
        status = command.execute(state, host, connector_arguments)
    
    # Break on failure
    if status is False:
        did_error = True
        break
    
    executed_commands += 1
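
The loop above dispatches on the command type and stops at the first failure. The same control flow can be sketched without pyinfra; run_commands and fake_shell are hypothetical, and a command here is either a shell string or a zero-argument callable.

```python
from typing import Callable, Iterable, Union

Command = Union[str, Callable[[], bool]]


def run_commands(
    commands: Iterable[Command],
    shell: Callable[[str], bool],
) -> tuple[int, bool]:
    """Run commands in order; return (executed_count, did_error)."""
    executed = 0
    for command in commands:
        # Callables run locally, strings are handed to the shell runner
        status = command() if callable(command) else shell(command)
        if status is False:
            return executed, True
        executed += 1
    return executed, False


def fake_shell(command: str) -> bool:
    """Pretend shell: any command containing 'fail' fails."""
    return "fail" not in command


result = run_commands(
    ["echo one", lambda: True, "fail here", "echo never"],
    fake_shell,
)
```

In this sketch the fourth command is never attempted, mirroring the break-on-failure behaviour in the excerpt.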

3. Completion

After execution, the operation is marked complete:
# From src/pyinfra/api/operation.py:82-101
def set_complete(
    self,
    success: bool,
    commands: list[Any],
    combined_output: CommandOutput,
    retry_attempts: int = 0,
    max_retries: int = 0,
) -> None:
    if self.is_complete():
        raise RuntimeError("Cannot complete an already complete operation")
    
    self._success = success
    self._commands = commands
    self._combined_output = combined_output
    self._retry_attempts = retry_attempts
    self._max_retries = max_retries

    # Determine if operation succeeded after retries
    if retry_attempts > 0:
        self._retry_succeeded = success
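
The guard at the top of set_complete means a result can be recorded exactly once. A simplified, hypothetical stand-in (ResultTracker is not a pyinfra class) shows the pattern in isolation:

```python
from typing import Optional


class ResultTracker:
    """Hypothetical, simplified stand-in for an operation result object."""

    def __init__(self) -> None:
        self._success: Optional[bool] = None

    def is_complete(self) -> bool:
        return self._success is not None

    def set_complete(self, success: bool) -> None:
        # A result may only be recorded once
        if self.is_complete():
            raise RuntimeError("Cannot complete an already complete operation")
        self._success = success

    def did_succeed(self) -> bool:
        # Asking for the result too early is treated as a programming error
        if not self.is_complete():
            raise RuntimeError("Result requested before completion")
        return bool(self._success)


tracker = ResultTracker()
tracker.set_complete(True)

try:
    tracker.set_complete(False)
    second_completion_rejected = False
except RuntimeError:
    second_completion_rejected = True
```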

Command Types

Operations can yield different types of commands:

String Commands

The simplest form - shell commands as strings:
@operation()
def simple_example():
    yield "apt-get update"
    yield "apt-get install -y nginx"

StringCommand Objects

For more control over command construction:
from pyinfra.api import StringCommand, QuoteString

@operation()
def advanced_example(filename: str):
    # Properly quote arguments
    yield StringCommand("touch", QuoteString(filename))
    
    # Build complex commands
    yield StringCommand(
        "curl",
        "-sSLf",
        "https://example.com/file",
        "-o",
        QuoteString(filename)
    )
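
To see why quoting matters, the effect can be reproduced with the standard library's shlex.quote. This is only an illustration of shell quoting in general; it does not show how QuoteString is implemented, and touch_command is a hypothetical helper.

```python
import shlex


def touch_command(filename: str) -> str:
    """Build a touch command with the filename quoted for a POSIX shell."""
    return f"touch {shlex.quote(filename)}"


plain = touch_command("report.txt")       # safe characters are left as-is
spaced = touch_command("my report.txt")   # spaces force single quotes
hostile = touch_command("x; rm -rf /")    # metacharacters are neutralised
```

Without quoting, the second filename would be split into two arguments and the third would run a second command.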

Function Commands

Execute Python code on the control machine:
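from pyinfra import host
from pyinfra.api import FunctionCommand, operation

@operation()
def python_example():
    # Assumption (pyinfra v3): the callback receives only its own arguments;
    # the current host is available through the `host` context object.
    def my_function(message):
        print(f"[{host.name}] {message}")

    # FunctionCommand takes the function, its positional args, and its kwargs
    yield FunctionCommand(my_function, ("Hello from Python!",), {})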

File Transfer Commands

Upload or download files:
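from pyinfra.api import FileDownloadCommand, FileUploadCommand, operation

@operation()
def file_example():
    # Upload a file
    yield FileUploadCommand(
        src="/local/config.txt",
        dest="/etc/myapp/config.txt",
    )

    # Set permissions with a separate command (the upload command itself
    # is assumed not to take a mode argument)
    yield "chmod 644 /etc/myapp/config.txt"

    # Download a file
    yield FileDownloadCommand(
        src="/var/log/app.log",
        dest="./logs/app.log",
    )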

Global Arguments

All operations accept global arguments that control execution behavior:
from pyinfra import host
from pyinfra.facts.server import LinuxName
from pyinfra.operations import apt, server

# Basic global arguments
apt.update(
    name="Update apt cache",
    _sudo=True,
    _sudo_user="root",
    _ignore_errors=False,
    _continue_on_error=False,
)

# Conditional execution (facts are requested by class, not by string name)
apt.packages(
    name="Install nginx",
    packages=["nginx"],
    _if=lambda: host.get_fact(LinuxName) == "Ubuntu",
)

# Retries
apt.packages(
    name="Install with retries",
    packages=["postgresql"],
    _retries=3,
    _retry_delay=10,
)

# Environment variables
server.shell(
    name="Build application",
    commands=["make build"],
    _env={"NODE_ENV": "production"},
)

# Run once across all hosts
server.script(
    name="Initialize database",
    src="scripts/init_db.sh",
    _run_once=True,
)

Available Global Arguments

From src/pyinfra/api/arguments.py:
Argument             Type       Description
name                 str        Custom operation name
_sudo                bool       Execute with sudo
_sudo_user           str        User for sudo
_su_user             str        User for su
_env                 dict       Environment variables
_shell_executable    str        Shell to use
_timeout             int        Command timeout (seconds)
_ignore_errors       bool       Don’t fail on errors
_continue_on_error   bool       Continue after errors
_if                  Callable   Conditional execution
_serial              bool       Execute serially across hosts
_run_once            bool       Run once across all hosts
_retries             int        Number of retry attempts
_retry_delay         int        Delay between retries
_retry_until         Callable   Retry condition function
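
Global arguments travel in the same keyword arguments as an operation's own parameters and are separated out before the operation function runs. The sketch below shows only the naming convention (a leading underscore, plus name); split_arguments is a hypothetical helper, and pyinfra's real argument handling does more, such as applying defaults.

```python
from typing import Any


def split_arguments(
    kwargs: dict[str, Any],
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Separate underscore-prefixed keys (and `name`) from operation kwargs."""
    global_args = {
        key: value
        for key, value in kwargs.items()
        if key.startswith("_") or key == "name"
    }
    op_args = {key: value for key, value in kwargs.items() if key not in global_args}
    return global_args, op_args


global_args, op_args = split_arguments(
    {"name": "Install nginx", "packages": ["nginx"], "_sudo": True, "_retries": 3}
)
```

Only op_args would reach the operation function in this model, which is why an operation's signature never needs to declare _sudo or _retries.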

Operation Return Value

Operations return an OperationMeta object that tracks status:
from pyinfra.operations import apt

# Call operation
op = apt.update()

# Check if changes will be made (Prepare phase)
if op.will_change:
    print("Operation will make changes")

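# After execution (Execute phase); the did_* methods and output properties
# are only meaningful once the operation has completed (see is_complete())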
if op.did_succeed():
    print("Operation succeeded")
    if op.did_change():
        print("Operation made changes")
else:
    print("Operation failed")

# Access output
print(op.stdout)
print(op.stderr)
print(op.stdout_lines)

# Check retry information
if op.was_retried:
    print(f"Operation was retried {op.retry_attempts} times")
    print(f"Retry succeeded: {op.retry_succeeded}")

OperationMeta Properties

From src/pyinfra/api/operation.py:43-204:
class OperationMeta:
    # Status checks
    @property
    def will_change(self) -> bool:
        """Will this operation make changes? (Prepare phase)"""
    
    def did_change(self) -> bool:
        """Did this operation make changes? (Execute phase)"""
    
    def did_not_change(self) -> bool:
        """Did this operation NOT make changes?"""
    
    def did_succeed(self) -> bool:
        """Did this operation succeed?"""
    
    def did_error(self) -> bool:
        """Did this operation error?"""
    
    def is_complete(self) -> bool:
        """Has this operation completed execution?"""
    
    # Output access
    @property
    def stdout_lines(self) -> list[str]:
        """List of stdout lines"""
    
    @property
    def stderr_lines(self) -> list[str]:
        """List of stderr lines"""
    
    @property
    def stdout(self) -> str:
        """Full stdout as string"""
    
    @property
    def stderr(self) -> str:
        """Full stderr as string"""
    
    # Retry information
    @property
    def retry_attempts(self) -> int:
        """Number of retry attempts made"""
    
    @property
    def was_retried(self) -> bool:
        """Was this operation retried?"""
    
    @property
    def retry_succeeded(self) -> Optional[bool]:
        """Did retries succeed?"""

Calling Operations

In Deploy Scripts

The normal way to call operations:
# deploy.py
from pyinfra.operations import apt, server

# Called during Prepare phase, executed during Execute phase
apt.update(
    name="Update package cache",
    _sudo=True,
)

apt.packages(
    name="Install nginx",
    packages=["nginx"],
    _sudo=True,
)

server.service(
    name="Start nginx",
    service="nginx",
    running=True,
    _sudo=True,
)

In the API

Using add_op for programmatic operation calls:
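# add_op is defined in src/pyinfra/api/operation.py:206-234
from pyinfra.api import Config, Inventory, State
from pyinfra.api.connect import connect_all
from pyinfra.api.operation import add_op
from pyinfra.api.operations import run_ops
from pyinfra.operations import apt

# Placeholder inventory: replace the hostname with your own targets
inventory = Inventory((["my-host.example.com"], {}))
config = Config()

state = State()
state.init(inventory, config)

# Connect to the hosts before adding operations
connect_all(state)

# Add operation to state
add_op(
    state,
    apt.update,
    _sudo=True,
)

# Execute operations
run_ops(state)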

Nested Operations

Operations cannot call other operations directly. Use _inner to access the generator:
@operation()
def custom_operation():
    # WRONG: This will raise an error
    # apt.update()
    
    # CORRECT: Use _inner to access the generator function
    for command in apt.update._inner():
        yield command
Nested operations are discouraged. Instead, create a deploy function that calls multiple operations.
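
The for loop in the example above simply re-yields each command from the inner generator. In plain Python the same delegation can be written with yield from; the sketch uses ordinary generators with hypothetical names rather than real pyinfra operations.

```python
from typing import Iterator


def refresh_cache() -> Iterator[str]:
    yield "apt-get update"


def install_nginx() -> Iterator[str]:
    # Delegate to the inner generator, then add this generator's own command
    yield from refresh_cache()
    yield "apt-get install -y nginx"


commands = list(install_nginx())
```

Either spelling produces one flat stream of commands, which is what the outer operation is expected to yield.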

Idempotency Patterns

Check-Then-Act

The most common pattern:
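from pyinfra import host
from pyinfra.api import operation
from pyinfra.facts.server import Users

@operation()
def ensure_user(username: str):
    # The Users fact returns every user on the host, keyed by username
    users = host.get_fact(Users)

    if username not in users:
        yield f"useradd {username}"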

State Comparison

Compare current state to desired state:
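@operation()
def ensure_file_mode(path: str, mode: str):
    from pyinfra.facts.files import File

    file_info = host.get_fact(File, path=path)

    # Assumption: the File fact reports mode as an integer such as 644,
    # so compare against int(mode) rather than an octal conversion
    if file_info and file_info["mode"] != int(mode):
        yield f"chmod {mode} {path}"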

Conditional Creation

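Push the check into the shell so that the test and the action run as a single command on the host. pyinfra cannot tell in advance whether such a command will change anything, so it is always executed: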
@operation()
def touch_if_missing(path: str):
    # Only touch if file doesn't exist
    yield f"test -f {path} || touch {path}"

Error Handling

Ignore Errors

server.shell(
    name="Try to stop service",
    commands=["systemctl stop myservice"],
    _ignore_errors=True,
)

Continue on Error

server.shell(
    name="Run multiple commands",
    commands=[
        "command1",
        "command2",  # If this fails, continue to command3
        "command3",
    ],
    # _continue_on_error is understood to apply only when errors are ignored
    _ignore_errors=True,
    _continue_on_error=True,
)

Conditional Retries

def check_service_ready(output):
    """Return True to keep retrying, i.e. while no stdout line contains "Ready"."""
    return not any("Ready" in line for line in output["stdout_lines"])

server.shell(
    name="Wait for service",
    commands=["curl http://localhost:8080/health"],
    _retries=10,
    _retry_delay=5,
    _retry_until=check_service_ready,
)
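
How a retry predicate drives the loop can be modelled in a few lines of plain Python. This is a simplified sketch, not pyinfra's retry logic: run_with_retries and fake_run are hypothetical, the delay between attempts is omitted, and the predicate follows the convention used above (True means "try again").

```python
from typing import Callable


def run_with_retries(
    run: Callable[[], dict],
    retries: int,
    retry_until: Callable[[dict], bool],
) -> tuple[dict, int]:
    """Run once, then retry while retry_until(output) is True, up to `retries` times."""
    output = run()
    attempts = 0
    while attempts < retries and retry_until(output):
        attempts += 1
        output = run()
    return output, attempts


# Fake command: reports "Starting" twice, then "Ready"
responses = iter(["Starting", "Starting", "Ready"])


def fake_run() -> dict:
    return {"stdout_lines": [next(responses)]}


output, attempts = run_with_retries(
    fake_run,
    retries=10,
    retry_until=lambda out: "Ready" not in out["stdout_lines"],
)
```

The loop stops as soon as the predicate returns False, so only two of the ten allowed retries are used here.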

Testing Operations

Unit Testing

Test operation logic without execution:
from pyinfra.api import Config, Inventory, State
from pyinfra.context import ctx_host, ctx_state
from my_operations import ensure_user

def test_ensure_user_creates_user():
    # The inventory needs at least one host to test against
    inventory = Inventory((["testhost"], {}))
    state = State()
    state.init(inventory, Config())

    test_host = inventory.get_host("testhost")

    with ctx_state.use(state):
        with ctx_host.use(test_host):
            # Mock facts to report that no users exist
            test_host.get_fact = lambda *args, **kwargs: {}

            # Call operation
            op = ensure_user("testuser")

            # Verify operation will make changes
            assert op.will_change
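
A lighter-weight alternative, sketched here as a suggestion rather than an established pyinfra pattern, is to keep the decision logic in a plain generator that receives the fact data as an argument. It can then be tested without building a State or mocking a host; user_commands is a hypothetical helper.

```python
from typing import Iterator, Mapping


def user_commands(
    existing_users: Mapping[str, object],
    username: str,
) -> Iterator[str]:
    """Yield the command needed to create `username`, if any."""
    if username not in existing_users:
        yield f"useradd {username}"


def test_creates_missing_user() -> None:
    assert list(user_commands({}, "alice")) == ["useradd alice"]


def test_skips_existing_user() -> None:
    assert list(user_commands({"alice": {}}, "alice")) == []


# Run the tests directly; a test runner would discover them by name instead
test_creates_missing_user()
test_skips_existing_user()
```

An operation could then fetch the fact and delegate to such a helper with yield from.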

Integration Testing

Test operations against real hosts:
from pyinfra import host
from pyinfra.facts.server import Users
from pyinfra.operations import python, server

def assert_user_exists():
    # Runs during the Execute phase, after the user has been created
    assert "testuser" in host.get_fact(Users)

# Create user
server.user(
    name="Create test user",
    user="testuser",
    _sudo=True,
)

# Verify user exists (deferred, so the check runs after the creation above)
python.call(
    name="Verify test user exists",
    function=assert_user_exists,
)

# Cleanup
server.user(
    name="Remove test user",
    user="testuser",
    present=False,
    _sudo=True,
)

Best Practices

Use Facts

Always check current state with facts before yielding commands. This ensures idempotency.

Descriptive Names

Use the name argument to provide clear operation descriptions for logging.

Handle Errors

Use _ignore_errors, _continue_on_error, or _retries for robust operations.

Quote Arguments

Use QuoteString for arguments that might contain spaces or special characters.

Type Hints

Add type hints to operation parameters for better IDE support and documentation.

Generator Pattern

Remember operations are generators - use yield not return for commands.

Common Pitfalls

Calling Operations Within Operations

# WRONG
@operation()
def bad_operation():
    apt.update()  # This will fail!
    yield "echo done"

# CORRECT
@operation()
def good_operation():
    for command in apt.update._inner():
        yield command
    yield "echo done"

Forgetting to Check State

# WRONG (not idempotent)
@operation()
def bad_user(username: str):
    yield f"useradd {username}"  # Fails if user exists!

# CORRECT
@operation()
def good_user(username: str):
    from pyinfra.facts.server import Users
    if username not in host.get_fact(Users):
        yield f"useradd {username}"

Returning Instead of Yielding

# WRONG
@operation()
def bad_operation():
    return "echo hello"  # Returns a plain string; the function is not a generator!

# CORRECT
@operation()
def good_operation():
    yield "echo hello"
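
The difference is easy to check in plain Python with inspect.isgeneratorfunction. The two functions below are undecorated stand-ins for the operations above.

```python
import inspect


def returns_string():
    return "echo hello"


def yields_string():
    yield "echo hello"


returns_is_generator = inspect.isgeneratorfunction(returns_string)
yields_is_generator = inspect.isgeneratorfunction(yields_string)

# Iterating the returned string walks its characters, not whole commands
from_return = list(returns_string())
from_yield = list(yields_string())
```

Anything that iterates the result of the first function sees single characters, so the mistake tends to surface as a series of nonsensical one-letter commands rather than a clear error.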

Facts

Learn how to collect state information for idempotent operations

Architecture

Understand the two-phase model that powers operations

State

Deep dive into state management and operation tracking

Operations API

Browse all built-in operations
