Understanding Connectors
Connectors in pyinfra:
- Establish and manage connections to remote hosts
- Execute shell commands and return output
- Transfer files to and from remote hosts
- Can define custom host data and configuration
- Are loaded via Python entry points
The BaseConnector Class
All connectors inherit from `BaseConnector`, defined in `src/pyinfra/connectors/base.py:71`:
from abc import ABC, abstractmethod
from io import IOBase
from typing import Optional, Type, Union
from typing import Tuple, Iterator

from pyinfra.connectors.base import BaseConnector, DataMeta
class BaseConnector(ABC):
    """Base class that every pyinfra connector inherits from.

    Subclasses must implement the methods marked ``@abstractmethod``;
    inheriting from ``ABC`` is what makes Python refuse to instantiate a
    subclass that leaves one of them out. ``connect`` and ``disconnect`` are
    optional hooks that do nothing by default.

    Project types (``State``, ``Host``, ``StringCommand``, ...) are written as
    string annotations because this excerpt does not import them.
    """

    state: "State"  # Current deployment state
    host: "Host"  # Current host being managed

    handles_execution = False  # Set to True if connector executes commands

    # ConnectorData and host_to_connector_data are not shown in this excerpt.
    data_cls: Type = ConnectorData  # TypedDict for connector data
    data_meta: dict[str, DataMeta] = {}  # Metadata for connector data

    def __init__(self, state: "State", host: "Host"):
        self.state = state
        self.host = host
        # Extract this connector's settings from the host's data, using the
        # subclass's ``data_cls`` / ``data_meta`` declarations.
        self.data = host_to_connector_data(
            self.data_cls, self.data_meta, host.data
        )

    @staticmethod
    @abstractmethod
    def make_names_data(name: str) -> Iterator[tuple[str, dict, list[str]]]:
        """Generate inventory targets from a connection string.

        Yields:
            ``(hostname, host_data, groups)`` tuples.
        """
        pass

    def connect(self) -> None:
        """Establish connection to the remote host (no-op by default)."""
        pass

    def disconnect(self) -> None:
        """Close connection to the remote host (no-op by default)."""
        pass

    @abstractmethod
    def run_shell_command(
        self,
        command: "StringCommand",
        print_output: bool,
        print_input: bool,
        **arguments: "ConnectorArguments",
    ) -> tuple[bool, "CommandOutput"]:
        """Execute a command and return ``(success, output)``."""
        pass

    @abstractmethod
    def put_file(
        self,
        filename_or_io: Union[str, IOBase],
        remote_filename: str,
        remote_temp_filename: Optional[str] = None,
        print_output: bool = False,
        print_input: bool = False,
        **arguments: "ConnectorArguments",
    ) -> bool:
        """Upload a local file (path or IO object) to the remote host.

        Returns:
            True on success, False otherwise.
        """
        pass

    @abstractmethod
    def get_file(
        self,
        remote_filename: str,
        filename_or_io: Union[str, IOBase],
        remote_temp_filename: Optional[str] = None,
        print_output: bool = False,
        print_input: bool = False,
        **arguments: "ConnectorArguments",
    ) -> bool:
        """Download a remote file into a local path or IO object.

        Returns:
            True on success, False otherwise.
        """
        pass
Creating a Basic Connector
Here’s a simple example based on the local connector pattern from `src/pyinfra/connectors/local.py:26`:
from pyinfra.connectors.base import BaseConnector, DataMeta
from pyinfra.connectors.util import (
CommandOutput,
make_unix_command_for_host,
run_local_process,
)
from pyinfra.api.exceptions import ConnectError, InventoryError
from typing_extensions import TypedDict, Unpack, override
from typing import Tuple
class CustomConnectorData(TypedDict):
    """Typed settings that ``CustomConnector`` reads from host data."""

    custom_param: str  # connector-specific parameter (see ``data_meta``)
    custom_timeout: int  # connection timeout, in seconds
