Skip to main content
The RPC protocol provides a unified interface for sending and receiving FL messages across different transport implementations.

SyftFlwrRpc Protocol

Base protocol defining the RPC interface for message transport.

Interface Definition

from abc import ABC, abstractmethod
from typing import Optional

class SyftFlwrRpc(ABC):
    """Protocol for syft-flwr RPC implementations.
    
    This abstraction allows syft-flwr to work with different
    transport mechanisms for sending FL messages:
    - syft_core: syft_rpc (using SyftBox Go Client) with futures database
    - syft_client: P2P File-based RPC via Google Drive sync
    """
Source: src/syft_flwr/rpc/protocol.py:5

Methods

send

@abstractmethod
def send(
    self,
    to_email: str,
    app_name: str,
    endpoint: str,
    body: bytes,
    encrypt: bool = False,
) -> str
Send a message to a recipient.
to_email
str
required
Recipient’s email address
app_name
str
required
Name of the FL application
endpoint
str
required
RPC endpoint (e.g., “messages”, “rpc/fit”)
body
bytes
required
Message body as bytes
encrypt
bool
default:false
Whether to encrypt the message (syft_core only)
return
str
Future ID for tracking the response

get_response

@abstractmethod
def get_response(self, future_id: str) -> Optional[bytes]
Get response for a future ID.
future_id
str
required
The ID returned by send()
return
Optional[bytes]
Response body as bytes, or None if not ready yet

delete_future

@abstractmethod
def delete_future(self, future_id: str) -> None
Delete a future after processing its response.
future_id
str
required
The ID to delete

Factory Function

create_rpc

def create_rpc(
    client: SyftFlwrClient,
    app_name: str,
) -> SyftFlwrRpc
Create the appropriate RPC adapter based on client type. Source: src/syft_flwr/rpc/factory.py:13
client
SyftFlwrClient
required
SyftFlwrClient instance (SyftCoreClient or SyftP2PClient)
app_name
str
required
Name of the FL application
return
SyftFlwrRpc
Either SyftRpc (for SyftCoreClient) or P2PFileRpc (for SyftP2PClient)
Auto-detection logic:
  • If client.get_client() returns syft_core.ClientSyftRpc
  • If client.get_client() returns SyftP2PClientP2PFileRpc
  • Otherwise → TypeError
Example:
from syft_flwr.client import create_client
from syft_flwr.rpc import create_rpc

# Auto-detect transport
client = create_client()
rpc = create_rpc(client=client, app_name="flwr/diabetes")

print(type(rpc).__name__)  # "SyftRpc" or "P2PFileRpc"

Implementation Comparison

Detailed comparison of RPC implementations:
FeatureSyftRpcP2PFileRpc
Transportsyft_rpc libraryGoogle Drive API
Message FormatURL-based routingFile-based (.request/.response)
Encryption✅ X3DH + AES-256-GCM❌ Not yet (Drive permissions)
Future StorageSQLite databaseIn-memory dict
Response TrackingPersistent across restartsLost on restart
LatencyLow (filesystem watching)Higher (polling-based)
DependenciesSyftBox Go clientGoogle Drive credentials

Message Flow Diagrams

SyftRpc Flow (SyftBox)

P2PFileRpc Flow (Google Drive)

Usage Patterns

Basic Send/Receive

from syft_flwr.client import create_client
from syft_flwr.rpc import create_rpc
import time

# Create RPC adapter
client = create_client()
rpc = create_rpc(client=client, app_name="flwr/example")

# Send message
future_id = rpc.send(
    to_email="[email protected]",
    app_name="flwr/example",
    endpoint="messages",
    body=b"Hello, world!",
    encrypt=True  # Only works with SyftRpc
)

print(f"Message sent, future_id: {future_id}")

# Poll for response
while True:
    response = rpc.get_response(future_id)
    if response is not None:
        print(f"Got response: {response.decode()}")
        rpc.delete_future(future_id)
        break
    time.sleep(1)

Batch Messaging

from syft_flwr.client import create_client
from syft_flwr.rpc import create_rpc
import time

client = create_client()
rpc = create_rpc(client=client, app_name="flwr/batch")

# Send to multiple recipients
recipients = [
    "[email protected]",
    "[email protected]",
    "[email protected]"
]

