The Duchy Protocol APIs enable secure communication and coordination between duchies during multi-party computation (MPC) execution for privacy-preserving measurements.
These are internal system APIs used exclusively for duchy-to-duchy communication. They are not accessible to measurement consumers or data providers.

Overview

Duchies collaborate to execute MPC protocols that compute aggregate metrics without revealing individual user data. The duchy protocol APIs provide:
  • Encrypted data exchange - Duchies pass encrypted sketches through computation stages
  • Work claiming - Duchies claim computation work from the shared work queue
  • Stage coordination - Track progress through multi-stage protocols
  • Participant management - Coordinate which duchies participate in each computation

MPC Protocol Architecture

Key Services

Computations Service

Manages the computation lifecycle from the duchy's perspective:
  • GetComputation - Retrieve computation details
  • StreamActiveComputations - Long-lived stream of active computations
  • SetComputationResult - Submit final encrypted result

Computation Control Service

Coordinates computation advancement between duchies:
  • AdvanceComputation - Send encrypted data to next duchy in sequence
  • GetComputationStage - Query current computation stage
See Computation Control Service for detailed documentation.

Computation Participants Service

Manages duchy participation in computations:
  • Register duchy as participant
  • Confirm readiness for computation
  • Update participant state

Computation Streaming

Duchies use long-lived streams to monitor for new work:

StreamActiveComputations

continuation_token
string
Request field. Token indicating where to resume streaming. Used for fault tolerance and resuming after disconnection.
computation
Computation
Response field. An active computation resource. Computations may appear multiple times if updated during stream lifetime.
continuation_token
string
Response field. Token for subsequent requests to resume the stream. Should be persisted by duchies for crash recovery.
Example:
import time

import grpc
from wfa.measurement.system.v1alpha import computations_service_pb2
from wfa.measurement.system.v1alpha import computations_service_pb2_grpc


def stream_active_computations(stub, continuation_token=None, retry_delay_seconds=5.0):
    """
    Stream active computations for this duchy.

    This is a long-lived stream that yields computations as they
    become available or are updated. On a stream error it reconnects
    in a loop (not by recursion), resuming from the last token received.

    save_continuation_token, should_process_computation, and
    process_computation_work are application-provided helpers that
    are not defined in this example.
    """
    while True:
        request = computations_service_pb2.StreamActiveComputationsRequest(
            continuation_token=continuation_token or ""
        )
        try:
            for response in stub.StreamActiveComputations(request):
                computation = response.computation
                continuation_token = response.continuation_token

                # Persist token for crash recovery
                save_continuation_token(continuation_token)

                # Process computation
                print(f"Received computation: {computation.name}")
                print(f"State: {computation.state}")

                # Claim and process work
                if should_process_computation(computation):
                    process_computation_work(computation)

            # The server closed the stream cleanly; let the caller decide what to do.
            return
        except grpc.RpcError as e:
            print(f"Stream error: {e.code()} - {e.details()}")
            # Wait briefly, then reconnect with the last known continuation token
            time.sleep(retry_delay_seconds)
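
The streaming example calls save_continuation_token without defining it. The following is a minimal sketch of one way to persist the token, assuming a local file is acceptable storage for a single duchy process. FileContinuationTokenStore and its file path are hypothetical names used for illustration; they are not part of the duchy APIs.

```python
import os
import tempfile
from typing import Optional


class FileContinuationTokenStore:
    """Hypothetical helper: keeps the latest continuation token in one file."""

    def __init__(self, path: str):
        self._path = path

    def load(self) -> Optional[str]:
        """Return the saved token, or None if nothing has been saved yet."""
        try:
            with open(self._path, "r", encoding="utf-8") as f:
                return f.read() or None
        except FileNotFoundError:
            return None

    def save(self, token: str) -> None:
        """Write the token to a temp file, then atomically swap it into place."""
        directory = os.path.dirname(os.path.abspath(self._path))
        fd, tmp_path = tempfile.mkstemp(dir=directory)
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                f.write(token)
                f.flush()
                os.fsync(f.fileno())
            # os.replace is atomic, so a crash never leaves a half-written token.
            os.replace(tmp_path, self._path)
        except BaseException:
            # Clean up the temp file if anything failed before the swap.
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)
            raise
```

With this sketch, save_continuation_token could simply delegate to store.save, and the token passed to stream_active_computations at startup could come from store.load().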

Computation States

state
enum
Current state of the computation. Values:
  • PENDING_REQUISITION_PARAMS - Awaiting duchy parameters
  • PENDING_REQUISITION_FULFILLMENT - Awaiting data provider sketches
  • PENDING_PARTICIPANT_CONFIRMATION - Duchies confirming participation
  • PENDING_COMPUTATION - MPC protocol execution in progress
  • SUCCEEDED - Computation completed successfully (terminal)
  • FAILED - Computation failed (terminal)
  • CANCELLED - Cancelled by measurement consumer (terminal)
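
Workers often need to know whether a computation can still change. The sketch below mirrors the state names listed above in a local Python enum and groups the three terminal ones. It is an illustrative stand-in, not the generated protobuf enum: ComputationState, TERMINAL_STATES, and is_terminal are assumed names, and string values are used because the numeric wire values are not given here.

```python
from enum import Enum


class ComputationState(Enum):
    """Local mirror of the documented state names (not the generated enum)."""

    PENDING_REQUISITION_PARAMS = "PENDING_REQUISITION_PARAMS"
    PENDING_REQUISITION_FULFILLMENT = "PENDING_REQUISITION_FULFILLMENT"
    PENDING_PARTICIPANT_CONFIRMATION = "PENDING_PARTICIPANT_CONFIRMATION"
    PENDING_COMPUTATION = "PENDING_COMPUTATION"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"


# States documented as terminal: no further transitions are expected.
TERMINAL_STATES = frozenset(
    {
        ComputationState.SUCCEEDED,
        ComputationState.FAILED,
        ComputationState.CANCELLED,
    }
)


def is_terminal(state: ComputationState) -> bool:
    """Return True if the computation has finished, failed, or been cancelled."""
    return state in TERMINAL_STATES
```

A worker could use is_terminal to drop a computation from its in-memory set of active work once a terminal update arrives on the stream.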

Protocol Execution Flow

Liquid Legions V2 (3 Duchies)

Honest Majority Share Shuffle (3 Duchies)

Setting Computation Result

The aggregator duchy submits the final result:

SetComputationResult

name
string
required
Resource name of the computation. Format: computations/{computation}
aggregator_certificate
string
required
Certificate resource name of the aggregator duchy. Format: duchies/{duchy}/certificates/{certificate}. Used to verify the signed result.
result_public_key
bytes
required
Serialized encryption public key from the measurement consumer. The result is encrypted with this key.
encrypted_result
bytes
required
Encrypted and signed Result message. Contains:
  • Encrypted metric value (reach, frequency histogram, etc.)
  • Signature from aggregator duchy
  • Metadata about computation
public_api_version
string
required
Version of the public API for message serialization. Example: "v2alpha"
Example:
def submit_computation_result(
    stub,
    computation_name: str,
    aggregator_cert_name: str,
    encrypted_result: bytes,
    result_public_key: bytes
):
    """
    Submit final encrypted result for a computation.
    
    Args:
        stub: Computations service gRPC stub
        computation_name: Computation resource name
        aggregator_cert_name: Aggregator duchy certificate name
        encrypted_result: Encrypted and signed result bytes
        result_public_key: Measurement consumer's public key
    """
    request = computations_service_pb2.SetComputationResultRequest(
        name=computation_name,
        aggregator_certificate=aggregator_cert_name,
        result_public_key=result_public_key,
        encrypted_result=encrypted_result,
        public_api_version="v2alpha"
    )
    
    computation = stub.SetComputationResult(request)
    print(f"Result submitted for: {computation.name}")
    print(f"New state: {computation.state}")
    return computation

MPC Protocol Configuration

Each computation includes protocol-specific configuration:

Liquid Legions V2 Config

mpc_protocol_config.liquid_legions_v2
LiquidLegionsV2
Configuration for the Liquid Legions V2 protocol. Fields:
  • sketch_params.decay_rate - Sketch decay rate (e.g., 12.0)
  • sketch_params.max_size - Maximum sketch size (e.g., 100000)
  • mpc_noise.blinded_histogram_noise - DP params for histogram noise
  • mpc_noise.publisher_noise - DP params for publisher noise
  • elliptic_curve_id - OpenSSL curve ID (e.g., 415 for prime256v1)
  • noise_mechanism - GEOMETRIC, DISCRETE_GAUSSIAN, or CONTINUOUS_GAUSSIAN
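
To make the shape of this configuration concrete, here is a small sketch that models a subset of the fields above as plain Python dataclasses and checks them before use. It is an assumption-laden illustration, not the protobuf message: SketchParams, LiquidLegionsV2Config, and validate are hypothetical names, mpc_noise is omitted because the structure of its DP params is not described here, and the positivity checks are assumed sanity rules rather than documented constraints.

```python
from dataclasses import dataclass

# Noise mechanism names as listed in the field description above.
NOISE_MECHANISMS = ("GEOMETRIC", "DISCRETE_GAUSSIAN", "CONTINUOUS_GAUSSIAN")


@dataclass(frozen=True)
class SketchParams:
    decay_rate: float  # e.g. 12.0
    max_size: int      # e.g. 100000


@dataclass(frozen=True)
class LiquidLegionsV2Config:
    sketch_params: SketchParams
    elliptic_curve_id: int  # OpenSSL curve ID, e.g. 415 for prime256v1
    noise_mechanism: str

    def validate(self) -> None:
        """Raise ValueError on values this sketch treats as invalid (assumed rules)."""
        if self.sketch_params.decay_rate <= 0:
            raise ValueError("decay_rate must be positive")
        if self.sketch_params.max_size <= 0:
            raise ValueError("max_size must be positive")
        if self.noise_mechanism not in NOISE_MECHANISMS:
            raise ValueError(f"unknown noise_mechanism: {self.noise_mechanism}")


# Example values taken from the field descriptions above.
example_config = LiquidLegionsV2Config(
    sketch_params=SketchParams(decay_rate=12.0, max_size=100000),
    elliptic_curve_id=415,
    noise_mechanism="GEOMETRIC",
)
example_config.validate()
```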

Honest Majority Share Shuffle Config

mpc_protocol_config.honest_majority_share_shuffle
HonestMajorityShareShuffle
Configuration for the HMSS protocol. Fields:
  • reach_and_frequency_ring_modulus - Modulus for R&F (e.g., 2^32)
  • reach_ring_modulus - Modulus for reach-only (e.g., 2^16)
  • noise_mechanism - Noise generation method

Computation Participants

Each computation has multiple duchy participants:
computation_participants
ComputationParticipant[]
Denormalized list of participating duchies. Each participant includes:
  • name - Participant resource name
  • duchy_id - Duchy identifier
  • state - Participant-specific state
  • requisitions - Requisitions assigned to this duchy

Requisition Assignment

Requisitions are assigned to duchy participants:
def get_assigned_requisitions(computation, duchy_id):
    """
    Get requisitions assigned to a specific duchy.
    
    Args:
        computation: Computation message
        duchy_id: Identifier of the duchy
    
    Returns:
        List of Requisition messages
    """
    assigned_requisitions = []
    
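    for requisition in computation.requisitions:
        # Assumes the participant resource name ends with the duchy ID.
        # Compare that final segment exactly: a substring test would let
        # "worker1" match "worker10".
        participant_name = requisition.fulfilling_computation_participant
        if participant_name.rsplit("/", 1)[-1] == duchy_id:
            assigned_requisitions.append(requisition)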
    
    return assigned_requisitions

Work Claiming Pattern

Duchies implement a work queue pattern:
class DuchyWorker:
    def __init__(self, duchy_id, stub):
        self.duchy_id = duchy_id
        self.stub = stub
        self.active_computations = {}
    
    def run(self):
        """Main worker loop."""
        continuation_token = self._load_continuation_token()
        
        for response in self.stub.StreamActiveComputations(
            computations_service_pb2.StreamActiveComputationsRequest(
                continuation_token=continuation_token
            )
        ):
            computation = response.computation
            continuation_token = response.continuation_token
            
            # Persist token
            self._save_continuation_token(continuation_token)
            
            # Check if this duchy should process
            if self._is_participant(computation):
                # Claim work if not already processing
                if computation.name not in self.active_computations:
                    self._claim_computation(computation)
                
                # Process computation stage
                self._process_computation_stage(computation)
    
    def _is_participant(self, computation):
        """Check if this duchy is a participant."""
        for participant in computation.computation_participants:
            if participant.duchy_id == self.duchy_id:
                return True
        return False
    
    def _claim_computation(self, computation):
        """Claim computation for processing."""
        self.active_computations[computation.name] = computation
        print(f"Claimed computation: {computation.name}")
    
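    def _process_computation_stage(self, computation):
        """Process current stage of computation."""
        # Implementation depends on protocol and current stage
        pass

    def _load_continuation_token(self):
        """Return the last persisted token, or None on first run (placeholder)."""
        # Replace with a read from durable storage
        return None

    def _save_continuation_token(self, token):
        """Persist the token for crash recovery (placeholder)."""
        # Replace with a write to durable storage
        pass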

Security Considerations

All duchy-to-duchy communication must use mutual TLS with certificate verification. Never accept connections from untrusted duchies.
Before processing a computation, verify it’s in the expected state. Protocol violations could compromise security.
Verify that encrypted data received from other duchies uses correct encryption schemes and key versions.
Run computation workers in isolated processes or containers to prevent cross-computation information leakage.
Log all computation operations with duchy identifiers, computation IDs, stages, and timestamps for security auditing.
Implement rate limiting on streaming RPCs and data transfer to prevent resource exhaustion attacks.
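
The rate-limiting recommendation above can be sketched with a token bucket. This is one common technique, not something the duchy APIs prescribe; TokenBucket, its parameters, and the injectable clock are illustrative assumptions. The sketch is not thread-safe, so a real server would need a lock around try_acquire.

```python
import time
from typing import Callable


class TokenBucket:
    """Simple token-bucket limiter (illustrative, single-threaded)."""

    def __init__(
        self,
        rate_per_sec: float,
        capacity: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._rate = rate_per_sec
        self._capacity = capacity
        self._clock = clock
        self._tokens = capacity  # start full so an initial burst is allowed
        self._last = clock()

    def try_acquire(self, cost: float = 1.0) -> bool:
        """Take `cost` tokens if available; return False to signal rejection."""
        now = self._clock()
        elapsed = now - self._last
        self._last = now
        # Refill in proportion to elapsed time, capped at the bucket capacity.
        self._tokens = min(self._capacity, self._tokens + elapsed * self._rate)
        if self._tokens >= cost:
            self._tokens -= cost
            return True
        return False
```

A service handler could keep one bucket per peer duchy and reject or delay a request when try_acquire returns False. Passing the payload size as cost would limit bytes rather than request count.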

Error Handling

INVALID_ARGUMENT
error
Invalid computation name or parameters. Resolution: Verify resource names and protocol configuration
NOT_FOUND
error
Computation not found or not visible to this duchy. Resolution: Verify duchy is a participant in the computation
FAILED_PRECONDITION
error
Computation not in correct state. Common causes:
  • Trying to advance computation that’s not ready
  • Submitting result before all stages complete
Resolution: Check computation state and wait for state transition
ABORTED
error
Operation aborted due to concurrent modification. Resolution: Retry operation with updated computation state
DEADLINE_EXCEEDED
error
Computation took too long to complete. Resolution: Increase computation timeout or optimize processing
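
Of the codes above, only ABORTED is described as directly retryable. The sketch below shows one way to retry on ABORTED with exponential backoff while surfacing every other code to the caller. To stay self-contained it uses a hypothetical RpcFailure exception that carries a status code name instead of grpc.RpcError; call_with_retry and its parameters are likewise assumptions.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

# Status code names that this sketch retries automatically.
RETRYABLE_CODES = frozenset({"ABORTED"})


class RpcFailure(Exception):
    """Stand-in for grpc.RpcError that carries only a status code name."""

    def __init__(self, code: str):
        super().__init__(code)
        self.code = code


def call_with_retry(
    op: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run op, retrying with exponential backoff on retryable failures."""
    for attempt in range(1, max_attempts + 1):
        try:
            return op()
        except RpcFailure as e:
            if e.code not in RETRYABLE_CODES or attempt == max_attempts:
                raise
            # Delays grow as base_delay, 2 * base_delay, 4 * base_delay, ...
            sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("max_attempts must be at least 1")
```

Because ABORTED signals a concurrent modification, op should re-read the computation on each attempt rather than resend a request built from stale state.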

Performance Optimization

Use a single StreamActiveComputations connection per duchy rather than multiple streams to reduce overhead.
Process multiple independent computations in parallel using worker pools to maximize throughput.
When streaming data via AdvanceComputation, use 1-4MB chunks to balance memory usage and network efficiency.
Persist continuation tokens frequently (after each computation update) to minimize reprocessing after crashes.
Cache intermediate computation results to avoid recomputation if a stage needs to be retried.
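
The chunking advice above can be illustrated with a small generator that splits a payload into fixed-size pieces. This sketch covers only the splitting step; how each chunk is wrapped into an AdvanceComputation request is not shown because that message layout is not described here. iter_chunks and DEFAULT_CHUNK_SIZE are hypothetical names.

```python
from typing import Iterator

# 2 MiB, chosen from within the recommended 1-4MB range.
DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024


def iter_chunks(payload: bytes, chunk_size: int = DEFAULT_CHUNK_SIZE) -> Iterator[bytes]:
    """Yield consecutive slices of payload, each at most chunk_size bytes."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    # memoryview avoids copying the whole payload while slicing.
    view = memoryview(payload)
    for start in range(0, len(view), chunk_size):
        yield bytes(view[start:start + chunk_size])
```

Because this is a generator, only one chunk is materialized at a time, which keeps peak memory close to the chunk size even for large encrypted sketches.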

Computation Control

Detailed computation control service documentation

Requisition Fulfillment

How data providers fulfill requisitions