class CustomConnector(BaseConnector):
    """
    Custom connector for executing commands via a custom protocol.

    Usage: ``pyinfra @custom:hostname deploy.py``
    """

    handles_execution = True  # This connector can execute commands

    data_cls = CustomConnectorData
    data_meta = {
        "custom_param": DataMeta("Custom parameter description", default="default_value"),
        "custom_timeout": DataMeta("Custom timeout in seconds", default=30),
    }

    @override
    @staticmethod
    def make_names_data(name=None):
        """Turn the target given after ``@custom:`` into an inventory host.

        pyinfra passes only the part after ``@custom:`` (``None`` for a bare
        ``@custom``), so ``@custom:myhost`` arrives here as ``"myhost"``.

        Yields:
            ``(hostname, host_data, groups)`` tuples.

        Raises:
            InventoryError: if no target was given.
        """
        if not name:
            raise InventoryError(
                "No custom connector target provided. "
                "Expected format: @custom:hostname"
            )

        host_data = {"custom_param": "value_from_name"}
        groups = ["custom"]
        yield name, host_data, groups

    @override
    def connect(self) -> None:
        """Establish the connection to the remote host.

        Raises:
            ConnectError: if the underlying protocol fails to connect.
        """
        try:
            # Example: connect to custom service
            self._connection = self._establish_connection(
                host=self.host.name,
                timeout=self.data["custom_timeout"],
            )
        except Exception as e:
            # Chain the original error so the root cause stays visible.
            raise ConnectError(f"Failed to connect: {e}") from e

    @override
    def disconnect(self) -> None:
        """Close the connection if one is open; safe to call repeatedly."""
        connection = getattr(self, "_connection", None)
        if connection:
            connection.close()
            self._connection = None

    @override
    def run_shell_command(
        self,
        command: "StringCommand",
        print_output: bool = False,
        print_input: bool = False,
        **arguments: Unpack["ConnectorArguments"],
    ) -> Tuple[bool, CommandOutput]:
        """Execute a command on the remote host.

        Returns:
            ``(success, output)`` where ``success`` is True when the exit code
            is one of ``_success_exit_codes`` (default ``[0]``).
        """
        from pyinfra.connectors.util import OutputLine

        # Connector-level arguments are consumed here rather than being
        # forwarded to the shell-command builder.
        arguments.pop("_get_pty", None)  # not supported by this example protocol
        arguments.pop("_stdin", None)  # not supported by this example protocol
        timeout = arguments.pop("_timeout", None)
        success_exit_codes = arguments.pop("_success_exit_codes", None) or [0]

        # Build the actual command (sudo/su, env, chdir, ...)
        unix_command = make_unix_command_for_host(
            self.state, self.host, command, **arguments
        )
        actual_command = unix_command.get_raw_value()

        # Execute via your custom protocol
        exit_code, stdout, stderr = self._execute_remote_command(
            actual_command, timeout=timeout
        )

        # CommandOutput takes a single list of lines, each tagged with the
        # stream it came from.
        output = CommandOutput(
            [OutputLine("stdout", line) for line in stdout.splitlines()]
            + [OutputLine("stderr", line) for line in stderr.splitlines()]
        )
        return exit_code in success_exit_codes, output

    @override
    def put_file(
        self,
        filename_or_io,
        remote_filename: str,
        remote_temp_filename=None,
        print_output: bool = False,
        print_input: bool = False,
        **arguments,
    ) -> bool:
        """Upload a local file (path or IO object) to ``remote_filename``.

        Returns:
            True on success, False if the upload raised.
        """
        from pyinfra import logger
        from pyinfra.api.util import get_file_io

        # Read file content; text content is encoded to bytes for the protocol
        with get_file_io(filename_or_io) as file_io:
            content = file_io.read()
        if isinstance(content, str):
            content = content.encode()

        # Upload via your custom protocol
        try:
            self._upload_file(content, remote_filename)
        except Exception as e:
            logger.error(f"Failed to upload file: {e}")
            return False
        return True

    @override
    def get_file(
        self,
        remote_filename: str,
        filename_or_io,
        remote_temp_filename=None,
        print_output: bool = False,
        print_input: bool = False,
        **arguments,
    ) -> bool:
        """Download ``remote_filename`` into a local path or IO object.

        Returns:
            True on success, False if the download or local write raised.
        """
        from pyinfra import logger

        try:
            content = self._download_file(remote_filename)

            # Write to local file or IO object
            if isinstance(filename_or_io, str):
                with open(filename_or_io, "wb") as f:
                    f.write(content)
            else:
                filename_or_io.write(content)
        except Exception as e:
            logger.error(f"Failed to download file: {e}")
            return False
        return True

    # Helper methods for your custom protocol. They raise NotImplementedError
    # until filled in, instead of silently returning None.

    def _establish_connection(self, host: str, timeout: int):
        """Open and return a connection object for ``host``."""
        # Example: return custom_protocol.connect(host, timeout)
        raise NotImplementedError("Implement _establish_connection")

    def _execute_remote_command(self, command: str, timeout=None) -> Tuple[int, str, str]:
        """Execute ``command`` and return ``(exit_code, stdout, stderr)``.

        ``timeout`` is the ``_timeout`` connector argument (None if unset).
        """
        # Example: return self._connection.exec(command, timeout=timeout)
        raise NotImplementedError("Implement _execute_remote_command")

    def _upload_file(self, content: bytes, remote_path: str):
        """Upload ``content`` to ``remote_path``."""
        # Example: self._connection.put(content, remote_path)
        raise NotImplementedError("Implement _upload_file")

    def _download_file(self, remote_path: str) -> bytes:
        """Download and return the bytes stored at ``remote_path``."""
        # Example: return self._connection.get(remote_path)
        raise NotImplementedError("Implement _download_file")
