Skip to main content

Overview

Connectors are pyinfra’s abstraction layer for interfacing with different types of target systems. They handle connection management, command execution, and file transfers across various protocols and platforms.
Every host in pyinfra uses a connector to execute commands. The default is SSH, but pyinfra supports local, Docker, and custom connectors.

BaseConnector Class

All connectors inherit from BaseConnector (defined in src/pyinfra/connectors/base.py):
# From src/pyinfra/connectors/base.py:71-174
class BaseConnector(abc.ABC):
    state: State
    host: Host

    handles_execution = False  # Does this connector execute commands?

    data_cls: Type = ConnectorData
    data_meta: dict[str, DataMeta] = {}

    def __init__(self, state: State, host: Host):
        self.state = state
        self.host = host
        self.data = host_to_connector_data(self.data_cls, self.data_meta, host.data)

    @staticmethod
    @abc.abstractmethod
    def make_names_data(name: str) -> Iterator[tuple[str, dict, list[str]]]:
        """
        Generate inventory targets. Yields (name, data, groups) tuples.
        """

    def connect(self) -> None:
        """
        Connect this connector instance.
        """

    def disconnect(self) -> None:
        """
        Disconnect this connector instance.
        """

    @abc.abstractmethod
    def run_shell_command(
        self,
        command: StringCommand,
        print_output: bool,
        print_input: bool,
        **arguments: Unpack[ConnectorArguments],
    ) -> tuple[bool, CommandOutput]:
        """
        Execute a command.
        Returns: (success_bool, CommandOutput)
        """

    @abc.abstractmethod
    def put_file(
        self,
        filename_or_io: Union[str, IOBase],
        remote_filename: str,
        remote_temp_filename: Optional[str] = None,
        print_output: bool = False,
        print_input: bool = False,
        **arguments: Unpack[ConnectorArguments],
    ) -> bool:
        """
        Upload a file or IO object.
        Returns: success_bool
        """

    @abc.abstractmethod
    def get_file(
        self,
        remote_filename: str,
        filename_or_io: Union[str, IOBase],
        remote_temp_filename: Optional[str] = None,
        print_output: bool = False,
        print_input: bool = False,
        **arguments: Unpack[ConnectorArguments],
    ) -> bool:
        """
        Download a file.
        Returns: success_bool
        """

Key Methods

  1. make_names_data: Static method that generates host entries from connector strings
  2. connect: Establish connection to target
  3. disconnect: Close connection
  4. run_shell_command: Execute shell commands
  5. put_file: Upload files
  6. get_file: Download files

Built-in Connectors

pyinfra includes several built-in connectors:

SSH Connector (Default)

The SSH connector uses Paramiko for SSH connections:
# From src/pyinfra/connectors/ssh.py:37-100
class ConnectorData(TypedDict):
    ssh_hostname: str
    ssh_port: int
    ssh_user: str
    ssh_password: str
    ssh_key: str
    ssh_key_password: str
    ssh_allow_agent: bool
    ssh_look_for_keys: bool
    ssh_forward_agent: bool
    ssh_config_file: str
    ssh_known_hosts_file: str
    ssh_strict_host_key_checking: str
    ssh_paramiko_connect_kwargs: dict
    ssh_connect_retries: int
    ssh_connect_retry_min_delay: float
    ssh_connect_retry_max_delay: float
    ssh_file_transfer_protocol: str  # "sftp" or "scp"
Usage:
from pyinfra.api import Inventory

# Explicit SSH (default)
inventory = Inventory(
    (
        [
            "@ssh/web1.example.com",
            "web2.example.com",  # Implicit SSH
        ],
        {"ssh_user": "deploy", "ssh_port": 22},
    ),
)
SSH connector features:
  • Password and key-based authentication
  • SSH agent support
  • SSH config file support
  • Connection retry logic
  • SFTP and SCP file transfer
  • Port forwarding

Local Connector

