Skip to main content
The event system provides abstractions for handling asynchronous FL messages across different transport layers.

SyftFlwrEvents Protocol

Base protocol defining the event handling interface.

Interface Definition

from abc import ABC, abstractmethod
from pathlib import Path
from typing import Callable, Optional, Union

MessageHandler = Callable[[bytes], Optional[Union[str, bytes]]]

class SyftFlwrEvents(ABC):
    """Protocol for syft-flwr event handling implementations.
    
    This abstraction allows syft-flwr to work with different
    transport/event mechanisms:
    - syft_core: SyftEvents with watchdog file watching + syft_rpc
    - syft_client: P2P File-based polling via Google Drive sync
    """
Source: src/syft_flwr/events/protocol.py:11

Properties

client_email

@property
@abstractmethod
def client_email(self) -> str
Get the email address of the current user.

app_dir

@property
@abstractmethod
def app_dir(self) -> Path
Get the path to the app data directory.

is_running

@property
@abstractmethod
def is_running(self) -> bool
Check if the event loop is currently running.

Methods

on_request

@abstractmethod
def on_request(
    self,
    endpoint: str,
    handler: MessageHandler,
    auto_decrypt: bool = True,
    encrypt_reply: bool = False,
) -> None
Register a handler for incoming messages at an endpoint.
endpoint
str
required
The endpoint path (e.g., “/messages”)
handler
MessageHandler
required
Function that receives message bytes and returns optional response
auto_decrypt
bool
default:true
Whether to auto-decrypt encrypted messages (SyftBox only)
encrypt_reply
bool
default:false
Whether to encrypt reply messages (SyftBox only)

run_forever

@abstractmethod
def run_forever(self) -> None
Start the main event loop and block until stopped.

stop

@abstractmethod
def stop(self) -> None
Signal the event loop to stop.

Factory Function

create_events_watcher

def create_events_watcher(
    app_name: str,
    client: Optional[Union[Client, SyftFlwrClient]] = None,
    cleanup_expiry: str = "1d",
    cleanup_interval: str = "1d",
    poll_interval: float = 2.0,
) -> SyftFlwrEvents
Create the appropriate events adapter based on client type. Source: src/syft_flwr/events/factory.py:17
app_name
str
required
Name of the FL app (e.g., “flwr/my_app”)
client
Optional[Union[Client, SyftFlwrClient]]
default:"None"
SyftFlwrClient or syft_core.Client instance (auto-created if None)
cleanup_expiry
str
default:"1d"
Expiry time for cleanup - SyftEvents only (e.g., “1d”, “12h”)
cleanup_interval
str
default:"1d"
Cleanup interval - SyftEvents only (e.g., “1d”, “6h”)
poll_interval
float
Polling interval in seconds - P2PFileEvents only
return
SyftFlwrEvents
Either SyftEvents (for SyftBox) or P2PFileEvents (for P2P mode)
Auto-detection logic:
  • If client.get_client() returns syft_core.ClientSyftEvents
  • If client.get_client() returns SyftP2PClientP2PFileEvents
Example:
from syft_flwr.events import create_events_watcher
from syft_flwr.client import create_client

# Auto-detect transport
client = create_client()
events = create_events_watcher(
    app_name="flwr/diabetes",
    client=client,
    poll_interval=2.0
)

print(type(events).__name__)  # "SyftEvents" or "P2PFileEvents"

SyftGrid

Server-side message orchestrator for coordinating FL communication.

Class Definition

from flwr.server.grid import Grid

class SyftGrid(Grid):
    """SyftGrid is the server-side message orchestrator for federated learning.
    
    This class abstracts the RPC layer to support both syft_core and syft_client
    environments. The appropriate RPC adapter is auto-detected based on the client.
    
    Supported configurations:
    - syft_core: Full syft_rpc/syft_crypto stack with optional encryption
    - syft_client: P2P File-based RPC via Google Drive (no encryption)
    """
Source: src/syft_flwr/fl_orchestrator/syft_grid.py:38

Constructor