Complete Example: REST API Connector
Here’s a more complete example of a connector that manages resources via a REST API:
import requests
from pyinfra import logger
from pyinfra.connectors.base import BaseConnector, DataMeta
from pyinfra.connectors.util import CommandOutput
from pyinfra.api.exceptions import ConnectError
from typing_extensions import TypedDict, Unpack, override
class ApiConnectorData(TypedDict):
    """Typed settings that ``ApiConnector`` reads from host data."""

    api_url: str  # base URL of the REST API
    api_key: str  # authentication key sent as a bearer token
    api_timeout: int  # per-request timeout
class ApiConnector(BaseConnector):
    """
    Execute commands on remote hosts via a REST API.

    Connection format: @api:resource_id
    Example:
        pyinfra @api:server-123 deploy.py
    """

    handles_execution = True

    data_cls = ApiConnectorData
    data_meta = {
        "api_url": DataMeta("API base URL", default="https://api.example.com"),
        "api_key": DataMeta("API authentication key"),
        "api_timeout": DataMeta("API request timeout", default=60),
    }

    @override
    @staticmethod
    def make_names_data(name=None):
        """Turn the resource ID given after ``@api:`` into an inventory host.

        pyinfra passes only the part after ``@api:`` (``None`` for a bare
        ``@api``), so ``@api:server-123`` arrives here as ``"server-123"``.

        Yields:
            ``(hostname, host_data, groups)`` tuples.

        Raises:
            InventoryError: if no resource ID was given.
        """
        # Local import: InventoryError is not among this example's top-level imports.
        from pyinfra.api.exceptions import InventoryError

        if not name:
            raise InventoryError(
                "No API resource ID provided. Expected format: @api:resource_id"
            )

        # Yield (hostname, host_data, groups)
        yield name, {}, ["api"]

    def _auth_headers(self) -> dict:
        """Return the bearer-token header sent with every API request."""
        return {"Authorization": f"Bearer {self.data['api_key']}"}

    @override
    def connect(self) -> None:
        """Verify API connectivity and that the resource exists.

        Raises:
            ConnectError: if the request fails or returns an error status.
        """
        url = f"{self.data['api_url']}/resources/{self.host.name}"
        try:
            response = requests.get(
                url,
                headers=self._auth_headers(),
                timeout=self.data["api_timeout"],
            )
            response.raise_for_status()
            resource = response.json()
        except requests.RequestException as e:
            raise ConnectError(
                f"Failed to connect to API resource {self.host.name}: {e}"
            ) from e

        # NOTE(review): confirm ``host.host_data`` is writable in your pyinfra version.
        self.host.host_data["api_resource"] = resource
        logger.info(f"Connected to API resource: {self.host.name}")

    @override
    def run_shell_command(
        self,
        command: "StringCommand",
        print_output: bool = False,
        print_input: bool = False,
        **arguments: Unpack["ConnectorArguments"],
    ) -> tuple[bool, CommandOutput]:
        """Execute a command via the API.

        Returns:
            ``(success, output)`` where ``success`` is True when the remote
            exit code is one of ``_success_exit_codes`` (default ``[0]``).
            A failed HTTP request returns ``False`` with the error as stderr.
        """
        from pyinfra.connectors.util import OutputLine, make_unix_command_for_host

        url = f"{self.data['api_url']}/resources/{self.host.name}/exec"

        # Connector-level arguments are consumed here rather than being
        # forwarded to the shell-command builder.
        arguments.pop("_get_pty", None)  # not supported by this API
        arguments.pop("_stdin", None)  # not supported by this API
        command_timeout = arguments.pop("_timeout", None)
        if command_timeout is None:
            command_timeout = 300
        success_exit_codes = arguments.pop("_success_exit_codes", None) or [0]

        # Apply sudo/su, env, chdir etc. the same way other connectors do.
        command_str = make_unix_command_for_host(
            self.state, self.host, command, **arguments
        ).get_raw_value()

        if print_input:
            logger.info(f">>> {command_str}")

        try:
            # NOTE(review): api_timeout bounds the HTTP call itself; if the API
            # runs commands synchronously it should exceed command_timeout.
            response = requests.post(
                url,
                json={
                    "command": command_str,
                    "timeout": command_timeout,
                },
                headers=self._auth_headers(),
                timeout=self.data["api_timeout"],
            )
            response.raise_for_status()
            result = response.json()
        except requests.RequestException as e:
            logger.error(f"Command execution failed: {e}")
            return False, CommandOutput([OutputLine("stderr", str(e))])

        exit_code = result.get("exit_code", 0)
        # ``or ""`` also covers the API returning null for an empty stream.
        stdout = result.get("stdout") or ""
        stderr = result.get("stderr") or ""

        if print_output:
            if stdout:
                logger.info(stdout)
            if stderr:
                logger.error(stderr)

        # CommandOutput takes a single list of lines, each tagged with the
        # stream it came from.
        output = CommandOutput(
            [OutputLine("stdout", line) for line in stdout.splitlines()]
            + [OutputLine("stderr", line) for line in stderr.splitlines()]
        )
        return exit_code in success_exit_codes, output

    @override
    def put_file(
        self,
        filename_or_io,
        remote_filename: str,
        remote_temp_filename=None,
        print_output: bool = False,
        print_input: bool = False,
        **arguments,
    ) -> bool:
        """Upload a local file (path or IO object) via the API.

        The content is sent base64-encoded in a JSON body.

        Returns:
            True on success, False if the request failed.
        """
        import base64

        from pyinfra.api.util import get_file_io

        url = f"{self.data['api_url']}/resources/{self.host.name}/files"

        # Read and encode file content; text content is encoded to bytes first
        with get_file_io(filename_or_io) as file_io:
            content = file_io.read()
        if isinstance(content, str):
            content = content.encode()
        content_b64 = base64.b64encode(content).decode()

        try:
            response = requests.post(
                url,
                json={
                    "path": remote_filename,
                    "content": content_b64,
                    "encoding": "base64",
                },
                headers=self._auth_headers(),
                timeout=self.data["api_timeout"],
            )
            response.raise_for_status()
        except requests.RequestException as e:
            logger.error(f"File upload failed: {e}")
            return False

        if print_output:
            logger.info(f"Uploaded file to {remote_filename}")
        return True

    @override
    def get_file(
        self,
        remote_filename: str,
        filename_or_io,
        remote_temp_filename=None,
        print_output: bool = False,
        print_input: bool = False,
        **arguments,
    ) -> bool:
        """Download a file via the API into a local path or IO object.

        Returns:
            True on success, False if the request failed or the returned
            content was not valid base64.
        """
        import base64
        from urllib.parse import quote

        # Percent-encode the path so characters such as ``?``, ``#`` or spaces
        # in file names cannot break the URL; ``/`` is kept as-is.
        url = (
            f"{self.data['api_url']}/resources/{self.host.name}"
            f"/files/{quote(remote_filename, safe='/')}"
        )

        try:
            response = requests.get(
                url,
                headers=self._auth_headers(),
                timeout=self.data["api_timeout"],
            )
            response.raise_for_status()
            content_b64 = response.json().get("content") or ""
            content = base64.b64decode(content_b64)
        except requests.RequestException as e:
            logger.error(f"File download failed: {e}")
            return False
        except ValueError as e:  # binascii.Error: malformed base64 payload
            logger.error(f"File download failed: invalid content encoding: {e}")
            return False

        # Write to file or IO object
        if isinstance(filename_or_io, str):
            with open(filename_or_io, "wb") as f:
                f.write(content)
        else:
            filename_or_io.write(content)

        if print_output:
            logger.info(f"Downloaded file from {remote_filename}")
        return True