Execute commands on the local machine:
# From src/pyinfra/connectors/local.py
class LocalConnector(BaseConnector):
    handles_execution = True

    @staticmethod
    def make_names_data(name: str):
        yield "@local", {}, []

    def run_shell_command(self, command, print_output, print_input, **arguments):
        # Execute locally using subprocess
        ...
Usage:
pyinfra @local deploy.py
from pyinfra.api import Inventory

inventory = Inventory(
    (["@local"], {}),
)
Use cases:
  • Testing deployments
  • Configuring the control machine
  • Local development
  • CI/CD pipelines

Docker Connector

Execute commands inside Docker containers:
# From src/pyinfra/connectors/docker.py
class ConnectorData(TypedDict):
    docker_container_id: str
    docker_user: str

class DockerConnector(BaseConnector):
    handles_execution = True

    @staticmethod
    def make_names_data(name: str):
        # name is container ID or name
        yield f"@docker/{name}", {"docker_container_id": name}, []
Usage:
# Target running container
pyinfra @docker/my_container deploy.py

# Target multiple containers
pyinfra @docker/web1,@docker/web2 deploy.py
from pyinfra.api import Inventory

inventory = Inventory(
    (
        [
            "@docker/web_container",
            "@docker/db_container",
        ],
        {},
    ),
)
Features:
  • Execute commands via docker exec
  • File transfer via docker cp
  • Run as different users
  • Target by container name or ID

Docker-SSH Connector

SSH into Docker containers:
# From src/pyinfra/connectors/dockerssh.py
class DockerSSHConnector(SSHConnector):
    """
    SSH connector that targets Docker containers via their SSH daemon.
    """
Usage:
inventory = Inventory(
    (
        ["@dockerssh/container_name"],
        {"ssh_user": "root", "ssh_port": 2222},
    ),
)

Chroot Connector

Execute commands in a chroot environment:
# From src/pyinfra/connectors/chroot.py
class ChrootConnector(BaseConnector):
    handles_execution = True

    @staticmethod
    def make_names_data(name: str):
        yield f"@chroot/{name}", {"chroot_path": name}, []
Usage:
pyinfra @chroot//mnt/rootfs deploy.py
Use cases:
  • System installation
  • Rescue operations
  • Container image building

Terraform Connector

Generate inventory from Terraform state:
# From src/pyinfra/connectors/terraform.py
class TerraformConnector(BaseConnector):
    @staticmethod
    def make_names_data(name: str):
        # Reads terraform.tfstate
        # Yields hosts from Terraform resources
        ...
Usage:
pyinfra @terraform deploy.py

Vagrant Connector

Target Vagrant VMs:
# From src/pyinfra/connectors/vagrant.py
class VagrantConnector(SSHConnector):
    @staticmethod
    def make_names_data(name: str):
        # Uses `vagrant ssh-config`
        ...
Usage:
pyinfra @vagrant deploy.py

Connector Data

Connectors can define typed data classes:
# From src/pyinfra/connectors/base.py:57-68
class DataMeta:
    description: str
    default: Any

    def __init__(self, description, default=None) -> None:
        self.description = description
        self.default = default

class ConnectorData(TypedDict, total=False):
    pass
Example - SSH connector data:
# From src/pyinfra/connectors/ssh.py:61-100
connector_data_meta: dict[str, DataMeta] = {
    "ssh_hostname": DataMeta("SSH hostname"),
    "ssh_port": DataMeta("SSH port"),
    "ssh_user": DataMeta("SSH user"),
    "ssh_password": DataMeta("SSH password"),
    "ssh_key": DataMeta("SSH key filename"),
    "ssh_allow_agent": DataMeta(
        "Whether to use any active SSH agent",
        True,
    ),
    "ssh_look_for_keys": DataMeta(
        "Whether to look for private keys",
        True,
    ),
}

Command Execution