app_name
str
required
Name of the FL application (e.g., “flwr/diabetes”)
datasites
Optional[list[str]]
default:"None"
List of data owner email addresses
client
Optional[Union[Client, SyftFlwrClient]]
default:"None"
SyftFlwrClient or syft_core.Client (auto-created if None)
rpc
Optional[SyftFlwrRpc]
default:"None"
RPC adapter for message transport (auto-created if None)
Example:
from syft_flwr.fl_orchestrator.syft_grid import SyftGrid

grid = SyftGrid(
    app_name="flwr/diabetes",
    datasites=[
        "[email protected]",
        "[email protected]"
    ]
)

grid.set_run(run_id=12345)
print(grid.get_node_ids())  # [<node_id_1>, <node_id_2>]

Properties

run

@property
def run(self) -> Run
Get the current Flower Run object with metadata.
return
flwr.common.typing.Run
Copy of the current Run object

Methods

set_run

def set_run(self, run_id: int) -> None
Set the run ID for this federated learning session.
run_id
int
required
Unique identifier for the FL run/session
Example:
from random import randint

run_id = randint(0, 1000)
grid.set_run(run_id)
print(grid.run.run_id)  # 742

get_client_email

def get_client_email(self) -> str
Get the email address of the server’s client.
return
str
Email address as a string

get_node_ids

def get_node_ids(self) -> list[int]
Get node IDs of all connected FL clients.
return
list[int]
List of integer node IDs (deterministically generated from emails)
Example:
grid = SyftGrid(
    app_name="flwr/app",
    datasites=["[email protected]", "[email protected]"]
)

node_ids = grid.get_node_ids()
print(node_ids)  # [2891336453, 3924203289]

create_message

def create_message(
    self,
    content: RecordDict,
    message_type: str,
    dst_node_id: int,
    group_id: str,
    ttl: Optional[float] = None,
) -> Message
Create a new Flower message with proper metadata.
content
RecordDict
required
Message payload (e.g., model parameters, metrics)
message_type
str
required
Type of FL message (e.g., MessageType.TRAIN, MessageType.EVALUATE)
dst_node_id
int
required
Destination node ID (client identifier)
group_id
str
required
Message group identifier for related messages
ttl
Optional[float]
default:"None"
Time-to-live in seconds for message expiration
return
flwr.common.message.Message
Flower Message ready to be sent
Example:
from flwr.common.constant import MessageType
from flwr.common.record import RecordDict, ParametersRecord

message = grid.create_message(
    content=RecordDict({
        "parameters": ParametersRecord(...),
        "config": ConfigRecord({"epochs": 5})
    }),
    message_type=MessageType.TRAIN,
    dst_node_id=2891336453,
    group_id="round_1",
    ttl=60.0
)

push_messages

def push_messages(self, messages: Iterable[Message]) -> Iterable[str]
Push FL messages to specified clients asynchronously.
messages
Iterable[Message]
required
Iterable of Flower Messages to send to clients
return
Iterable[str]
List of future IDs for retrieving responses
Example:
from flwr.common.constant import MessageType

messages = [
    grid.create_message(
        content=RecordDict({...}),
        message_type=MessageType.TRAIN,
        dst_node_id=node_id,
        group_id="round_1",
        ttl=120.0
    )
    for node_id in grid.get_node_ids()
]

future_ids = grid.push_messages(messages)
print(f"Sent {len(future_ids)} messages")

pull_messages

def pull_messages(
    self,
    message_ids: List[str]
) -> Tuple[Dict[str, Message], set]
Pull response messages from clients using future IDs.
message_ids
List[str]
required
List of future IDs from push_messages()
return
Tuple[Dict[str, Message], set]
Tuple of:
  • Dict mapping message_id to Message response
  • Set of completed message_ids (got response or failed)
Example:
future_ids = grid.push_messages(messages)

# Poll for responses
responses, completed = grid.pull_messages(list(future_ids))

for msg_id, response in responses.items():
    print(f"Got response for {msg_id}")
    if response.has_error():
        print(f"Error: {response.error.reason}")

print(f"Completed: {len(completed)}/{len(future_ids)}")

send_and_receive

