Skip to main content
Connectors are pyinfra’s abstraction for executing commands on remote hosts. While pyinfra includes connectors for SSH, Docker, and local execution, you can create custom connectors for other connection methods like custom protocols, cloud APIs, or container orchestrators.

Understanding Connectors

Connectors in pyinfra:
  • Establish and manage connections to remote hosts
  • Execute shell commands and return output
  • Transfer files to and from remote hosts
  • Can define custom host data and configuration
  • Are loaded via Python entry points

The BaseConnector Class

All connectors inherit from BaseConnector, defined in src/pyinfra/connectors/base.py (line 71). A simplified view of the class:
# Simplified excerpt of pyinfra's BaseConnector, for reference only. Subclass
# the real class (``from pyinfra.connectors.base import BaseConnector``)
# rather than copying this definition.
from abc import ABC, abstractmethod
from io import IOBase
from typing import TYPE_CHECKING, Iterator, Optional, Tuple, Type, Union

from pyinfra.connectors.base import ConnectorData, DataMeta, host_to_connector_data
from typing_extensions import Unpack

if TYPE_CHECKING:
    # Only needed for the (string) annotations below.
    from pyinfra.api.arguments import ConnectorArguments
    from pyinfra.api.command import StringCommand
    from pyinfra.api.host import Host
    from pyinfra.api.state import State
    from pyinfra.connectors.util import CommandOutput


class BaseConnector(ABC):
    """
    Abstract base class for pyinfra connectors.

    Deriving from ``ABC`` is what makes ``@abstractmethod`` binding: a subclass
    cannot be instantiated until it implements ``make_names_data``,
    ``run_shell_command``, ``put_file`` and ``get_file``. ``connect`` and
    ``disconnect`` are optional hooks that do nothing by default.
    """

    state: "State"  # Current deployment state
    host: "Host"  # Current host being managed

    handles_execution = False  # Set to True if connector executes commands

    data_cls: Type = ConnectorData  # TypedDict for connector data
    data_meta: dict[str, DataMeta] = {}  # Metadata for connector data

    def __init__(self, state: "State", host: "Host"):
        self.state = state
        self.host = host
        # Connector-specific view of the host's data, shaped by data_cls and
        # data_meta (presumably applying DataMeta defaults — see
        # host_to_connector_data).
        self.data = host_to_connector_data(
            self.data_cls, self.data_meta, host.data
        )

    @staticmethod
    @abstractmethod
    def make_names_data(name: str) -> Iterator[tuple[str, dict, list[str]]]:
        """
        Generate inventory targets from a connection string.

        Yields:
            ``(hostname, host_data, groups)`` tuples.
        """

    def connect(self) -> None:
        """Establish connection to the remote host (no-op by default)."""

    def disconnect(self) -> None:
        """Close connection to the remote host (no-op by default)."""

    @abstractmethod
    def run_shell_command(
        self,
        command: "StringCommand",
        print_output: bool,
        print_input: bool,
        **arguments: Unpack["ConnectorArguments"],
    ) -> Tuple[bool, "CommandOutput"]:
        """Execute a command and return ``(success, output)``."""

    @abstractmethod
    def put_file(
        self,
        filename_or_io: Union[str, IOBase],
        remote_filename: str,
        remote_temp_filename: Optional[str] = None,
        print_output: bool = False,
        print_input: bool = False,
        **arguments: Unpack["ConnectorArguments"],
    ) -> bool:
        """Upload a file to the remote host; return True on success."""

    @abstractmethod
    def get_file(
        self,
        remote_filename: str,
        filename_or_io: Union[str, IOBase],
        remote_temp_filename: Optional[str] = None,
        print_output: bool = False,
        print_input: bool = False,
        **arguments: Unpack["ConnectorArguments"],
    ) -> bool:
        """Download a file from the remote host; return True on success."""

Creating a Basic Connector

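Here’s a simple example based on the local connector pattern from src/pyinfra/connectors/local.py (line 26). The private helper methods at the bottom (_establish_connection, _execute_remote_command, _upload_file and _download_file) are placeholders to replace with calls into your own protocol: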
from io import IOBase
from typing import Optional, Tuple, Union

from pyinfra import logger
from pyinfra.api.arguments import ConnectorArguments
from pyinfra.api.command import StringCommand
from pyinfra.api.exceptions import ConnectError, InventoryError
from pyinfra.api.util import get_file_io
from pyinfra.connectors.base import BaseConnector, DataMeta
from pyinfra.connectors.util import (
    CommandOutput,
    OutputLine,
    make_unix_command_for_host,
)
from typing_extensions import TypedDict, Unpack, override


def _make_command_output(stdout: str, stderr: str) -> CommandOutput:
    """
    Wrap raw stdout/stderr text in a pyinfra ``CommandOutput``.

    ``CommandOutput`` holds one combined list of ``OutputLine(buffer_name,
    line)`` entries rather than separate stdout/stderr lists. Unlike splitting
    on a newline character, ``splitlines()`` produces no empty entry for empty
    or newline-terminated output.
    """
    return CommandOutput(
        [OutputLine("stdout", line) for line in stdout.splitlines()]
        + [OutputLine("stderr", line) for line in stderr.splitlines()]
    )


class CustomConnectorData(TypedDict):
    """Define connector-specific configuration."""
    custom_param: str
    custom_timeout: int


class CustomConnector(BaseConnector):
    """
    Custom connector for executing commands via a custom protocol.

    Inventory format: ``@custom:hostname``.
    """

    handles_execution = True  # This connector can execute commands

    data_cls = CustomConnectorData
    data_meta = {
        "custom_param": DataMeta("Custom parameter description", default="default_value"),
        "custom_timeout": DataMeta("Custom timeout in seconds", default=30),
    }

    # Handle returned by _establish_connection(); None until connect()
    # succeeds and again after disconnect().
    _connection = None

    @override
    @staticmethod
    def make_names_data(name: Optional[str] = None):
        """
        Generate the inventory target for ``@custom:hostname``.

        pyinfra calls this with the text after ``@custom:`` (``None`` for a
        bare ``@custom``), as with the built-in ``@docker:`` connector; a full
        ``@custom:hostname`` string is accepted as well.

        Yields:
            ``(hostname, host_data, groups)``.

        Raises:
            InventoryError: if no hostname was given.
        """
        hostname = (name or "").removeprefix("@custom:")
        if not hostname:
            raise InventoryError(
                f"Invalid custom connector name: {name}. "
                "Expected format: @custom:hostname"
            )

        host_data = {"custom_param": "value_from_name"}
        groups = ["custom"]

        yield hostname, host_data, groups

    @override
    def connect(self) -> None:
        """
        Establish the connection to the remote host.

        Raises:
            ConnectError: if the underlying protocol fails to connect.
        """
        try:
            self._connection = self._establish_connection(
                host=self.host.name,
                timeout=self.data["custom_timeout"],
            )
        except Exception as e:
            raise ConnectError(f"Failed to connect: {e}") from e

    @override
    def disconnect(self) -> None:
        """Close the connection if one is open; safe to call repeatedly."""
        if self._connection is not None:
            self._connection.close()
            self._connection = None

    @override
    def run_shell_command(
        self,
        command: StringCommand,
        print_output: bool = False,
        print_input: bool = False,
        **arguments: Unpack[ConnectorArguments],
    ) -> Tuple[bool, CommandOutput]:
        """
        Execute a command on the remote host.

        Args:
            command: the command to run.
            print_output: log the command's stdout/stderr.
            print_input: log the command before running it.
            **arguments: pyinfra connector arguments (``_sudo``, ``_env``,
                ``_success_exit_codes``, ``_timeout``, ...).

        Returns:
            ``(success, output)``; ``success`` is True when the exit code is in
            ``_success_exit_codes`` (just ``0`` when not given).
        """
        # These arguments control how the command is executed, not how it is
        # built, so remove them before make_unix_command_for_host (mirroring
        # pyinfra's built-in SSH connector).
        success_exit_codes = arguments.pop("_success_exit_codes", None)
        timeout = arguments.pop("_timeout", None)
        # NOTE: this example protocol supports neither a PTY nor stdin.
        arguments.pop("_get_pty", None)
        arguments.pop("_stdin", None)

        # Wrap the command with sudo, su_user, env, etc. as requested.
        unix_command = make_unix_command_for_host(
            self.state, self.host, command, **arguments
        )

        if print_input:
            # Log the string form (masked), not the raw value, so secrets
            # wrapped in the command are not written to the log.
            logger.info(f">>> {unix_command}")

        exit_code, stdout, stderr = self._execute_remote_command(
            unix_command.get_raw_value(), timeout=timeout
        )

        if print_output:
            if stdout:
                logger.info(stdout)
            if stderr:
                logger.error(stderr)

        if success_exit_codes:
            success = exit_code in success_exit_codes
        else:
            success = exit_code == 0

        return success, _make_command_output(stdout, stderr)

    @override
    def put_file(
        self,
        filename_or_io: Union[str, IOBase],
        remote_filename: str,
        remote_temp_filename: Optional[str] = None,
        print_output: bool = False,
        print_input: bool = False,
        **arguments: Unpack[ConnectorArguments],
    ) -> bool:
        """
        Upload a local file (path or file-like object) to ``remote_filename``.

        Returns:
            True on success; False (after logging the error) on failure.
        """
        with get_file_io(filename_or_io) as file_io:
            content = file_io.read()
        # Text-mode IO yields str; the protocol helpers deal in bytes.
        if isinstance(content, str):
            content = content.encode()

        try:
            self._upload_file(content, remote_filename)
        except Exception as e:
            logger.error(f"Failed to upload file: {e}")
            return False
        return True

    @override
    def get_file(
        self,
        remote_filename: str,
        filename_or_io: Union[str, IOBase],
        remote_temp_filename: Optional[str] = None,
        print_output: bool = False,
        print_input: bool = False,
        **arguments: Unpack[ConnectorArguments],
    ) -> bool:
        """
        Download ``remote_filename`` into a local path or writable binary IO.

        Returns:
            True on success; False (after logging the error) on failure.
        """
        try:
            content = self._download_file(remote_filename)

            if isinstance(filename_or_io, str):
                with open(filename_or_io, "wb") as f:
                    f.write(content)
            else:
                filename_or_io.write(content)
        except Exception as e:
            logger.error(f"Failed to download file: {e}")
            return False
        return True

    # -- Placeholders: replace these with calls into your protocol. --------
    # They raise instead of silently returning None, so an unimplemented
    # helper fails with a clear message.

    def _establish_connection(self, host: str, timeout: int):
        """Open a connection to ``host``; return a handle that has ``close()``."""
        # Example: return custom_protocol.connect(host, timeout)
        raise NotImplementedError("implement _establish_connection for your protocol")

    def _execute_remote_command(
        self, command: str, timeout: Optional[int] = None
    ) -> Tuple[int, str, str]:
        """Run ``command`` and return ``(exit_code, stdout, stderr)``."""
        # Example: return self._connection.exec(command, timeout=timeout)
        raise NotImplementedError("implement _execute_remote_command for your protocol")

    def _upload_file(self, content: bytes, remote_path: str) -> None:
        """Upload ``content`` to ``remote_path``."""
        # Example: self._connection.put(content, remote_path)
        raise NotImplementedError("implement _upload_file for your protocol")

    def _download_file(self, remote_path: str) -> bytes:
        """Return the contents of ``remote_path``."""
        # Example: return self._connection.get(remote_path)
        raise NotImplementedError("implement _download_file for your protocol")

Complete Example: REST API Connector

Here’s a more complete example of a connector that manages resources via a REST API:
import base64
from io import IOBase
from typing import Optional, Tuple, Union
from urllib.parse import quote

import requests
from pyinfra import logger
from pyinfra.api.arguments import ConnectorArguments
from pyinfra.api.command import StringCommand
from pyinfra.api.exceptions import ConnectError, InventoryError
from pyinfra.api.util import get_file_io
from pyinfra.connectors.base import BaseConnector, DataMeta
from pyinfra.connectors.util import (
    CommandOutput,
    OutputLine,
    make_unix_command_for_host,
)
from typing_extensions import TypedDict, Unpack, override

# Command timeout (seconds) sent to the API when the caller gives no _timeout.
DEFAULT_COMMAND_TIMEOUT = 300


def _make_command_output(stdout: str, stderr: str) -> CommandOutput:
    """
    Wrap raw stdout/stderr text in a pyinfra ``CommandOutput``.

    ``CommandOutput`` holds one combined list of ``OutputLine(buffer_name,
    line)`` entries; ``splitlines()`` yields no entries for empty text.
    """
    return CommandOutput(
        [OutputLine("stdout", line) for line in stdout.splitlines()]
        + [OutputLine("stderr", line) for line in stderr.splitlines()]
    )


class ApiConnectorData(TypedDict):
    api_url: str
    api_key: str
    api_timeout: int


class ApiConnector(BaseConnector):
    """
    Execute commands on remote hosts via a REST API.

    Connection format: @api:resource_id

    Example:
        pyinfra @api:server-123 deploy.py
    """

    handles_execution = True

    data_cls = ApiConnectorData
    data_meta = {
        "api_url": DataMeta("API base URL", default="https://api.example.com"),
        "api_key": DataMeta("API authentication key"),
        "api_timeout": DataMeta("API request timeout", default=60),
    }

    # JSON description of the resource, fetched by connect().
    api_resource: Optional[dict] = None

    @override
    @staticmethod
    def make_names_data(name: Optional[str] = None):
        """
        Generate the inventory target for ``@api:resource_id``.

        pyinfra calls this with the text after ``@api:`` (``None`` for a bare
        ``@api``); a full ``@api:resource_id`` string is accepted as well.

        Yields:
            ``(hostname, host_data, groups)`` with the resource id as hostname.

        Raises:
            InventoryError: if no resource id was given.
        """
        resource_id = (name or "").removeprefix("@api:")
        if not resource_id:
            raise InventoryError(
                f"Invalid API connector name: {name}. "
                "Expected format: @api:resource_id"
            )

        # Yield (hostname, host_data, groups)
        yield resource_id, {}, ["api"]

    def _request(
        self,
        method: str,
        path: str = "",
        timeout: Optional[float] = None,
        **kwargs,
    ) -> requests.Response:
        """
        Send an authenticated request about this host's API resource.

        The URL is ``{api_url}/resources/{host name}{path}``. ``timeout``
        defaults to the ``api_timeout`` setting; other keyword arguments are
        passed to ``requests.request``.

        Raises:
            requests.RequestException: on connection errors or HTTP error status.
        """
        url = f"{self.data['api_url']}/resources/{quote(self.host.name, safe='')}{path}"
        response = requests.request(
            method,
            url,
            headers={"Authorization": f"Bearer {self.data['api_key']}"},
            timeout=self.data["api_timeout"] if timeout is None else timeout,
            **kwargs,
        )
        response.raise_for_status()
        return response

    @override
    def connect(self) -> None:
        """
        Verify API connectivity and that the resource exists.

        The resource's JSON description is kept on ``self.api_resource``.

        Raises:
            ConnectError: if no ``api_key`` is configured, the request fails,
                or the response is not valid JSON.
        """
        if not self.data.get("api_key"):
            raise ConnectError(
                f"No api_key configured for API resource {self.host.name}"
            )

        try:
            self.api_resource = self._request("GET").json()
        except (requests.RequestException, ValueError) as e:
            raise ConnectError(
                f"Failed to connect to API resource {self.host.name}: {e}"
            ) from e

        logger.info(f"Connected to API resource: {self.host.name}")

    @override
    def run_shell_command(
        self,
        command: StringCommand,
        print_output: bool = False,
        print_input: bool = False,
        **arguments: Unpack[ConnectorArguments],
    ) -> Tuple[bool, CommandOutput]:
        """
        Execute a command via the API's ``/exec`` endpoint.

        Returns:
            ``(success, output)``; ``success`` is True when the reported exit
            code is in ``_success_exit_codes`` (just ``0`` when not given).
            Request failures are logged and reported as ``(False, output)``.
        """
        # Execution-only arguments are removed before building the command.
        success_exit_codes = arguments.pop("_success_exit_codes", None)
        command_timeout = arguments.pop("_timeout", None)
        if command_timeout is None:
            command_timeout = DEFAULT_COMMAND_TIMEOUT
        # NOTE: this example API supports neither a PTY nor stdin.
        arguments.pop("_get_pty", None)
        arguments.pop("_stdin", None)

        # Honour _sudo, _su_user, _env, ... instead of sending the bare command.
        unix_command = make_unix_command_for_host(
            self.state, self.host, command, **arguments
        )

        if print_input:
            # String form is masked; never log the raw value.
            logger.info(f">>> {unix_command}")

        try:
            response = self._request(
                "POST",
                "/exec",
                json={
                    "command": unix_command.get_raw_value(),
                    "timeout": command_timeout,
                },
                # The API answers only once the command has finished, so the
                # HTTP timeout must outlast the command itself.
                timeout=self.data["api_timeout"] + command_timeout,
            )
            result = response.json()
        except (requests.RequestException, ValueError) as e:
            logger.error(f"Command execution failed: {e}")
            return False, _make_command_output("", str(e))

        # A missing exit code is treated as failure, never as success.
        exit_code = result.get("exit_code")
        stdout = result.get("stdout") or ""
        stderr = result.get("stderr") or ""

        if print_output:
            if stdout:
                logger.info(stdout)
            if stderr:
                logger.error(stderr)

        if exit_code is None:
            success = False
        elif success_exit_codes:
            success = exit_code in success_exit_codes
        else:
            success = exit_code == 0

        return success, _make_command_output(stdout, stderr)

    @override
    def put_file(
        self,
        filename_or_io: Union[str, IOBase],
        remote_filename: str,
        remote_temp_filename: Optional[str] = None,
        print_output: bool = False,
        print_input: bool = False,
        **arguments: Unpack[ConnectorArguments],
    ) -> bool:
        """
        Upload a file via the API as base64 JSON.

        Returns:
            True on success; False (after logging the error) on failure.
        """
        with get_file_io(filename_or_io) as file_io:
            content = file_io.read()
        if isinstance(content, str):
            content = content.encode()

        try:
            self._request(
                "POST",
                "/files",
                json={
                    "path": remote_filename,
                    "content": base64.b64encode(content).decode(),
                    "encoding": "base64",
                },
            )
        except requests.RequestException as e:
            logger.error(f"File upload failed: {e}")
            return False

        if print_output:
            logger.info(f"Uploaded file to {remote_filename}")
        return True

    @override
    def get_file(
        self,
        remote_filename: str,
        filename_or_io: Union[str, IOBase],
        remote_temp_filename: Optional[str] = None,
        print_output: bool = False,
        print_input: bool = False,
        **arguments: Unpack[ConnectorArguments],
    ) -> bool:
        """
        Download a file via the API and write it to a local path or binary IO.

        Returns:
            True on success; False (after logging the error) on failure,
            including an invalid JSON or base64 response.
        """
        # Encode the whole remote path as one URL segment: it contains "/" and
        # may contain characters such as spaces, "?" or "#".
        path = f"/files/{quote(remote_filename, safe='')}"

        try:
            response = self._request("GET", path)
            content = base64.b64decode(response.json().get("content", ""))
        except (requests.RequestException, ValueError) as e:
            logger.error(f"File download failed: {e}")
            return False

        if isinstance(filename_or_io, str):
            with open(filename_or_io, "wb") as f:
                f.write(content)
        else:
            filename_or_io.write(content)

        if print_output:
            logger.info(f"Downloaded file from {remote_filename}")
        return True

Registering Your Connector

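Connectors are loaded via Python entry points in the pyinfra.connectors group. The entry-point name is the connector name you use after @ on the command line (for example, the custom entry point below enables pyinfra @custom:myhost). Add this to your setup.py or pyproject.toml: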

setup.py

from setuptools import setup

# "pyinfra.connectors" entry points, each "<connector name> = <module>:<class>".
CONNECTOR_ENTRY_POINTS = [
    "custom = pyinfra_custom.connector:CustomConnector",
    "api = pyinfra_custom.connector:ApiConnector",
]

setup(
    name="pyinfra-custom-connector",
    version="0.1.0",
    packages=["pyinfra_custom"],
    entry_points={"pyinfra.connectors": CONNECTOR_ENTRY_POINTS},
)

pyproject.toml

# Register connector classes under the "pyinfra.connectors" entry-point group.
# Each key is the connector name (used as @custom:..., @api:...); each value
# is "module.path:ClassName".
[project.entry-points."pyinfra.connectors"]
custom = "pyinfra_custom.connector:CustomConnector"
api = "pyinfra_custom.connector:ApiConnector"

Helper Utilities

pyinfra provides utilities for common connector tasks:

Command Building

from pyinfra.connectors.util import make_unix_command_for_host

# Connector arguments that shape the final shell command (sudo, su_user, env,
# etc.). `state`, `host` and `command` are the connector's own values, e.g.
# self.state, self.host and run_shell_command's `command` parameter.
command_arguments = {
    "_sudo": True,
    "_sudo_user": "root",
    "_env": {"PATH": "/usr/local/bin:/usr/bin"},
}
unix_command = make_unix_command_for_host(state, host, command, **command_arguments)

Running Local Processes

from pyinfra.connectors.util import run_local_process

# Run a command as a local process; the call returns the exit code and the
# captured output, echoing output lines with the given prefix.
exit_code, output = run_local_process(
    "ls -la /tmp",
    print_prefix="[local] ",
    print_output=True,
    timeout=30,
    stdin=None,
)

Sudo Retry Logic

from pyinfra.connectors.util import execute_command_with_sudo_retry


def execute():
    """
    Run the command once and return ``(exit_code, output)``.

    Replace the body with your connector's execution logic; ``exit_code`` and
    ``output`` here are placeholders.
    """
    return exit_code, output


# `arguments` is the ConnectorArguments mapping (_sudo, _sudo_password, etc.);
# the helper calls `execute` and may call it again for the sudo retry.
exit_code, output = execute_command_with_sudo_retry(host, arguments, execute)

Connector Data Types

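Define connector data as a TypedDict, plus DataMeta entries that describe each key and, optionally, its default value. The TypedDict is used by static type checkers; it does not validate values at runtime: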
from typing_extensions import TypedDict
from pyinfra.connectors.base import DataMeta

# Illustration of the data_cls / data_meta pattern; on a connector these are
# assigned as class attributes (data_cls = ..., data_meta = ...).
# NOTE(review): keys and defaults are illustrative — pyinfra's real SSH
# connector may define different ones; confirm before relying on them.
class SshConnectorData(TypedDict):
    """Keys a connector reads from ``self.data``, with their value types."""
    ssh_hostname: str
    ssh_port: int
    ssh_user: str
    ssh_password: str
    ssh_key: str

# Description for each SshConnectorData key, plus a default where one is given.
data_meta = {
    "ssh_hostname": DataMeta("SSH hostname"),
    "ssh_port": DataMeta("SSH port", default=22),
    "ssh_user": DataMeta("SSH user", default="root"),
    "ssh_password": DataMeta("SSH password"),
    "ssh_key": DataMeta("SSH key filename"),
}

Testing Your Connector

Create a test deploy file that exercises command execution and file upload:
# test_custom_connector.py
from pyinfra.operations import files, python, server

# Test basic command execution
result = server.shell(
    name="Test custom connector",
    commands=["echo 'Hello from custom connector'"],
)


def print_result():
    """Report the shell operation's outcome once it has executed."""
    print(f"Command succeeded: {result.did_succeed()}")
    print(f"Output: {result.stdout}")


# Loading a deploy file only queues operations; their results exist after
# execution, so read them from a python.call callback rather than at the top
# level (which raises before the operation has run).
python.call(
    name="Print test result",
    function=print_result,
)

# Test file operations
files.put(
    name="Upload test file",
    src="test.txt",
    dest="/tmp/test.txt",
)
Run with your connector:
# Run the deploy file against host "myhost" through the "custom" connector entry point
pyinfra @custom:myhost test_custom_connector.py

Best Practices

  1. Error Handling: Always raise ConnectError for connection failures
  2. Logging: Use pyinfra’s logger for consistent output
  3. Type Safety: Use TypedDict for connector data and type hints throughout
  4. Resource Cleanup: Implement disconnect() to clean up connections
  5. Timeout Handling: Respect timeout arguments for all operations
  6. Sudo Support: Use make_unix_command_for_host to handle sudo/su properly
  7. Test Thoroughly: Test with various commands, file operations, and error conditions
  8. Documentation: Document connector usage and configuration options
  9. Connection Pooling: Consider implementing connection reuse for performance
  10. Error Messages: Provide clear, actionable error messages

Next Steps

Build docs developers (and LLMs) love