Connectors execute commands through run_shell_command:
# Signature
def run_shell_command(
    self,
    command: StringCommand,
    print_output: bool,
    print_input: bool,
    **arguments: ConnectorArguments,
) -> tuple[bool, CommandOutput]:
    pass

ConnectorArguments

From src/pyinfra/api/arguments.py:
class ConnectorArguments(TypedDict, total=False):
    _sudo: bool
    _sudo_user: str
    _su_user: str
    _preserve_sudo_env: bool
    _use_sudo_password: bool
    _use_su_login: bool
    _env: dict[str, str]
    _timeout: int
    _shell_executable: str

Example Execution

from pyinfra.api import StringCommand

command = StringCommand("apt-get", "update")

success, output = host.connector.run_shell_command(
    command,
    print_output=True,
    print_input=True,
    _sudo=True,
    _timeout=30,
)

if success:
    print(f"stdout: {output.stdout_lines}")
else:
    print(f"stderr: {output.stderr_lines}")

File Transfer

Upload Files

# Put file
success = host.connector.put_file(
    filename_or_io="/local/file.txt",
    remote_filename="/remote/file.txt",
    print_output=True,
    _sudo=True,
)

# Upload IO object
from io import StringIO

file_io = StringIO("Hello, World!")
success = host.connector.put_file(
    filename_or_io=file_io,
    remote_filename="/tmp/hello.txt",
)

Download Files

# Get file
success = host.connector.get_file(
    remote_filename="/remote/config.json",
    filename_or_io="/local/config.json",
    print_output=True,
)

# Download to IO object
from io import BytesIO

file_io = BytesIO()
success = host.connector.get_file(
    remote_filename="/etc/hosts",
    filename_or_io=file_io,
)

if success:
    content = file_io.getvalue()

Connection Management

Connecting

from pyinfra.api import Inventory, State, Config

inventory = Inventory((["web1.example.com"], {}))
state = State()
state.init(inventory, config)

for host in inventory:
    # Connect explicitly
    try:
        host.connect(reason="for deployment", raise_exceptions=True)
        print(f"Connected to {host.name}")
    except ConnectError as e:
        print(f"Failed to connect: {e}")

Disconnecting

for host in inventory:
    if host.connected:
        host.disconnect()
        print(f"Disconnected from {host.name}")

Connection State

if host.connected:
    print("Host is connected")
else:
    print("Host is not connected")

Creating Custom Connectors

Basic Custom Connector

from pyinfra.connectors.base import BaseConnector
from pyinfra.api import StringCommand
from pyinfra.connectors.util import CommandOutput
from typing import Iterator, Tuple

class MyCustomConnector(BaseConnector):
    handles_execution = True

    @staticmethod
    def make_names_data(name: str) -> Iterator[Tuple[str, dict, list[str]]]:
        """
        Generate host entries from connector string.
        """
        # Parse name and yield (host_name, host_data, groups)
        yield f"@mycustom/{name}", {"custom_param": name}, []

    def connect(self) -> None:
        """
        Establish connection.
        """
        print(f"Connecting to {self.host.name}...")
        # Connection logic here
        self.host.connected = True

    def disconnect(self) -> None:
        """
        Close connection.
        """
        print(f"Disconnecting from {self.host.name}...")
        # Cleanup logic here
        self.host.connected = False

    def run_shell_command(
        self,
        command: StringCommand,
        print_output: bool,
        print_input: bool,
        **arguments,
    ) -> Tuple[bool, CommandOutput]:
        """
        Execute command.
        """
        if print_input:
            print(f"$ {command}")

        # Execute command using your protocol
        stdout_lines = []
        stderr_lines = []
        status = True

        # ... execution logic ...

        output = CommandOutput(
            stdout_lines=[line for line in stdout_lines],
            stderr_lines=[line for line in stderr_lines],
        )

        return status, output

    def put_file(
        self,
        filename_or_io,
        remote_filename,
        remote_temp_filename=None,
        print_output=False,
        print_input=False,
        **arguments,
    ) -> bool:
        """
        Upload file.
        """
        # Upload logic here
        return True

    def get_file(
        self,
        remote_filename,
        filename_or_io,
        remote_temp_filename=None,
        print_output=False,
        print_input=False,
        **arguments,
    ) -> bool:
        """
        Download file.
        """
        # Download logic here
        return True

