The client orchestration layer enables Flower ClientApps to run on data owner machines with SyftBox integration, supporting both local and P2P transport modes.
syftbox_flwr_client
Run a Flower ClientApp with SyftBox integration.
Function Signature
def syftbox_flwr_client(
    client_app: ClientApp,
    context: Context,
    app_name: str,
    project_dir: Optional[Path] = None,
) -> None
Source: src/syft_flwr/fl_orchestrator/flower_client.py:162
Parameters
client_app
flwr.client.ClientApp
required
The Flower ClientApp instance containing the FL training logic
context
flwr.common.Context
required
Flower context with run configuration and state
app_name
str
required
Name of the FL application (e.g., “diabetes_prediction”)
project_dir
Optional[Path]
default: None
Path to the FL project directory (reads transport config from pyproject.toml)
Transport Support
Automatically detects and configures the appropriate transport:
SyftBox mode: Uses SyftEvents with watchdog file monitoring
P2P mode: Uses P2PFileEvents with Google Drive polling
Transport is determined from:
pyproject.toml [tool.syft_flwr] config
Environment variables
Auto-detection based on available clients
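The lookup described above can be pictured as a simple fallback chain. The following is a minimal sketch, assuming the three sources are consulted in the order listed; the `transport` key and the `SYFT_FLWR_TRANSPORT` variable are hypothetical names chosen for illustration and are not confirmed syft_flwr identifiers.

```python
from typing import Mapping, Sequence

# Hypothetical names: the "transport" key and SYFT_FLWR_TRANSPORT variable
# are illustrative only, not confirmed syft_flwr identifiers.
VALID_TRANSPORTS = ("syftbox", "p2p")


def resolve_transport(
    tool_config: Mapping[str, object],
    env: Mapping[str, str],
    available_clients: Sequence[str],
) -> str:
    """Pick a transport from config, then environment, then auto-detection."""
    # 1. Explicit setting from the [tool.syft_flwr] table of pyproject.toml
    configured = tool_config.get("transport")
    if configured in VALID_TRANSPORTS:
        return str(configured)

    # 2. Environment variable
    from_env = env.get("SYFT_FLWR_TRANSPORT", "").lower()
    if from_env in VALID_TRANSPORTS:
        return from_env

    # 3. Fall back to whichever client is available
    for name in VALID_TRANSPORTS:
        if name in available_clients:
            return name
    raise RuntimeError("No supported transport could be detected")
```

Here `tool_config` stands for the already parsed `[tool.syft_flwr]` table, so the sketch does not depend on any particular TOML parser.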
Message Flow
Receive: Events adapter monitors for incoming messages at the /messages endpoint
Decrypt: Optional base64 decoding if encryption is enabled (SyftBox only)
Deserialize: Convert bytes to a Flower Message using protocol buffers
Process: Pass the message to the ClientApp for training/evaluation
Serialize: Convert the Flower Message response to bytes
Encrypt: Optional base64 encoding if encryption is enabled (SyftBox only)
Reply: Send the response back to the aggregator
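The flow above amounts to a symmetric pipeline around the ClientApp call. The sketch below illustrates that shape only: JSON stands in for the protocol buffer serialization, and `handle_request` and its `process` callback are hypothetical names, not part of syft_flwr.

```python
import base64
import json
from typing import Callable, Union


def handle_request(
    body: Union[str, bytes],
    process: Callable[[dict], dict],
    encryption_enabled: bool,
) -> Union[str, bytes]:
    """Run one request through decode, deserialize, process, serialize, encode."""
    # Decrypt step: undo the base64 layer when encryption is enabled
    raw = base64.b64decode(body) if encryption_enabled else body
    # Deserialize step (JSON here; the real client uses protocol buffers)
    message = json.loads(raw)
    # Process step: hand the message to the training/evaluation logic
    reply = process(message)
    # Serialize step
    reply_bytes = json.dumps(reply).encode("utf-8")
    # Encrypt step: base64 string if enabled, raw bytes otherwise
    if encryption_enabled:
        return base64.b64encode(reply_bytes).decode("utf-8")
    return reply_bytes
```

The return type mirrors the documented behavior of the reply: a base64 string when encryption is enabled and plain bytes otherwise.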
Example Usage
Basic Client
from pathlib import Path

import numpy as np
from flwr.client import ClientApp, NumPyClient
from flwr.common import Context
from syft_flwr.fl_orchestrator import syftbox_flwr_client


class MyClient(NumPyClient):
    def fit(self, parameters, config):
        # Train local model
        weights = [np.array(p) for p in parameters]
        # ... training logic ...
        return weights, 100, {}  # weights, num_examples, metrics

    def evaluate(self, parameters, config):
        # Evaluate local model
        weights = [np.array(p) for p in parameters]
        # ... evaluation logic ...
        return 0.95, 100, {"accuracy": 0.95}  # loss, num_examples, metrics


def client_fn(context: Context):
    return MyClient().to_client()


app = ClientApp(client_fn=client_fn)

# Run with SyftBox
if __name__ == "__main__":
    context = Context(state={}, run_config={})
    syftbox_flwr_client(
        client_app=app,
        context=context,
        app_name="diabetes_prediction",
        project_dir=Path("./my_fl_project"),
    )
Stop Signal Handling
The client automatically handles stop signals from the server:
# Server sends stop signal
{
    "metadata": {
        "message_type": "SYSTEM",
        "group_id": "final"
    },
    "content": {
        "config": {
            "action": "stop",
            "reason": "Training complete"
        }
    }
}
When received, the client:
Logs the stop signal
Stops the events loop
Exits gracefully
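A check for the stop-signal payload shown above could look like the following minimal sketch. It assumes the message has already been converted to a plain dictionary of that shape and that matching on `message_type` and `action` is sufficient; `is_stop_signal` and `run_until_stopped` are hypothetical helpers, not syft_flwr functions.

```python
from typing import Any, Iterable, Mapping


def is_stop_signal(message: Mapping[str, Any]) -> bool:
    """Return True when a message matches the documented stop-signal shape."""
    metadata = message.get("metadata", {})
    config = message.get("content", {}).get("config", {})
    return (
        metadata.get("message_type") == "SYSTEM"
        and config.get("action") == "stop"
    )


def run_until_stopped(messages: Iterable[Mapping[str, Any]]) -> int:
    """Handle messages in order and stop at the first stop signal."""
    handled = 0
    for message in messages:
        if is_stop_signal(message):
            reason = message["content"]["config"].get("reason", "")
            print(f"Stop signal received: {reason}")  # log the stop signal
            break  # leave the events loop
        handled += 1  # placeholder for normal message processing
    return handled
```

The function returns the number of messages handled before the stop signal, which makes the loop easy to exercise without a running aggregator.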
Error Handling
The client creates error reply messages for:
Deserialization failures: Invalid message format
Client exceptions: Errors during fit() or evaluate()
# Error reply format
{
    "metadata": {
        "message_type": "TRAIN",  # Original message type
        "reply_to_message_id": "original_message_id"
    },
    "error": {
        "code": "CLIENT_APP_RAISED_EXCEPTION",
        "reason": "Client: '<client_email>'. Error: division by zero. Traceback: ..."
    }
}
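One way to produce a reply in this format is to wrap the client call in a `try`/`except` and build the error payload from the caught exception. The sketch below works on plain dictionaries rather than Flower Message objects; `run_with_error_reply` and the `message_id` metadata key are assumptions made for illustration.

```python
import traceback
from typing import Any, Callable, Dict


def run_with_error_reply(
    handler: Callable[[Dict[str, Any]], Dict[str, Any]],
    message: Dict[str, Any],
    client_email: str,
) -> Dict[str, Any]:
    """Call handler; on failure, return a reply in the documented error format."""
    try:
        return handler(message)
    except Exception as exc:
        metadata = message.get("metadata", {})
        return {
            "metadata": {
                # Keep the original message type so the server can match it
                "message_type": metadata.get("message_type"),
                # "message_id" is an assumed key for the incoming message ID
                "reply_to_message_id": metadata.get("message_id"),
            },
            "error": {
                "code": "CLIENT_APP_RAISED_EXCEPTION",
                "reason": (
                    f"Client: '{client_email}'. Error: {exc}. "
                    f"Traceback: {traceback.format_exc()}"
                ),
            },
        }
```

Returning the error as a regular reply, instead of letting the exception propagate, lets the aggregator see which client failed and why.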
Encryption
SyftBox Mode:
Messages are base64 encoded/decoded when encryption is enabled
Set SYFT_FLWR_ENCRYPTION_ENABLED=false to disable
Encryption handled by syft_crypto (X3DH key exchange)
P2P Mode:
Encryption not yet supported
Access control via Google Drive permissions
Environment Variables
Email address for P2P mode (required for P2P transport)
Path to the SyftBox data directory containing local datasets
SYFT_FLWR_ENCRYPTION_ENABLED
Enable/disable encryption in SyftBox mode (“true” or “false”)
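Reading the flag can be sketched as follows. This is an illustration only: it assumes encryption stays enabled unless the variable is explicitly set to "false", and `encryption_enabled` is a hypothetical helper rather than a syft_flwr function.

```python
import os
from typing import Mapping, Optional


def encryption_enabled(env: Optional[Mapping[str, str]] = None) -> bool:
    """Interpret SYFT_FLWR_ENCRYPTION_ENABLED as a boolean flag."""
    env = os.environ if env is None else env
    # Assumption: encryption stays on unless the variable is explicitly "false"
    value = env.get("SYFT_FLWR_ENCRYPTION_ENABLED", "true")
    return value.strip().lower() != "false"
```

Passing a mapping instead of reading `os.environ` directly keeps the helper easy to test.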
MessageHandler
Internal class for processing Flower messages and preparing replies.
Class Definition
class MessageHandler:
    """Handles message processing for Flower client."""

    def __init__(
        self,
        client_app: ClientApp,
        context: Context,
        encryption_enabled: bool,
    )
Source: src/syft_flwr/fl_orchestrator/flower_client.py:19
Methods
process_message
def process_message(self, message: Message) -> Union[str, bytes]
Process a normal Flower message and return the reply.
message
flwr.common.message.Message
required
The incoming Flower message (TRAIN, EVALUATE, etc.)
return
Union[str, bytes]
Reply message (base64 string if encrypted, bytes if plaintext)
create_error_reply
def create_error_reply(
    self,
    message: Optional[Message],
    error: Error,
) -> Union[str, bytes]
Create an error reply message.
message
Optional[flwr.common.message.Message]
Original message (if available)
error
flwr.common.message.Error
required
Error details
RequestProcessor
Internal class for processing incoming requests with encryption/decryption.
Class Definition
class RequestProcessor:
    """Processes incoming requests and handles encryption/decryption."""

    def __init__(
        self,
        message_handler: MessageHandler,
        events: SyftFlwrEvents,
        client_email: str,
    )
Source: src/syft_flwr/fl_orchestrator/flower_client.py:64
Methods
process
def process(self, request_body: bytes) -> Optional[Union[str, bytes]]
Process incoming request body and return response.
request_body
bytes
required
Raw message bytes from the events adapter
return
Optional[Union[str, bytes]]
Response message or None (for stop signals)
Processing Steps:
Decode base64 if encryption enabled
Deserialize to Flower Message
Check for stop signal
Process with ClientApp
Serialize response
Encode base64 if encryption enabled
Complete Example
Full working example with data loading and training:
from pathlib import Path

import numpy as np
import pandas as pd
from flwr.client import ClientApp, NumPyClient
from flwr.common import Context
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from syft_flwr.fl_orchestrator import syftbox_flwr_client
from syft_flwr.utils import get_syftbox_dataset_path


class DiabetesClient(NumPyClient):
    def __init__(self):
        # Load data from SyftBox data directory
        data_dir = get_syftbox_dataset_path()
        self.df = pd.read_csv(data_dir / "diabetes.csv")
        # Prepare data
        self.X = self.df.drop("Outcome", axis=1).values
        self.y = self.df["Outcome"].values
        # Initialize model; warm_start makes fit() continue from the
        # coefficients received from the server instead of starting over
        self.model = LogisticRegression(max_iter=100, warm_start=True)
        # predict() needs classes_ even before the first local fit()
        self.model.classes_ = np.unique(self.y)
        print(f"Client initialized with {len(self.df)} samples")

    def _set_parameters(self, parameters):
        """Load server parameters as coef_ (1, n_features) and intercept_ (1,)."""
        self.model.coef_ = np.asarray(parameters[0]).reshape(1, -1)
        self.model.intercept_ = np.asarray(parameters[1]).reshape(-1)

    def fit(self, parameters, config):
        """Train the model on local data."""
        # Set model parameters from server
        if parameters:
            self._set_parameters(parameters)
        # Train locally
        self.model.fit(self.X, self.y)
        # Return updated parameters as NumPy arrays
        params = [self.model.coef_, self.model.intercept_]
        return params, len(self.X), {}

    def evaluate(self, parameters, config):
        """Evaluate the model on local data."""
        # Set model parameters
        self._set_parameters(parameters)
        # Evaluate
        predictions = self.model.predict(self.X)
        accuracy = float(accuracy_score(self.y, predictions))
        return 1.0 - accuracy, len(self.X), {"accuracy": accuracy}


def client_fn(context: Context):
    return DiabetesClient().to_client()


app = ClientApp(client_fn=client_fn)

if __name__ == "__main__":
    context = Context(state={}, run_config={})
    syftbox_flwr_client(
        client_app=app,
        context=context,
        app_name="diabetes_prediction",
        project_dir=Path("./diabetes_fl_project"),
    )