Manage principals (users and services) for authentication and authorization
The Principals Service (wfa.measurement.access.v1alpha.Principals) manages principal resources representing authenticated entities (users and TLS clients) in the system.
Deleting a principal automatically removes it from all policy bindings. This immediately revokes all access for this principal.
Example:
from google.protobuf import empty_pb2

# Build the delete request for the principal identified by its resource name.
request = principals_service_pb2.DeletePrincipalRequest(
    name="principals/alice",
)

# Issue the RPC; per the surrounding documentation, deletion also removes the
# principal from every policy binding.
principals_client.DeletePrincipal(request)
print("Principal deleted and removed from all policies")
Error Codes:
PRINCIPAL_NOT_FOUND - Principal does not exist
PRINCIPAL_TYPE_NOT_SUPPORTED - Invalid principal type
def authenticate_oauth_user(id_token, *, key="", algorithms=None,
                            audience="your-client-id"):
    """
    Authenticate user from OAuth ID token.

    Decodes the token, looks up the principal matching its issuer/subject
    pair, and creates a new principal when the lookup reports that none
    exists for that user.

    Args:
        id_token: JWT ID token from OAuth provider
        key: Verification key forwarded to ``jwt.decode``. The default
            mirrors ``jwt.decode``'s own default; callers should supply the
            provider's public key.
        algorithms: Allowed signing algorithms forwarded to ``jwt.decode``.
            NOTE(review): PyJWT 2.x is understood to reject signature
            verification when this is left unset - confirm against the
            installed version and pass e.g. ``["RS256"]``.
        audience: Expected ``aud`` claim (the OAuth client ID).

    Returns:
        Principal resource

    Raises:
        grpc.RpcError: If the lookup fails for any reason other than
            PRINCIPAL_NOT_FOUND_FOR_USER.
    """
    import jwt

    # Verify and decode ID token. The deprecated ``verify=True`` keyword is
    # intentionally not passed; verification is the library default.
    decoded = jwt.decode(
        id_token,
        key=key,
        algorithms=algorithms,
        audience=audience,
    )

    issuer = decoded['iss']
    subject = decoded['sub']

    lookup_request = principals_service_pb2.LookupPrincipalRequest(
        user=principals_service_pb2.Principal.OAuthUser(
            issuer=issuer,
            subject=subject,
        )
    )

    # Lookup existing principal; only the RPC itself is guarded.
    try:
        principal = principals_client.LookupPrincipal(lookup_request)
    except grpc.RpcError as e:
        # details() may be None, so normalise before the substring test.
        if "PRINCIPAL_NOT_FOUND_FOR_USER" not in (e.details() or ""):
            raise
        # Auto-register new user
        print("First-time user, creating principal")
        return create_new_user_principal(issuer, subject, decoded.get('email'))

    print(f"Authenticated as: {principal.name}")
    return principal


def create_new_user_principal(issuer, subject, email=None):
    """
    Create principal for new user.

    Args:
        issuer: OAuth issuer of the user
        subject: OAuth subject of the user
        email: Optional email address; its local part is used as the
            principal ID when present and non-empty.

    Returns:
        The principal returned by CreatePrincipal, after
        ``grant_default_permissions`` has been called with its name.
    """
    # Generate principal ID from email or subject. An email with an empty
    # local part (e.g. "@example.com") falls back to the subject-derived ID
    # instead of producing an empty principal ID.
    local_part = email.partition('@')[0] if email else ""
    principal_id = local_part or f"user-{subject[:8]}"

    principal = principals_client.CreatePrincipal(
        principals_service_pb2.CreatePrincipalRequest(
            principal_id=principal_id,
            principal=principals_service_pb2.Principal(
                user=principals_service_pb2.Principal.OAuthUser(
                    issuer=issuer,
                    subject=subject,
                )
            ),
        )
    )

    # Grant default permissions
    grant_default_permissions(principal.name)

    return principal