Registering Your Connector
Connectors are loaded via Python entry points. Add this to your `setup.py` or `pyproject.toml`:
setup.py
from setuptools import setup
# Each entry is "<connector name> = <module>:<class>"; registering them under
# the "pyinfra.connectors" entry-point group is how pyinfra finds them.
connector_entry_points = [
    "custom = pyinfra_custom.connector:CustomConnector",
    "api = pyinfra_custom.connector:ApiConnector",
]

setup(
    name="pyinfra-custom-connector",
    version="0.1.0",
    packages=["pyinfra_custom"],
    entry_points={"pyinfra.connectors": connector_entry_points},
)
pyproject.toml
# Register the connector classes under pyinfra's "pyinfra.connectors" group;
# each key is the connector name, each value is "<module>:<class>".
[project.entry-points."pyinfra.connectors"]
custom = "pyinfra_custom.connector:CustomConnector"
api = "pyinfra_custom.connector:ApiConnector"
Helper Utilities
pyinfra provides utilities for common connector tasks:
Command Building
from pyinfra.connectors.util import make_unix_command_for_host
# Wrap ``command`` for this host according to the connector arguments given
# as keywords (sudo, sudo user, environment variables, ...).
unix_command = make_unix_command_for_host(
    state,
    host,
    command,
    _sudo=True,  # run via sudo
    _sudo_user="root",  # ...as this user
    _env={"PATH": "/usr/local/bin:/usr/bin"},  # extra environment variables
)
Running Local Processes
from pyinfra.connectors.util import run_local_process
# Run a command on the local machine; the call hands back a pair of
# (exit code, captured output). print_output / print_prefix control echoing.
exit_code, output = run_local_process(
    "ls -la /tmp",
    stdin=None,  # nothing is fed to the process
    timeout=30,
    print_output=True,
    print_prefix="[local] ",
)
Sudo Retry Logic
from pyinfra.connectors.util import execute_command_with_sudo_retry
# ``execute_command_with_sudo_retry`` is given a callable rather than a result
# so that it can run the command again (e.g. after a sudo password prompt).
# Build and run the command *inside* that callable.
# Assumes ``state``, ``host``, ``command`` and ``arguments`` are in scope,
# e.g. inside a connector's ``run_shell_command``.
def execute():
    """Build the host-specific command, run it, return ``(exit_code, output)``."""
    from pyinfra.connectors.util import make_unix_command_for_host, run_local_process

    unix_command = make_unix_command_for_host(state, host, command, **arguments)
    # Replace with your connector's transport; it must return (exit_code, output).
    return run_local_process(unix_command.get_raw_value())


