Name of the resource this policy protects. Format: any resource name (e.g., measurementConsumers/{id}/reports/{id}). If not specified, this is the root policy for the entire API. Must be unique: each protected resource can have at most one policy.
def setup_measurement_consumer_policy(mc_id, admin_users, viewer_users):
    """Create the access policy for one measurement consumer.

    A single policy is created on ``measurementConsumers/{mc_id}`` with two
    bindings: ``roles/measurement-admin`` for the admins and
    ``roles/report-viewer`` for the viewers.

    Args:
        mc_id: Measurement consumer ID.
        admin_users: List of admin principal names.
        viewer_users: List of viewer principal names.

    Returns:
        The response of the ``CreatePolicy`` call.
    """
    # One binding per role; each user is addressed as "principals/<user>".
    admin_binding = policies_service_pb2.Policy.Binding(
        role="roles/measurement-admin",
        members=[f"principals/{user}" for user in admin_users],
    )
    viewer_binding = policies_service_pb2.Policy.Binding(
        role="roles/report-viewer",
        members=[f"principals/{user}" for user in viewer_users],
    )

    new_policy = policies_service_pb2.Policy(
        protected_resource=f"measurementConsumers/{mc_id}",
        bindings=[admin_binding, viewer_binding],
    )
    request = policies_service_pb2.CreatePolicyRequest(
        policy_id=f"mc-{mc_id}-policy",
        policy=new_policy,
    )

    created = policies_client.CreatePolicy(request)
    print(f"Created policy for measurementConsumers/{mc_id}")
    return created


# Usage
policy = setup_measurement_consumer_policy(
    mc_id="123",
    admin_users=["alice", "bob"],
    viewer_users=["charlie", "david", "eve"],
)
def grant_user_access(resource_name, user_id, role_name):
    """Grant a user a role on a resource by adding them to that role's binding.

    The policy protecting the resource is looked up first; its name and etag
    are then passed to ``AddPolicyBindingMembers``.

    Args:
        resource_name: Protected resource name.
        user_id: User principal ID (sent as ``principals/{user_id}``).
        role_name: Role to grant (sent as ``roles/{role_name}``).

    Returns:
        The response of the ``AddPolicyBindingMembers`` call.
    """
    # Step 1: find the policy that protects this resource.
    lookup_request = policies_service_pb2.LookupPolicyRequest(
        protected_resource=resource_name,
    )
    current = policies_client.LookupPolicy(lookup_request)

    # Step 2: add the principal to the role, echoing the etag just read.
    add_request = policies_service_pb2.AddPolicyBindingMembersRequest(
        name=current.name,
        role=f"roles/{role_name}",
        members=[f"principals/{user_id}"],
        etag=current.etag,
    )
    result = policies_client.AddPolicyBindingMembers(add_request)

    print(f"Granted {role_name} to {user_id} on {resource_name}")
    return result


# Usage
grant_user_access(
    resource_name="measurementConsumers/123",
    user_id="frank",
    role_name="report-viewer",
)
def revoke_user_access(resource_name, user_id, role_name):
    """Revoke a user's access to a resource.

    Looks up the policy protecting ``resource_name`` and removes
    ``principals/{user_id}`` from the ``roles/{role_name}`` binding.

    Args:
        resource_name: Protected resource name.
        user_id: User principal ID.
        role_name: Role to revoke.

    Returns:
        The response of the ``RemovePolicyBindingMembers`` call.

    Raises:
        grpc.RpcError: Re-raised for any failure of the remove call. When the
            error details mention ``POLICY_BINDING_MEMBERSHIP_NOT_FOUND`` an
            explanatory message is printed before re-raising.
    """
    # Lookup policy
    policy = policies_client.LookupPolicy(
        policies_service_pb2.LookupPolicyRequest(
            protected_resource=resource_name
        )
    )

    request = policies_service_pb2.RemovePolicyBindingMembersRequest(
        name=policy.name,
        role=f"roles/{role_name}",
        members=[f"principals/{user_id}"],
        etag=policy.etag,
    )

    # Remove user from role. Only the RPC itself sits inside the try so the
    # handler cannot catch anything unrelated.
    try:
        updated_policy = policies_client.RemovePolicyBindingMembers(request)
    except grpc.RpcError as e:
        # details() can be None; a bare `in` test would then raise TypeError
        # and hide the original RPC error.
        details = e.details() or ""
        if "POLICY_BINDING_MEMBERSHIP_NOT_FOUND" in details:
            print(f"User {user_id} didn't have {role_name} role")
        raise

    print(f"Revoked {role_name} from {user_id} on {resource_name}")
    return updated_policy
def list_role_members(resource_name, role_name):
    """List all principals that have a specific role on a resource.

    Args:
        resource_name: Protected resource name.
        role_name: Role to query (matched as ``roles/{role_name}``).

    Returns:
        A new ``list`` holding the members of the first binding whose role
        matches, or an empty list when the policy has no binding for the role.
    """
    policy = policies_client.LookupPolicy(
        policies_service_pb2.LookupPolicyRequest(
            protected_resource=resource_name
        )
    )

    role_resource_name = f"roles/{role_name}"
    for binding in policy.bindings:
        if binding.role == role_resource_name:
            # Copy into a plain list so both return paths yield the same type
            # and callers do not hold a live view into the policy message.
            return list(binding.members)

    return []  # Role not found in policy


# Usage
admins = list_role_members(
    resource_name="measurementConsumers/123",
    role_name="measurement-admin",
)
print(f"Admins: {admins}")
def safe_add_member_with_retry(policy_name, role, member, max_retries=3):
    """Add a member to a role, retrying on concurrent modification.

    Each attempt re-reads the policy to obtain its current etag and then calls
    ``AddPolicyBindingMembers`` with it. An RPC error whose details mention
    ``ETAG_MISMATCH`` triggers another attempt; any other RPC error, or an
    etag mismatch on the final attempt, is re-raised unchanged.

    Args:
        policy_name: Policy resource name.
        role: Role to add the member to.
        member: Principal to add.
        max_retries: Maximum number of attempts.

    Returns:
        The response of the successful ``AddPolicyBindingMembers`` call.

    Raises:
        grpc.RpcError: The last RPC failure, when it is not retried.
        Exception: When ``max_retries`` is less than 1, so no attempt is made.
    """
    for attempt in range(max_retries):
        try:
            # Get current policy so the request carries a fresh etag.
            policy = policies_client.GetPolicy(
                policies_service_pb2.GetPolicyRequest(name=policy_name)
            )

            # Try to add member with current etag
            return policies_client.AddPolicyBindingMembers(
                policies_service_pb2.AddPolicyBindingMembersRequest(
                    name=policy_name,
                    role=role,
                    members=[member],
                    etag=policy.etag,  # Use current etag
                )
            )
        except grpc.RpcError as e:
            # details() can be None; guard it so the membership test cannot
            # raise TypeError and mask the real RPC error.
            details = e.details() or ""
            if "ETAG_MISMATCH" in details and attempt < max_retries - 1:
                print(f"Concurrent modification detected, retrying ({attempt + 1}/{max_retries})")
                continue
            raise

    # Only reachable when the loop body never ran (max_retries < 1): every
    # executed iteration either returns or raises.
    raise Exception(f"Failed to add member after {max_retries} attempts")
Policy inheritance is not automatic. Each resource must have its own policy explicitly created. However, applications can implement inheritance logic when checking permissions.