def send_and_receive(
    self,
    messages: Iterable[Message],
    *,
    timeout: Optional[float] = None,
) -> Iterable[Message]
Push messages and wait for all replies (blocking).
messages
Iterable[Message]
required
Messages to send
timeout
Optional[float]
default:"120.0"
Maximum time to wait for replies (seconds). Can be overridden by SYFT_FLWR_MSG_TIMEOUT env var
return
Iterable[Message]
Collection of reply messages received
Example:
import os

# Set timeout via environment
os.environ["SYFT_FLWR_MSG_TIMEOUT"] = "180"  # 3 minutes

messages = [create_train_message(node_id) for node_id in grid.get_node_ids()]

# Send and wait for all responses
responses = grid.send_and_receive(messages)

print(f"Received {len(responses)} responses")
for response in responses:
    if response.has_error():
        print(f"Client error: {response.error.reason}")

send_stop_signal

def send_stop_signal(
    self,
    group_id: str,
    reason: str = "Training complete",
    ttl: float = 60.0
) -> List[Message]
Send a stop signal to all connected FL clients.
group_id
str
required
Identifier for this group of stop messages
reason
str
default:"Training complete"
Human-readable reason for stopping
ttl
float
Time-to-live for stop messages in seconds
return
List[Message]
List of stop Messages that were sent
Example:
# After training completes
try:
    # ... FL training ...
    pass
finally:
    grid.send_stop_signal(
        group_id="final",
        reason="Training complete - 10 rounds finished"
    )
    print("Sent stop signals to all clients")

Environment Variables

SYFT_FLWR_MSG_TIMEOUT
float
default:"120.0"
Maximum time (seconds) to wait for client responses in send_and_receive()
SYFT_FLWR_POLL_INTERVAL
float
default:"3.0"
Polling interval (seconds) for checking client responses
SYFT_FLWR_ENCRYPTION_ENABLED
str
default:"true"
Enable/disable encryption in SyftBox mode (“true” or “false”)

Encryption Behavior

SyftBox Mode (encryption enabled):
# Message flow
Flower Message → serialize → base64 encode → syft_rpc.send(encrypt=True)
X3DH encryption → filesystem → client decrypts → process
SyftBox Mode (encryption disabled):
# Message flow
Flower Message → serialize → syft_rpc.send(encrypt=False)
→ filesystem → client receives plaintext → process
P2P Mode:
# Message flow
Flower Message → serialize → write .request file to Google Drive
→ client polls Drive → reads .request → process
→ writes .response to Drive → server polls and reads

Complete Example

Full example showing event handling and message coordination:
from syft_flwr.fl_orchestrator.syft_grid import SyftGrid
from flwr.common import Context
from flwr.common.constant import MessageType
from flwr.common.record import RecordDict
import time

# Create SyftGrid
grid = SyftGrid(
    app_name="flwr/example",
    datasites=[
        "[email protected]",
        "[email protected]",
        "[email protected]"
    ]
)

# Set run ID
grid.set_run(run_id=42)

# Create training messages
node_ids = grid.get_node_ids()
messages = [
    grid.create_message(
        content=RecordDict({"round": 1}),
        message_type=MessageType.TRAIN,
        dst_node_id=node_id,
        group_id="training_round_1",
        ttl=120.0
    )
    for node_id in node_ids
]

# Send messages
print("Sending training messages...")
future_ids = list(grid.push_messages(messages))
print(f"Sent {len(future_ids)} messages")

# Poll for responses
print("Waiting for responses...")
start_time = time.time()
pending_ids = set(future_ids)
responses = {}

while pending_ids and (time.time() - start_time) < 120:
    batch, completed = grid.pull_messages(list(pending_ids))
    responses.update(batch)
    pending_ids -= completed
    
    if pending_ids:
        print(f"Still waiting for {len(pending_ids)} responses...")
        time.sleep(3)

print(f"\nReceived {len(responses)} responses")

# Process responses
for msg_id, response in responses.items():
    if response.has_error():
        print(f"Error in {msg_id}: {response.error.reason}")
    else:
        print(f"Success: {msg_id}")

# Send stop signals
print("\nSending stop signals...")
grid.send_stop_signal(group_id="final", reason="Example complete")
print("Done!")

See Also

Build docs developers (and LLMs) love