Overview
Connectors are pyinfra’s abstraction layer for interfacing with different types of target systems. They handle connection management, command execution, and file transfers across various protocols and platforms.
Every host in pyinfra uses a connector to execute commands. The default is SSH, but pyinfra supports local, Docker, and custom connectors.
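The inventory names used throughout this page follow an `@connector/target` pattern, with bare hostnames falling back to SSH. The sketch below illustrates that convention only; `split_connector_name` is a hypothetical helper, not pyinfra's own parsing code, and the real implementation may handle more cases.

```python
from typing import Optional


def split_connector_name(name: str, default: str = "ssh") -> tuple[str, Optional[str]]:
    """Split an inventory name into (connector, target).

    Hypothetical helper: names without a leading "@" use the default connector.
    """
    if not name.startswith("@"):
        return default, name
    # Only the first "/" separates connector from target, so targets may
    # themselves contain slashes (for example a chroot path).
    connector, _, target = name[1:].partition("/")
    return connector, target or None


print(split_connector_name("@docker/web1"))
print(split_connector_name("web2.example.com"))
print(split_connector_name("@chroot//mnt/rootfs"))
```

Splitting on the first slash only is what lets a name such as `@chroot//mnt/rootfs` carry an absolute path as its target.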
BaseConnector Class
All connectors inherit from BaseConnector (defined in src/pyinfra/connectors/base.py):
# From src/pyinfra/connectors/base.py:71-174
class BaseConnector(abc.ABC):
    state: State
    host: Host

    handles_execution = False  # Does this connector execute commands?

    data_cls: Type = ConnectorData
    data_meta: dict[str, DataMeta] = {}

    def __init__(self, state: State, host: Host):
        self.state = state
        self.host = host
        self.data = host_to_connector_data(self.data_cls, self.data_meta, host.data)

    @staticmethod
    @abc.abstractmethod
    def make_names_data(name: str) -> Iterator[tuple[str, dict, list[str]]]:
        """
        Generate inventory targets. Yields (name, data, groups) tuples.
        """

    def connect(self) -> None:
        """
        Connect this connector instance.
        """

    def disconnect(self) -> None:
        """
        Disconnect this connector instance.
        """

    @abc.abstractmethod
    def run_shell_command(
        self,
        command: StringCommand,
        print_output: bool,
        print_input: bool,
        **arguments: Unpack[ConnectorArguments],
    ) -> tuple[bool, CommandOutput]:
        """
        Execute a command.
        Returns: (success_bool, CommandOutput)
        """

    @abc.abstractmethod
    def put_file(
        self,
        filename_or_io: Union[str, IOBase],
        remote_filename: str,
        remote_temp_filename: Optional[str] = None,
        print_output: bool = False,
        print_input: bool = False,
        **arguments: Unpack[ConnectorArguments],
    ) -> bool:
        """
        Upload a file or IO object.
        Returns: success_bool
        """

    @abc.abstractmethod
    def get_file(
        self,
        remote_filename: str,
        filename_or_io: Union[str, IOBase],
        remote_temp_filename: Optional[str] = None,
        print_output: bool = False,
        print_input: bool = False,
        **arguments: Unpack[ConnectorArguments],
    ) -> bool:
        """
        Download a file.
        Returns: success_bool
        """
Key Methods
make_names_data: Static method that generates host entries from connector strings
connect: Establish a connection to the target
disconnect: Close the connection
run_shell_command: Execute shell commands
put_file: Upload files
get_file: Download files
Built-in Connectors
pyinfra includes several built-in connectors:
SSH Connector (Default)
The SSH connector uses Paramiko for SSH connections:
# From src/pyinfra/connectors/ssh.py:37-100
class ConnectorData(TypedDict):
    ssh_hostname: str
    ssh_port: int
    ssh_user: str
    ssh_password: str
    ssh_key: str
    ssh_key_password: str
    ssh_allow_agent: bool
    ssh_look_for_keys: bool
    ssh_forward_agent: bool
    ssh_config_file: str
    ssh_known_hosts_file: str
    ssh_strict_host_key_checking: str
    ssh_paramiko_connect_kwargs: dict
    ssh_connect_retries: int
    ssh_connect_retry_min_delay: float
    ssh_connect_retry_max_delay: float
    ssh_file_transfer_protocol: str  # "sftp" or "scp"
Usage:
from pyinfra.api import Inventory

inventory = Inventory(
    (
        [
            "@ssh/web1.example.com",  # Explicit SSH
            "web2.example.com",  # Implicit SSH (the default)
        ],
        {"ssh_user": "deploy", "ssh_port": 22},
    ),
)
SSH connector features:
Password and key-based authentication
SSH agent support
SSH config file support
Connection retry logic
SFTP and SCP file transfer
Port forwarding
Local Connector
Execute commands on the local machine:
# From src/pyinfra/connectors/local.py
class LocalConnector(BaseConnector):
    handles_execution = True

    @staticmethod
    def make_names_data(name: str):
        yield "@local", {}, []

    def run_shell_command(self, command, print_output, print_input, **arguments):
        # Execute locally using subprocess
        ...
Usage:
from pyinfra.api import Inventory

inventory = Inventory(
    (["@local"], {}),
)
Use cases:
Testing deployments
Configuring the control machine
Local development
CI/CD pipelines
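The "execute locally using subprocess" step can be pictured with the standard library alone. This is a minimal sketch under stated assumptions, not the LocalConnector implementation: `run_local` is a hypothetical function, it assumes a POSIX `sh` is available, and it returns plain lists instead of pyinfra's CommandOutput.

```python
import subprocess
from typing import Optional


def run_local(command: str, timeout: Optional[float] = None) -> tuple[bool, list[str], list[str]]:
    """Run a shell command on this machine and capture its output.

    Returns (success, stdout_lines, stderr_lines); success means exit code 0.
    """
    proc = subprocess.run(
        ["sh", "-c", command],
        capture_output=True,
        text=True,
        timeout=timeout,  # raises subprocess.TimeoutExpired if exceeded
    )
    return proc.returncode == 0, proc.stdout.splitlines(), proc.stderr.splitlines()


ok, out, err = run_local("echo hello")
print(ok, out, err)
```

Returning a success flag alongside the captured lines mirrors the `(success_bool, CommandOutput)` shape that run_shell_command documents.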
Docker Connector
Execute commands inside Docker containers:
# From src/pyinfra/connectors/docker.py
class ConnectorData(TypedDict):
    docker_container_id: str
    docker_user: str

class DockerConnector(BaseConnector):
    handles_execution = True

    @staticmethod
    def make_names_data(name: str):
        # name is container ID or name
        yield f"@docker/{name}", {"docker_container_id": name}, []
Usage:
# Target running container
pyinfra @docker/my_container deploy.py
# Target multiple containers
pyinfra @docker/web1,@docker/web2 deploy.py
from pyinfra.api import Inventory

inventory = Inventory(
    (
        [
            "@docker/web_container",
            "@docker/db_container",
        ],
        {},
    ),
)
Features:
Execute commands via docker exec
File transfer via docker cp
Run as different users
Target by container name or ID
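To make the `docker exec` and `docker cp` features concrete, the sketch below builds the argument lists such a connector might pass to the Docker CLI. Both helper functions are hypothetical and only construct the commands; the exact flags pyinfra uses are not stated here and may differ.

```python
from typing import Optional


def docker_exec_argv(container_id: str, command: str, user: Optional[str] = None) -> list[str]:
    """Build a `docker exec` argument list that runs `command` through sh."""
    argv = ["docker", "exec", "-i"]
    if user:
        argv += ["-u", user]  # run as a different user inside the container
    argv += [container_id, "sh", "-c", command]
    return argv


def docker_cp_upload_argv(container_id: str, local_path: str, remote_path: str) -> list[str]:
    """Build a `docker cp` argument list that copies a local file into a container."""
    return ["docker", "cp", local_path, f"{container_id}:{remote_path}"]


print(docker_exec_argv("web_container", "whoami", user="www-data"))
print(docker_cp_upload_argv("web_container", "./app.conf", "/etc/app.conf"))
```

Passing the command as a single argument to `sh -c` avoids re-quoting it for the Docker CLI, since the list is handed to the process without going through a local shell.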
Docker-SSH Connector
SSH into Docker containers:
# From src/pyinfra/connectors/dockerssh.py
class DockerSSHConnector(SSHConnector):
    """
    SSH connector that targets Docker containers via their SSH daemon.
    """
Usage:
inventory = Inventory(
    (
        ["@dockerssh/container_name"],
        {"ssh_user": "root", "ssh_port": 2222},
    ),
)
Chroot Connector
Execute commands in a chroot environment:
# From src/pyinfra/connectors/chroot.py
class ChrootConnector(BaseConnector):
    handles_execution = True

    @staticmethod
    def make_names_data(name: str):
        yield f"@chroot/{name}", {"chroot_path": name}, []
Usage:
pyinfra @chroot//mnt/rootfs deploy.py
Use cases:
System installation
Rescue operations
Container image building
Terraform Connector
Generate inventory from Terraform state:
# From src/pyinfra/connectors/terraform.py
class TerraformConnector(BaseConnector):
    @staticmethod
    def make_names_data(name: str):
        # Reads terraform.tfstate
        # Yields hosts from Terraform resources
        ...
Usage:
pyinfra @terraform deploy.py
Vagrant Connector
Target Vagrant VMs:
# From src/pyinfra/connectors/vagrant.py
class VagrantConnector(SSHConnector):
    @staticmethod
    def make_names_data(name: str):
        # Uses `vagrant ssh-config`
        ...
Usage:
pyinfra @vagrant deploy.py
Connector Data
Connectors can define typed data classes:
# From src/pyinfra/connectors/base.py:57-68
class DataMeta:
    description: str
    default: Any

    def __init__(self, description, default=None) -> None:
        self.description = description
        self.default = default

class ConnectorData(TypedDict, total=False):
    pass
Example - SSH connector data:
# From src/pyinfra/connectors/ssh.py:61-100
connector_data_meta: dict[str, DataMeta] = {
    "ssh_hostname": DataMeta("SSH hostname"),
    "ssh_port": DataMeta("SSH port"),
    "ssh_user": DataMeta("SSH user"),
    "ssh_password": DataMeta("SSH password"),
    "ssh_key": DataMeta("SSH key filename"),
    "ssh_allow_agent": DataMeta(
        "Whether to use any active SSH agent",
        True,
    ),
    "ssh_look_for_keys": DataMeta(
        "Whether to look for private keys",
        True,
    ),
}
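One way to read the metadata above is as a table of defaults that host data can override. The sketch below shows that idea with a local copy of DataMeta; `resolve_connector_data` is a hypothetical stand-in, and pyinfra's own `host_to_connector_data` may behave differently.

```python
from typing import Any


class DataMeta:
    """Local copy of the DataMeta shape shown above, for illustration only."""

    def __init__(self, description: str, default: Any = None) -> None:
        self.description = description
        self.default = default


def resolve_connector_data(meta: dict[str, DataMeta], host_data: dict[str, Any]) -> dict[str, Any]:
    """Pick each declared key from host data, falling back to its default.

    Keys that are not declared in `meta` are ignored.
    """
    return {key: host_data.get(key, item.default) for key, item in meta.items()}


meta = {
    "ssh_user": DataMeta("SSH user"),
    "ssh_port": DataMeta("SSH port"),
    "ssh_allow_agent": DataMeta("Whether to use any active SSH agent", True),
}
print(resolve_connector_data(meta, {"ssh_user": "deploy", "unrelated_key": 1}))
```

Declaring keys up front keeps each connector's view of host data limited to the settings it understands.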
Command Execution
Connectors execute commands through run_shell_command:
# Signature
def run_shell_command(
    self,
    command: StringCommand,
    print_output: bool,
    print_input: bool,
    **arguments: Unpack[ConnectorArguments],
) -> tuple[bool, CommandOutput]:
    ...
ConnectorArguments
From src/pyinfra/api/arguments.py:
class ConnectorArguments(TypedDict, total=False):
    _sudo: bool
    _sudo_user: str
    _su_user: str
    _preserve_sudo_env: bool
    _use_sudo_password: bool
    _use_su_login: bool
    _env: dict[str, str]
    _timeout: int
    _shell_executable: str
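As a rough picture of how arguments such as `_sudo`, `_sudo_user`, `_env`, and `_shell_executable` can shape the final command line, the sketch below wraps a command in a shell invocation. `wrap_command` is a hypothetical, simplified helper; the flags and quoting pyinfra applies in `make_unix_command_for_host` are more involved and may differ.

```python
import shlex
from typing import Optional


def wrap_command(
    command: str,
    _sudo: bool = False,
    _sudo_user: Optional[str] = None,
    _env: Optional[dict[str, str]] = None,
    _shell_executable: str = "sh",
) -> str:
    """Wrap `command` with environment exports, a shell, and optional sudo."""
    if _env:
        # Export variables inside the shell so they apply to the command itself
        exports = " ".join(f"{key}={shlex.quote(value)}" for key, value in _env.items())
        command = f"export {exports}; {command}"

    parts: list[str] = []
    if _sudo:
        parts += ["sudo", "-H", "-n"]  # -n: never prompt for a password
        if _sudo_user:
            parts += ["-u", _sudo_user]
    parts += [_shell_executable, "-c", shlex.quote(command)]
    return " ".join(parts)


print(wrap_command("apt-get update", _sudo=True))
print(wrap_command("echo hi", _env={"PATH": "/usr/local/bin"}))
```

Quoting the whole command once with `shlex.quote` keeps it a single argument to the shell, whichever prefixes are added in front.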
Example Execution
from pyinfra.api import StringCommand

# `host` is assumed to be an already connected pyinfra Host
command = StringCommand("apt-get", "update")
success, output = host.connector.run_shell_command(
    command,
    print_output=True,
    print_input=True,
    _sudo=True,
    _timeout=30,
)
if success:
    print(f"stdout: {output.stdout_lines}")
else:
    print(f"stderr: {output.stderr_lines}")
File Transfer
Upload Files
# Put file
success = host.connector.put_file(
    filename_or_io="/local/file.txt",
    remote_filename="/remote/file.txt",
    print_output=True,
    _sudo=True,
)

# Upload IO object
from io import StringIO

file_io = StringIO("Hello, World!")
success = host.connector.put_file(
    filename_or_io=file_io,
    remote_filename="/tmp/hello.txt",
)
Download Files
# Get file
success = host.connector.get_file(
    remote_filename="/remote/config.json",
    filename_or_io="/local/config.json",
    print_output=True,
)

# Download to IO object
from io import BytesIO

file_io = BytesIO()
success = host.connector.get_file(
    remote_filename="/etc/hosts",
    filename_or_io=file_io,
)
if success:
    content = file_io.getvalue()
Connection Management
Connecting
from pyinfra.api import Config, Inventory, State
from pyinfra.api.exceptions import ConnectError

inventory = Inventory((["web1.example.com"], {}))
config = Config()
state = State()
state.init(inventory, config)

for host in inventory:
    # Connect explicitly
    try:
        host.connect(reason="for deployment", raise_exceptions=True)
        print(f"Connected to {host.name}")
    except ConnectError as e:
        print(f"Failed to connect: {e}")
Disconnecting
for host in inventory:
    if host.connected:
        host.disconnect()
        print(f"Disconnected from {host.name}")
Connection State
if host.connected:
    print("Host is connected")
else:
    print("Host is not connected")
Creating Custom Connectors
Basic Custom Connector
from typing import Iterator, Tuple

from pyinfra.api import StringCommand
from pyinfra.connectors.base import BaseConnector
from pyinfra.connectors.util import CommandOutput, OutputLine

class MyCustomConnector(BaseConnector):
    handles_execution = True

    @staticmethod
    def make_names_data(name: str) -> Iterator[Tuple[str, dict, list[str]]]:
        """
        Generate host entries from connector string.
        """
        # Parse name and yield (host_name, host_data, groups)
        yield f"@mycustom/{name}", {"custom_param": name}, []

    def connect(self) -> None:
        """
        Establish connection.
        """
        print(f"Connecting to {self.host.name}...")
        # Connection logic here
        self.host.connected = True

    def disconnect(self) -> None:
        """
        Close connection.
        """
        print(f"Disconnecting from {self.host.name}...")
        # Cleanup logic here
        self.host.connected = False

    def run_shell_command(
        self,
        command: StringCommand,
        print_output: bool,
        print_input: bool,
        **arguments,
    ) -> Tuple[bool, CommandOutput]:
        """
        Execute command.
        """
        if print_input:
            print(f"$ {command}")

        # Execute command using your protocol
        stdout_lines: list[str] = []
        stderr_lines: list[str] = []
        status = True
        # ... execution logic ...

        # CommandOutput wraps a list of OutputLine items (see Connector Utilities)
        output = CommandOutput(
            [OutputLine("stdout", line) for line in stdout_lines]
            + [OutputLine("stderr", line) for line in stderr_lines]
        )
        return status, output

    def put_file(
        self,
        filename_or_io,
        remote_filename,
        remote_temp_filename=None,
        print_output=False,
        print_input=False,
        **arguments,
    ) -> bool:
        """
        Upload file.
        """
        # Upload logic here
        return True

    def get_file(
        self,
        remote_filename,
        filename_or_io,
        remote_temp_filename=None,
        print_output=False,
        print_input=False,
        **arguments,
    ) -> bool:
        """
        Download file.
        """
        # Download logic here
        return True
Registering Custom Connector
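# Note: pyinfra discovers connectors through the "pyinfra.connectors"
# entry point group declared in a package's metadata. Confirm that
# register_connector exists in your installed version before relying on it.
from pyinfra.api import Inventory
from pyinfra.api.connectors import register_connector

register_connector("mycustom", MyCustomConnector)

# Now you can use it
inventory = Inventory(
    (["@mycustom/target"], {}),
)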
Connector Utilities
CommandOutput
# From src/pyinfra/connectors/util.py
from pyinfra.connectors.util import CommandOutput, OutputLine

output = CommandOutput([
    OutputLine("stdout", "Line 1"),
    OutputLine("stderr", "Error message"),
    OutputLine("stdout", "Line 2"),
])

# Access lines
stdout_lines = output.stdout_lines  # ["Line 1", "Line 2"]
stderr_lines = output.stderr_lines  # ["Error message"]
Command Helpers
from pyinfra.connectors.util import (
    make_unix_command_for_host,
    execute_command_with_sudo_retry,
)

# Build command with sudo/su
full_command = make_unix_command_for_host(
    state,
    host,
    command,
    _sudo=True,
    _sudo_user="root",
    _env={"PATH": "/usr/local/bin"},
)
Best Practices
Use SSH by Default: SSH is secure, well-tested, and works for most use cases.
Local for Testing: Use @local to test deployments before running them on real hosts.
Docker for Containers: Use the @docker connector for containerized applications.
Handle Errors: Always handle connection errors and timeouts gracefully.
Reuse Connections: Connectors maintain connections across operations for efficiency.
Custom When Needed: Create custom connectors for proprietary or specialized systems.
Connection Troubleshooting
SSH Connection Issues
# Enable SSH debugging
inventory = Inventory(
(
[ "problem-host.example.com" ],
{
"ssh_user" : "deploy" ,
"ssh_port" : 22 ,
"ssh_connect_retries" : 3 ,
"ssh_connect_retry_min_delay" : 1.0 ,
"ssh_connect_retry_max_delay" : 5.0 ,
},
),
)
Timeout Configuration
# Increase timeouts for slow connections
from pyinfra.operations import server
server.shell(
name = "Long running command" ,
commands = [ "./slow-script.sh" ],
_timeout = 300 , # 5 minutes
)
Connection Retry
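import time

from pyinfra.api.exceptions import ConnectError

for host in inventory:
    retries = 3
    for attempt in range(retries):
        try:
            host.connect(raise_exceptions=True)
            break
        except ConnectError:
            if attempt < retries - 1:
                time.sleep(5)  # Wait before the next attempt
                continue
            raise  # Out of attempts: re-raise the last error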
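The fixed five-second wait above can be generalised into a helper that grows the delay between a minimum and a maximum, in the spirit of the `ssh_connect_retry_min_delay` and `ssh_connect_retry_max_delay` settings. This is a sketch of one common approach (doubling the delay each time); `retry_with_backoff` and its parameters are hypothetical, and pyinfra's own retry schedule is not described on this page.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    attempts: int = 3,
    min_delay: float = 1.0,
    max_delay: float = 5.0,
    exceptions: tuple = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `func` up to `attempts` times, doubling the wait after each failure."""
    delay = min_delay
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except exceptions:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            sleep(delay)
            delay = min(delay * 2, max_delay)  # never wait longer than max_delay
    raise ValueError("attempts must be at least 1")


# Usage with a function that fails twice before succeeding
calls = {"count": 0}


def flaky_connect() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("not yet")
    return "connected"


waits: list[float] = []
print(retry_with_backoff(flaky_connect, sleep=waits.append), waits)
```

Accepting the sleep function as a parameter keeps the helper easy to exercise without real waiting; with pyinfra, `func` could be a lambda that calls `host.connect(raise_exceptions=True)`.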
Advanced Examples
Multi-hop SSH
# Connect through bastion host
inventory = Inventory(
(
[( "internal-host" , {
"ssh_hostname" : "internal.local" ,
"ssh_user" : "deploy" ,
"ssh_paramiko_connect_kwargs" : {
"sock" : paramiko.ProxyCommand(
"ssh bastion.example.com -W %h:%p"
),
},
})],
{},
),
)
Inventory Generation
import requests

from pyinfra.connectors.base import BaseConnector

class CloudConnector(BaseConnector):
    @staticmethod
    def make_names_data(name: str):
        # Fetch instances from cloud API
        response = requests.get(
            "https://api.cloud.com/instances",
            params={"tag": name},
            timeout=10,
        )
        response.raise_for_status()  # Fail early on HTTP errors
        instances = response.json()

        for instance in instances:
            yield (
                instance["public_ip"],
                {
                    "instance_id": instance["id"],
                    "instance_type": instance["type"],
                },
                [instance["environment"]],  # Groups
            )