exit_code, output = execute_command_with_sudo_retry(
    host,
    arguments,  # ConnectorArguments with _sudo, _sudo_password, etc.
    execute,
)
Connector Data Types
Define typed connector data for better validation:
from typing_extensions import TypedDict
from pyinfra.connectors.base import DataMeta
class SshConnectorData(TypedDict):
    """Typed SSH connection settings; descriptions live in ``data_meta``."""

    ssh_hostname: str  # host to connect to
    ssh_port: int  # TCP port
    ssh_user: str  # login user
    ssh_password: str  # login password
    ssh_key: str  # path of the private key file
# Human-readable description (and optional default) for each
# SshConnectorData key; keys without a default carry no fallback value here.
data_meta = {
    "ssh_hostname": DataMeta("SSH hostname"),
    "ssh_port": DataMeta("SSH port", default=22),  # standard SSH port
    "ssh_user": DataMeta("SSH user", default="root"),
    "ssh_password": DataMeta("SSH password"),
    "ssh_key": DataMeta("SSH key filename"),
}
Testing Your Connector
Create a test script:
# test_custom_connector.py
from pyinfra import host
from pyinfra.operations import python, server

# Test basic command execution
result = server.shell(
    name="Test custom connector",
    commands=["echo 'Hello from custom connector'"],
)


def print_result():
    """Report the outcome of the shell operation above."""
    print(f"Command succeeded: {result.did_succeed()}")
    print(f"Output: {result.stdout}")


# Operation results only exist once pyinfra has executed the operation, not
# while this deploy file is being loaded, so read them from a callback that
# python.call runs during execution.
python.call(
    name="Report custom connector result",
    function=print_result,
)

# Test file operations
from pyinfra.operations import files

files.put(
    name="Upload test file",
    src="test.txt",
    dest="/tmp/test.txt",
)
# Run the test deploy against "myhost" through the custom connector
pyinfra @custom:myhost test_custom_connector.py
Best Practices
- Error Handling: Always raise `ConnectError` for connection failures
- Logging: Use pyinfra’s logger for consistent output
- Type Safety: Use TypedDict for connector data and type hints throughout
- Resource Cleanup: Implement `disconnect()` to clean up connections
- Timeout Handling: Respect timeout arguments for all operations
- Sudo Support: Use `make_unix_command_for_host` to handle sudo/su properly
- Test Thoroughly: Test with various commands, file operations, and error conditions
- Documentation: Document connector usage and configuration options
- Connection Pooling: Consider implementing connection reuse for performance
- Error Messages: Provide clear, actionable error messages
Next Steps
- Learn about Writing Operations that use your connector
- Explore Writing Facts to query host state
- See Performance Tuning for optimization strategies
- Check API Reference for complete API documentation