Registering Custom Connector

from pyinfra.api.connectors import register_connector

register_connector("mycustom", MyCustomConnector)

# Now you can use it
inventory = Inventory(
    (["@mycustom/target"], {}),
)

Connector Utilities

CommandOutput

# From src/pyinfra/connectors/util.py
from pyinfra.connectors.util import CommandOutput, OutputLine

output = CommandOutput([
    OutputLine("stdout", "Line 1"),
    OutputLine("stderr", "Error message"),
    OutputLine("stdout", "Line 2"),
])

# Access lines
stdout_lines = output.stdout_lines  # ["Line 1", "Line 2"]
stderr_lines = output.stderr_lines  # ["Error message"]

Command Helpers

from pyinfra.connectors.util import (
    make_unix_command_for_host,
    execute_command_with_sudo_retry,
)

# Build command with sudo/su
full_command = make_unix_command_for_host(
    state,
    host,
    command,
    _sudo=True,
    _sudo_user="root",
    _env={"PATH": "/usr/local/bin"},
)

Best Practices

Use SSH by Default

SSH is secure, well-tested, and works for most use cases.

Local for Testing

Use @local for testing deployments before running on real hosts.

Docker for Containers

Use @docker connector for containerized applications.

Handle Errors

Always handle connection errors and timeouts gracefully.

Reuse Connections

Connectors maintain connections across operations for efficiency.

Custom When Needed

Create custom connectors for proprietary or specialized systems.

Connection Troubleshooting

SSH Connection Issues

# Enable SSH debugging
inventory = Inventory(
    (
        ["problem-host.example.com"],
        {
            "ssh_user": "deploy",
            "ssh_port": 22,
            "ssh_connect_retries": 3,
            "ssh_connect_retry_min_delay": 1.0,
            "ssh_connect_retry_max_delay": 5.0,
        },
    ),
)

Timeout Configuration

# Increase timeouts for slow connections
from pyinfra.operations import server

server.shell(
    name="Long running command",
    commands=["./slow-script.sh"],
    _timeout=300,  # 5 minutes
)

Connection Retry

for host in inventory:
    retries = 3
    for attempt in range(retries):
        try:
            host.connect(raise_exceptions=True)
            break
        except ConnectError:
            if attempt < retries - 1:
                time.sleep(5)
                continue
            raise

Advanced Examples

Multi-hop SSH

# Connect through bastion host
inventory = Inventory(
    (
        [("internal-host", {
            "ssh_hostname": "internal.local",
            "ssh_user": "deploy",
            "ssh_paramiko_connect_kwargs": {
                "sock": paramiko.ProxyCommand(
                    "ssh bastion.example.com -W %h:%p"
                ),
            },
        })],
        {},
    ),
)

Inventory Generation

from pyinfra.connectors.base import BaseConnector
import requests

class CloudConnector(BaseConnector):
    @staticmethod
    def make_names_data(name: str):
        # Fetch instances from cloud API
        response = requests.get(f"https://api.cloud.com/instances?tag={name}")
        instances = response.json()
        
        for instance in instances:
            yield (
                instance["public_ip"],
                {
                    "instance_id": instance["id"],
                    "instance_type": instance["type"],
                },
                [instance["environment"]],  # Groups
            )

Inventory

Learn how connectors integrate with inventory

Operations

See how operations use connectors to execute commands

State

Understand how State manages connector instances

Host

Learn about the Host-Connector relationship

Build docs developers (and LLMs) love