The event system provides abstractions for handling asynchronous FL messages across different transport layers.
SyftFlwrEvents Protocol
Base protocol defining the event handling interface.
Interface Definition
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Callable, Optional, Union
MessageHandler = Callable[[bytes], Optional[Union[str, bytes]]]
class SyftFlwrEvents(ABC):
"""Protocol for syft-flwr event handling implementations.
This abstraction allows syft-flwr to work with different
transport/event mechanisms:
- syft_core: SyftEvents with watchdog file watching + syft_rpc
- syft_client: P2P File-based polling via Google Drive sync
"""
Source: src/syft_flwr/events/protocol.py:11
Properties
client_email
@property
@abstractmethod
def client_email(self) -> str
Get the email address of the current user.
app_dir
@property
@abstractmethod
def app_dir(self) -> Path
Get the path to the app data directory.
is_running
@property
@abstractmethod
def is_running(self) -> bool
Check if the event loop is currently running.
Methods
on_request
@abstractmethod
def on_request(
self,
endpoint: str,
handler: MessageHandler,
auto_decrypt: bool = True,
encrypt_reply: bool = False,
) -> None
Register a handler for incoming messages at an endpoint.
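endpoint
str
required
The endpoint path (e.g., “/messages”)
handler
MessageHandler
required
Function that receives message bytes and returns optional response
auto_decrypt
bool
default:"True"
Whether to auto-decrypt encrypted messages (SyftBox only)
encrypt_reply
bool
default:"False"
Whether to encrypt reply messages (SyftBox only)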
run_forever
@abstractmethod
def run_forever(self) -> None
Start the main event loop and block until stopped.
stop
@abstractmethod
def stop(self) -> None
Signal the event loop to stop.
Factory Function
create_events_watcher
def create_events_watcher(
app_name: str,
client: Optional[Union[Client, SyftFlwrClient]] = None,
cleanup_expiry: str = "1d",
cleanup_interval: str = "1d",
poll_interval: float = 2.0,
) -> SyftFlwrEvents
Create the appropriate events adapter based on client type.
Source: src/syft_flwr/events/factory.py:17
app_name
str
required
Name of the FL app (e.g., “flwr/my_app”)
client
Optional[Union[Client, SyftFlwrClient]]
default:"None"
SyftFlwrClient or syft_core.Client instance (auto-created if None)
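cleanup_expiry
str
default:"1d"
Expiry time for cleanup - SyftEvents only (e.g., “1d”, “12h”)
cleanup_interval
str
default:"1d"
Cleanup interval - SyftEvents only (e.g., “1d”, “6h”)
poll_interval
float
default:"2.0"
Polling interval in seconds - P2PFileEvents only
return
SyftFlwrEvents
Either SyftEvents (for SyftBox) or P2PFileEvents (for P2P mode)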
Auto-detection logic:
- If client.get_client() returns syft_core.Client → SyftEvents
- If client.get_client() returns SyftP2PClient → P2PFileEvents
Example:
from syft_flwr.events import create_events_watcher
from syft_flwr.client import create_client
# Auto-detect transport
client = create_client()
events = create_events_watcher(
app_name="flwr/diabetes",
client=client,
poll_interval=2.0
)
print(type(events).__name__) # "SyftEvents" or "P2PFileEvents"
SyftGrid
Server-side message orchestrator for coordinating FL communication.
Class Definition
from flwr.server.grid import Grid
class SyftGrid(Grid):
"""SyftGrid is the server-side message orchestrator for federated learning.
This class abstracts the RPC layer to support both syft_core and syft_client
environments. The appropriate RPC adapter is auto-detected based on the client.
Supported configurations:
- syft_core: Full syft_rpc/syft_crypto stack with optional encryption
- syft_client: P2P File-based RPC via Google Drive (no encryption)
"""
Source: src/syft_flwr/fl_orchestrator/syft_grid.py:38
Constructor
app_name
str
Name of the FL application (e.g., “flwr/diabetes”)
datasites
Optional[list[str]]
default:"None"
List of data owner email addresses
client
Optional[Union[Client, SyftFlwrClient]]
default:"None"
SyftFlwrClient or syft_core.Client (auto-created if None)
rpc
Optional[SyftFlwrRpc]
default:"None"
RPC adapter for message transport (auto-created if None)
Example:
from syft_flwr.fl_orchestrator.syft_grid import SyftGrid
grid = SyftGrid(
app_name="flwr/diabetes",
datasites=[
"do1@example.com",
"do2@example.com"
]
)
grid.set_run(run_id=12345)
print(grid.get_node_ids()) # [<node_id_1>, <node_id_2>]
Properties
run
@property
def run(self) -> Run
Get the current Flower Run object with metadata.
return
Run
Copy of the current Run object
Methods
set_run
def set_run(self, run_id: int) -> None
Set the run ID for this federated learning session.
run_id
int
required
Unique identifier for the FL run/session
Example:
from random import randint
run_id = randint(0, 1000)
grid.set_run(run_id)
print(grid.run.run_id) # e.g. 742 (value is random)
get_client_email
def get_client_email(self) -> str
Get the email address of the server’s client.
return
str
Email address as a string
get_node_ids
def get_node_ids(self) -> list[int]
Get node IDs of all connected FL clients.
return
list[int]
List of integer node IDs (deterministically generated from emails)
Example:
node_ids = grid.get_node_ids()
print(node_ids) # [<node_id_1>, <node_id_2>]
create_message
def create_message(
self,
content: RecordDict,
message_type: str,
dst_node_id: int,
group_id: str,
ttl: Optional[float] = None,
) -> Message
Create a new Flower message with proper metadata.
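content
RecordDict
required
Message payload (e.g., model parameters, metrics)
message_type
str
required
Type of FL message (e.g., MessageType.TRAIN, MessageType.EVALUATE)
dst_node_id
int
required
Destination node ID (client identifier)
group_id
str
required
Message group identifier for related messages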
ttl
Optional[float]
default:"None"
Time-to-live in seconds for message expiration
return
flwr.common.message.Message
Flower Message ready to be sent
Example:
from flwr.common.constant import MessageType
from flwr.common.record import ConfigRecord, ParametersRecord, RecordDict
message = grid.create_message(
content=RecordDict({
"parameters": ParametersRecord(...),
"config": ConfigRecord({"epochs": 5})
}),
message_type=MessageType.TRAIN,
dst_node_id=2891336453,
group_id="round_1",
ttl=60.0
)
push_messages
def push_messages(self, messages: Iterable[Message]) -> Iterable[str]
Push FL messages to specified clients asynchronously.
messages
Iterable[Message]
required
Iterable of Flower Messages to send to clients
return
Iterable[str]
Iterable of future IDs for retrieving responses
Example:
from flwr.common.constant import MessageType
messages = [
grid.create_message(
content=RecordDict({...}),
message_type=MessageType.TRAIN,
dst_node_id=node_id,
group_id="round_1",
ttl=120.0
)
for node_id in grid.get_node_ids()
]
future_ids = list(grid.push_messages(messages))
print(f"Sent {len(future_ids)} messages")
pull_messages
def pull_messages(
self,
message_ids: List[str]
) -> Tuple[Dict[str, Message], set]
Pull response messages from clients using future IDs.
message_ids
List[str]
required
List of future IDs from push_messages()
return
Tuple[Dict[str, Message], set]
Tuple of:
- Dict mapping message_id to Message response
- Set of completed message_ids (got response or failed)
Example:
future_ids = list(grid.push_messages(messages))
# Poll for responses
responses, completed = grid.pull_messages(list(future_ids))
for msg_id, response in responses.items():
print(f"Got response for {msg_id}")
if response.has_error():
print(f"Error: {response.error.reason}")
print(f"Completed: {len(completed)}/{len(future_ids)}")
send_and_receive
def send_and_receive(
self,
messages: Iterable[Message],
*,
timeout: Optional[float] = None,
) -> Iterable[Message]
Push messages and wait for all replies (blocking).
messages
Iterable[Message]
required
Messages to send
timeout
Optional[float]
default:"120.0"
Maximum time to wait for replies (seconds). Can be overridden by SYFT_FLWR_MSG_TIMEOUT env var
return
Iterable[Message]
Collection of reply messages received
Example:
import os
# Set timeout via environment
os.environ["SYFT_FLWR_MSG_TIMEOUT"] = "180" # 3 minutes
messages = [create_train_message(node_id) for node_id in grid.get_node_ids()]
# Send and wait for all responses
responses = list(grid.send_and_receive(messages))
print(f"Received {len(responses)} responses")
for response in responses:
if response.has_error():
print(f"Client error: {response.error.reason}")
send_stop_signal
def send_stop_signal(
self,
group_id: str,
reason: str = "Training complete",
ttl: float = 60.0
) -> List[Message]
Send a stop signal to all connected FL clients.
group_id
str
required
Identifier for this group of stop messages
reason
str
default:"Training complete"
Human-readable reason for stopping
ttl
float
default:"60.0"
Time-to-live for stop messages in seconds
return
List[Message]
List of stop Messages that were sent
Example:
# After training completes
try:
# ... FL training ...
pass
finally:
grid.send_stop_signal(
group_id="final",
reason="Training complete - 10 rounds finished"
)
print("Sent stop signals to all clients")
Environment Variables
SYFT_FLWR_MSG_TIMEOUT
Maximum time (seconds) to wait for client responses in send_and_receive()
Polling interval (seconds) for checking client responses
SYFT_FLWR_ENCRYPTION_ENABLED
Enable/disable encryption in SyftBox mode (“true” or “false”)
Encryption Behavior
SyftBox Mode (encryption enabled):
# Message flow
Flower Message → serialize → base64 encode → syft_rpc.send(encrypt=True)
→ X3DH encryption → filesystem → client decrypts → process
SyftBox Mode (encryption disabled):
# Message flow
Flower Message → serialize → syft_rpc.send(encrypt=False)
→ filesystem → client receives plaintext → process
P2P Mode:
# Message flow
Flower Message → serialize → write .request file to Google Drive
→ client polls Drive → reads .request → process
→ writes .response to Drive → server polls and reads
Complete Example
Full example showing event handling and message coordination:
from syft_flwr.fl_orchestrator.syft_grid import SyftGrid
from flwr.common import Context
from flwr.common.constant import MessageType
from flwr.common.record import RecordDict
import time
# Create SyftGrid
grid = SyftGrid(
app_name="flwr/example",
datasites=[
"do1@example.com",
"do2@example.com",
"do3@example.com"
]
)
# Set run ID
grid.set_run(run_id=42)
# Create training messages
node_ids = grid.get_node_ids()
messages = [
grid.create_message(
content=RecordDict({"round": 1}),
message_type=MessageType.TRAIN,
dst_node_id=node_id,
group_id="training_round_1",
ttl=120.0
)
for node_id in node_ids
]
# Send messages
print("Sending training messages...")
future_ids = list(grid.push_messages(messages))
print(f"Sent {len(future_ids)} messages")
# Poll for responses
print("Waiting for responses...")
start_time = time.time()
pending_ids = set(future_ids)
responses = {}
while pending_ids and (time.time() - start_time) < 120:
batch, completed = grid.pull_messages(list(pending_ids))
responses.update(batch)
pending_ids -= completed
if pending_ids:
print(f"Still waiting for {len(pending_ids)} responses...")
time.sleep(3)
print(f"\nReceived {len(responses)} responses")
# Process responses
for msg_id, response in responses.items():
if response.has_error():
print(f"Error in {msg_id}: {response.error.reason}")
else:
print(f"Success: {msg_id}")
# Send stop signals
print("\nSending stop signals...")
grid.send_stop_signal(group_id="final", reason="Example complete")
print("Done!")
See Also