def provision_new_user(email, oauth_issuer, oauth_subject, roles):
    """
    Provision a new user with roles.

    Creates a principal whose ID is the local part of ``email`` and then
    adds it to each listed role on the "measurementConsumers/default"
    resource via ``add_principal_to_role``.

    Args:
        email: User email address
        oauth_issuer: OAuth issuer URL
        oauth_subject: OAuth subject ID
        roles: List of role names to grant

    Returns:
        The principal returned by CreatePrincipal.
    """
    # Create principal; the ID is everything before the first '@'.
    principal_id = email.split('@')[0]

    principal = principals_client.CreatePrincipal(
        principals_service_pb2.CreatePrincipalRequest(
            principal_id=principal_id,
            principal=principals_service_pb2.Principal(
                user=principals_service_pb2.Principal.OAuthUser(
                    issuer=oauth_issuer,
                    subject=oauth_subject,
                )
            ),
        )
    )

    print(f"Created principal: {principal.name}")

    # Grant roles via policy
    for role in roles:
        add_principal_to_role(
            resource="measurementConsumers/default",
            principal_name=principal.name,
            role_name=role,
        )
        print(f"Granted role: {role}")

    return principal


# Usage
provision_new_user(
    email="alice@example.com",
    oauth_issuer="https://accounts.google.com",
    oauth_subject="110169484474386276334",
    roles=["report-viewer", "metric-creator"],
)
def create_service_account(service_name, certificate_path, roles):
    """
    Create a service account principal from certificate.

    Reads a PEM certificate, extracts its Authority Key Identifier (AKID),
    creates a TLS-client principal named ``service-<service_name>``, and
    adds it to each listed role on "measurementConsumers/default".

    Args:
        service_name: Name for the service
        certificate_path: Path to X.509 certificate file
        roles: List of role names to grant

    Returns:
        The principal returned by CreatePrincipal.

    Raises:
        ValueError: If the certificate's Authority Key Identifier extension
            carries no key identifier.
    """
    from cryptography import x509
    from cryptography.x509.oid import ExtensionOID

    # Read certificate
    with open(certificate_path, 'rb') as f:
        cert = x509.load_pem_x509_certificate(f.read())

    # Extract AKID
    aki_ext = cert.extensions.get_extension_for_oid(
        ExtensionOID.AUTHORITY_KEY_IDENTIFIER
    )
    akid = aki_ext.value.key_identifier
    # The extension may omit the key identifier; fail before any principal is
    # created rather than after the CreatePrincipal call.
    if akid is None:
        raise ValueError(
            f"Certificate {certificate_path!r} has no authority key identifier"
        )

    # Create principal
    principal_id = f"service-{service_name}"

    principal = principals_client.CreatePrincipal(
        principals_service_pb2.CreatePrincipalRequest(
            principal_id=principal_id,
            principal=principals_service_pb2.Principal(
                tls_client=principals_service_pb2.Principal.TlsClient(
                    authority_key_identifier=akid,
                )
            ),
        )
    )

    print(f"Created service account: {principal.name}")
    print(f"AKID: {akid.hex()}")

    # Grant roles
    for role in roles:
        add_principal_to_role(
            resource="measurementConsumers/default",
            principal_name=principal.name,
            role_name=role,
        )

    return principal


# Usage
create_service_account(
    service_name="data-ingestion",
    certificate_path="/path/to/service.crt",
    roles=["data-provider", "requisition-fulfiller"],
)
def deprovision_user(principal_name):
    """
    Delete a principal, printing its details first and logging afterwards.

    Args:
        principal_name: Principal resource name
    """
    # Fetch the principal up front so its details can be reported before it
    # is gone.
    get_request = principals_service_pb2.GetPrincipalRequest(name=principal_name)
    principal = principals_client.GetPrincipal(get_request)

    print(f"Deprovisioning: {principal.name}")
    if principal.HasField('user'):
        oauth_user = principal.user
        print(f" OAuth User: {oauth_user.issuer} / {oauth_user.subject}")

    # Deleting the principal also removes it from all policies, per the
    # surrounding documentation.
    delete_request = principals_service_pb2.DeletePrincipalRequest(
        name=principal_name
    )
    principals_client.DeletePrincipal(delete_request)
    print("Deleted principal and revoked all access")

    # Record the action in the audit log.
    audit_log.info(f"Deprovisioned user: {principal_name}")


# Usage
deprovision_user("principals/alice")