The server orchestration layer enables Flower ServerApps to coordinate federated learning across multiple data owners using SyftBox communication.
syftbox_flwr_server
Run a Flower ServerApp with SyftBox integration for coordinating FL training.
Function Signature
def syftbox_flwr_server (
server_app : ServerApp,
context : Context,
datasites : list[ str ],
app_name : str ,
project_dir : Optional[Path] = None ,
) -> Context
Source: src/syft_flwr/fl_orchestrator/flower_server.py:15
Parameters
server_app
flwr.server.ServerApp
required
The Flower ServerApp instance containing the aggregation strategy
context
flwr.common.Context
required
Flower context with run configuration and state
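datasites
list[str]
required
Email addresses of the datasites (data owners) that participate in training
app_name
str
required
Name of the FL application (e.g., “diabetes_prediction”)
project_dir
Optional[Path]
default: None
Path to the FL project directory (reads transport config from pyproject.toml)
Returns
flwr.common.Context
Updated context after training completion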
Workflow
Setup : Create SyftGrid instance with datasites and transport
Initialize : Set random run ID for the FL session
Train : Execute Flower server with SyftGrid backend
Cleanup : Send stop signals to all clients
Return : Updated context with final state
Example Usage
Basic Server
Custom Strategy
P2P Mode Server
from pathlib import Path
from flwr.server import ServerApp, ServerAppComponents, ServerConfig
from flwr.server.strategy import FedAvg
from flwr.common import Context
from syft_flwr.fl_orchestrator import syftbox_flwr_server

# Define datasites (data owners).
# NOTE(review): the original addresses were lost to e-mail obfuscation in this
# copy of the page; the values below are placeholders — replace with real ones.
datasites = [
    "hospital1@example.com",
    "hospital2@example.com",
    "hospital3@example.com",
]

# Create server app with FedAvg strategy
strategy = FedAvg(
    fraction_fit=1.0,  # Use all clients for training
    fraction_evaluate=1.0,  # Use all clients for evaluation
    min_available_clients=3,  # Wait for all clients
)
config = ServerConfig(num_rounds=5)


def server_fn(context: Context) -> ServerAppComponents:
    """Return the strategy and run configuration for this ServerApp.

    Flower expects ``server_fn`` to hand back a ``ServerAppComponents``
    object rather than a bare ``(strategy, config)`` tuple.
    """
    return ServerAppComponents(strategy=strategy, config=config)


app = ServerApp(server_fn=server_fn)

# Run aggregation server
if __name__ == "__main__":
    # NOTE(review): recent Flower releases require more Context fields
    # (e.g. run_id, node_id, node_config) — confirm against the installed version.
    context = Context(state={}, run_config={})
    updated_context = syftbox_flwr_server(
        server_app=app,
        context=context,
        datasites=datasites,
        app_name="diabetes_prediction",
        project_dir=Path("./diabetes_fl_project"),
    )
    print(f"Training complete! Final context: {updated_context}")
SyftGrid Integration
The server uses SyftGrid as the communication backend:
from syft_flwr.fl_orchestrator.syft_grid import SyftGrid
# Automatically created by syftbox_flwr_server
# NOTE(review): `datasites` and `client` are not defined in this snippet;
# presumably they come from syftbox_flwr_server's own setup — confirm.
syft_grid = SyftGrid(
app_name = "flwr/diabetes_prediction" ,
datasites = datasites,
client = client # Auto-detected SyftFlwrClient
)
# Set run ID
# Illustrative value only; the "Run ID Assignment" section shows the server
# drawing the ID with randint(0, 1000).
run_id = 12345
syft_grid.set_run(run_id)
# SyftGrid handles all message routing
See SyftGrid for detailed API reference.
Stop Signal Behavior
After training completes (success or failure), the server sends stop signals to all clients:
# Sent to each datasite
{
"metadata" : {
"message_type" : "SYSTEM" ,
"group_id" : "final" ,
"dst_node_id" : < client_node_id >
},
"content" : {
"config" : {
"action" : "stop" ,
"reason" : "Server stopped"
}
}
}
Clients will gracefully shut down upon receiving this signal.
Error Handling
The server handles errors and still sends stop signals:
# Excerpt of the server's run/cleanup sequence. `syft_grid`, `context`,
# `server_app`, `run_server`, `logger` and `traceback` come from the
# enclosing function/module and are not defined in this snippet.
try:
    updated_context = run_server(
        syft_grid,
        context=context,
        loaded_server_app=server_app,
        server_app_dir="",
    )
    logger.info(f"Server completed with context: {updated_context}")
except Exception as e:
    # Failures are logged, not re-raised, so the caller still gets a context back.
    logger.error(f"Server encountered an error: {str(e)}")
    logger.error(f"Traceback: {traceback.format_exc()}")
    updated_context = context  # Return original context
finally:
    # Runs on both success and failure, so clients are always told to stop.
    syft_grid.send_stop_signal(group_id="final", reason="Server stopped")
    logger.info("Sending stop signals to the clients")
Transport Support
SyftBox Mode:
Full RPC/crypto stack with optional encryption
Real-time message delivery via filesystem watching
Futures database for response tracking
P2P Mode:
File-based messaging via Google Drive API
Polling-based message retrieval
No encryption (access control via Drive permissions)
Environment Variables
Email address for P2P mode (required when using P2P transport)
SYFT_FLWR_ENCRYPTION_ENABLED
Enable/disable encryption in SyftBox mode (“true” or “false”)
Maximum time (seconds) to wait for client responses
Polling interval (seconds) for checking client responses
Complete Example
Full working example with custom strategy and metrics:
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import numpy as np
from flwr.server import ServerApp, ServerConfig, ClientManager
from flwr.server import ServerAppComponents
from flwr.server.client_proxy import ClientProxy
from flwr.server.strategy import FedAvg
from flwr.common import (
    Context,
    Parameters,
    FitRes,
    EvaluateRes,
    Scalar,
    parameters_to_ndarrays,
    ndarrays_to_parameters,
)
from syft_flwr.fl_orchestrator import syftbox_flwr_server
class DiabetesFedAvg(FedAvg):
    """Custom FedAvg with metrics aggregation for diabetes prediction."""

    def aggregate_fit(
        self,
        server_round: int,
        results: List[Tuple[ClientProxy, FitRes]],
        failures: List[BaseException],
    ) -> Tuple[Optional[Parameters], Dict[str, Scalar]]:
        """Aggregate model updates and log statistics.

        Delegates the aggregation to ``FedAvg.aggregate_fit`` and, when at
        least one client reported, prints a per-round summary and records
        ``num_clients`` and ``total_examples`` in the returned metrics.

        Args:
            server_round: Index of the current FL round.
            results: ``(client, fit result)`` pairs from clients that succeeded.
            failures: Exceptions (or failed results) from clients that did not.

        Returns:
            The aggregated parameters from the parent strategy and the
            metrics dict, extended with the round statistics.
        """
        # Call parent aggregation
        parameters_aggregated, metrics_aggregated = super().aggregate_fit(
            server_round, results, failures
        )

        # Log round statistics
        if results:
            total_examples = sum(r.num_examples for _, r in results)
            print(f"\n=== Round {server_round} ===")
            print(f"Clients participated: {len(results)}")
            print(f"Total training examples: {total_examples}")
            print(f"Failures: {len(failures)}")
            metrics_aggregated["num_clients"] = len(results)
            metrics_aggregated["total_examples"] = total_examples

        return parameters_aggregated, metrics_aggregated

    def aggregate_evaluate(
        self,
        server_round: int,
        results: List[Tuple[ClientProxy, EvaluateRes]],
        failures: List[BaseException],
    ) -> Tuple[Optional[float], Dict[str, Scalar]]:
        """Aggregate evaluation results.

        Computes example-weighted averages of the clients' loss and
        ``"accuracy"`` metric. A client that reports no ``"accuracy"``
        contributes 0.0 for that metric.

        Args:
            server_round: Index of the current FL round.
            results: ``(client, evaluate result)`` pairs from successful clients.
            failures: Exceptions (or failed results) from clients that did not.

        Returns:
            ``(avg_loss, metrics)``, or ``(None, {})`` when there is nothing
            to average (no results, or every client reported zero examples).
        """
        if not results:
            return None, {}

        total_examples = sum(r.num_examples for _, r in results)
        if total_examples == 0:
            # All clients evaluated on zero examples: the weighted average is
            # undefined, so report "no result" instead of dividing by zero.
            return None, {}

        # Weighted average of losses
        avg_loss = sum(r.loss * r.num_examples for _, r in results) / total_examples

        # Aggregate accuracy metrics
        avg_accuracy = (
            sum(r.metrics.get("accuracy", 0.0) * r.num_examples for _, r in results)
            / total_examples
        )

        print(f"\n=== Evaluation Round {server_round} ===")
        print(f"Average loss: {avg_loss:.4f}")
        print(f"Average accuracy: {avg_accuracy:.4f}")

        metrics = {
            "accuracy": avg_accuracy,
            "num_clients_evaluated": len(results),
        }
        return avg_loss, metrics
# Configuration
# NOTE(review): the original addresses were lost to e-mail obfuscation in this
# copy of the page; the values below are placeholders — replace with real ones.
datasites = [
    "hospital1@example.com",
    "hospital2@example.com",
    "hospital3@example.com",
]
strategy = DiabetesFedAvg(
    fraction_fit=1.0,
    fraction_evaluate=1.0,
    min_fit_clients=3,
    min_evaluate_clients=3,
    min_available_clients=3,
)
config = ServerConfig(num_rounds=10)


def server_fn(context: Context) -> ServerAppComponents:
    """Return the strategy and run configuration for this ServerApp.

    Flower expects ``server_fn`` to hand back a ``ServerAppComponents``
    object rather than a bare ``(strategy, config)`` tuple.
    """
    return ServerAppComponents(strategy=strategy, config=config)


app = ServerApp(server_fn=server_fn)

if __name__ == "__main__":
    print("Starting Diabetes Prediction FL Server")
    print(f"Datasites: {datasites}")
    # NOTE(review): recent Flower releases require more Context fields
    # (e.g. run_id, node_id, node_config) — confirm against the installed version.
    context = Context(state={}, run_config={})
    final_context = syftbox_flwr_server(
        server_app=app,
        context=context,
        datasites=datasites,
        app_name="diabetes_prediction",
        project_dir=Path("./diabetes_fl_project"),
    )
    print("\n=== Training Complete ===")
    print(f"Final context state: {final_context.state}")
Run ID Assignment
The server assigns a random run ID for each FL session:
from random import randint
# randint includes both endpoints, so only 1001 distinct run IDs exist and
# two sessions can end up with the same ID.
run_id = randint( 0 , 1000 )
syft_grid.set_run(run_id)
In production deployments, you may want to use a more sophisticated run ID generation scheme (e.g., timestamp-based, UUID, or database-assigned IDs).
See Also