future_ids = []
for recipient in recipients:
    future_id = rpc.send(
        to_email=recipient,
        app_name="flwr/batch",
        endpoint="messages",
        body=b"Training round 1",
        encrypt=False
    )
    future_ids.append(future_id)

print(f"Sent {len(future_ids)} messages")

# Wait for all responses
responses = {}
start_time = time.time()
timeout = 60

while len(responses) < len(future_ids) and (time.time() - start_time) < timeout:
    for future_id in future_ids:
        if future_id not in responses:
            response = rpc.get_response(future_id)
            if response is not None:
                responses[future_id] = response
                print(f"Received response {len(responses)}/{len(future_ids)}")
    
    if len(responses) < len(future_ids):
        time.sleep(2)

print(f"Received {len(responses)} responses")

# Cleanup
for future_id in future_ids:
    rpc.delete_future(future_id)

Error Handling

from syft_flwr.rpc import create_rpc
from syft_flwr.client import create_client
import time

client = create_client()
rpc = create_rpc(client=client, app_name="flwr/robust")

try:
    # Send message
    future_id = rpc.send(
        to_email="[email protected]",
        app_name="flwr/robust",
        endpoint="messages",
        body=b"Important message",
        encrypt=True
    )
    
    # Wait with timeout
    timeout = 30
    start = time.time()
    response = None
    
    while response is None and (time.time() - start) < timeout:
        response = rpc.get_response(future_id)
        if response is None:
            time.sleep(1)
    
    if response is None:
        print(f"Timeout waiting for response after {timeout}s")
        # Handle timeout (retry, log, etc.)
    else:
        print(f"Success: {len(response)} bytes")
    
except Exception as e:
    print(f"Error: {e}")
    # Handle errors
    
finally:
    # Always cleanup
    try:
        rpc.delete_future(future_id)
    except:
        pass

Best Practices

Always Clean Up Futures

# Good
future_id = rpc.send(...)
try:
    response = rpc.get_response(future_id)
    process_response(response)
finally:
    rpc.delete_future(future_id)  # Always cleanup

# Bad - memory/database leak
future_id = rpc.send(...)
response = rpc.get_response(future_id)
process_response(response)
# Forgot to delete future!

Use Timeouts

import time

# Good - bounded wait time
start = time.time()
timeout = 60

while (time.time() - start) < timeout:
    response = rpc.get_response(future_id)
    if response is not None:
        break
    time.sleep(1)

if response is None:
    handle_timeout()

# Bad - infinite loop
while True:
    response = rpc.get_response(future_id)
    if response is not None:
        break
    time.sleep(1)

Check Encryption Compatibility

from syft_flwr.rpc import SyftRpc, P2PFileRpc

rpc = create_rpc(client, app_name)

# Check if encryption is available
if isinstance(rpc, SyftRpc):
    # SyftBox - encryption available
    rpc.send(
        to_email="[email protected]",
        app_name="flwr/app",
        endpoint="messages",
        body=b"data",
        encrypt=True  # ✅ Works
    )
elif isinstance(rpc, P2PFileRpc):
    # P2P - encryption not yet supported
    rpc.send(
        to_email="[email protected]",
        app_name="flwr/app",
        endpoint="messages",
        body=b"data",
        encrypt=False  # ⚠️ Must be False
    )

Performance Considerations

SyftRpc (SyftBox)

Strengths:
  • Low latency (watchdog file monitoring)
  • Persistent futures database
  • Efficient for high-frequency messaging
Considerations:
  • Requires local SyftBox installation
  • SQLite database I/O for futures
  • Encryption adds ~10ms per message

P2PFileRpc (Google Drive)

Strengths:
  • No local installation required
  • Works in Colab and cloud environments
  • Scalable storage
Considerations:
  • Higher latency due to polling (2s default)
  • Google Drive API rate limits
  • Network-dependent performance
  • In-memory futures (lost on restart)
Optimization tips:
# Reduce polling interval for faster responses (increases API calls)
events = create_events_watcher(
    app_name="flwr/app",
    client=client,
    poll_interval=1.0  # Poll every 1s instead of 2s
)

# Batch messages to reduce roundtrips
future_ids = [
    rpc.send(to_email=email, app_name=app, endpoint="messages", body=data)
    for email in recipients
]

See Also

Build docs developers (and LLMs) love