Overview
The Permission Mongo Python SDK provides a seamless way to interact with Permission Mongo from Python applications. It handles authentication, automatic permission filtering, and provides a MongoDB-like interface for all operations.
The Python SDK is currently in development. This documentation describes the planned API and features.
Installation
Install the SDK using pip:
pip install permission-mongo
For development installations:
git clone https://github.com/permission-mongo/python-sdk.git
cd python-sdk
pip install -e .
Quick Start
from permission_mongo import PermissionMongo
# Initialize the client
client = PermissionMongo(
url = "http://localhost:8080" ,
api_key = "your-api-key"
)
# Create a document with automatic permission checks
user = client.insert( "users" , {
"name" : "John Doe" ,
"email" : "[email protected] " ,
"role" : "engineer"
})
print ( f "Created user: { user[ '_id' ] } " )
# Query with automatic permission filtering
active_users = client.find( "users" , { "status" : "active" })
for user in active_users:
print (user[ "name" ])
Client Initialization
Basic Configuration
from permission_mongo import PermissionMongo
client = PermissionMongo(
url = "http://localhost:8080" ,
api_key = "your-api-key"
)
Advanced Configuration
client = PermissionMongo(
url = "http://localhost:8080" ,
api_key = "your-api-key" ,
timeout = 30 , # Request timeout in seconds
max_retries = 3 , # Number of retries on failure
verify_ssl = True , # SSL certificate verification
headers = {
"X-Custom-Header" : "value"
}
)
Configuration Parameters
Base URL of your Permission Mongo server
API key for authentication
Request timeout in seconds
Maximum number of retry attempts for failed requests
Enable SSL certificate verification
Additional HTTP headers to include with every request
Document Operations
Create Document
Create a single document in a collection:
# Create a new user
user = client.insert( "users" , {
"name" : "Jane Smith" ,
"email" : "[email protected] " ,
"department" : "engineering" ,
"status" : "active"
})
print ( f "Created user ID: { user[ '_id' ] } " )
The created_by, created_at, and updated_at fields are automatically added based on your authentication context.
Get Document
Retrieve a single document by ID:
# Get user by ID
user = client.get( "users" , "507f1f77bcf86cd799439011" )
if user:
print ( f "User: { user[ 'name' ] } " )
else :
print ( "User not found or no permission" )
Update Document
Update an existing document:
# Update user status
updated_user = client.update( "users" , "507f1f77bcf86cd799439011" , {
"status" : "inactive" ,
"last_active" : "2024-03-04T10:00:00Z"
})
print ( f "Updated user: { updated_user[ '_id' ] } " )
The update operation automatically checks field-level permissions. Fields marked as deny_write in your policy will be rejected.
Delete Document
Delete a document by ID:
# Delete a user
result = client.delete( "users" , "507f1f77bcf86cd799439011" )
if result[ "deleted" ]:
print ( f "Deleted user: { result[ 'id' ] } " )
Query Operations
Find Documents
Query documents with filters and pagination:
# Find active users
users = client.find( "users" , {
"status" : "active" ,
"department" : "engineering"
})
for user in users:
print ( f " { user[ 'name' ] } - { user[ 'email' ] } " )
# Paginated query
result = client.find(
"users" ,
{ "status" : "active" },
limit = 20 ,
sort = [ "name" ],
fields = [ "name" , "email" , "department" ]
)
print ( f "Found { len (result[ 'data' ]) } users" )
# Get next page using cursor
if result[ 'pagination' ][ 'has_more' ]:
next_result = client.find(
"users" ,
{ "status" : "active" },
limit = 20 ,
cursor = result[ 'pagination' ][ 'cursor' ]
)
Query Parameters
Name of the collection to query
MongoDB-style filter query
Maximum number of documents to return (max 10,000)
List of fields to sort by. Prefix with - for descending order.
Example: ["name", "-created_at"]
Pagination cursor from previous response
List of fields to include in the response. If not specified, all allowed fields are returned.
Count Documents
Count documents matching a filter:
# Count active users
count = client.count( "users" , { "status" : "active" })
print ( f "Active users: { count } " )
# Count all users in department
dept_count = client.count( "users" , {
"department" : "engineering" ,
"status" : { "$in" : [ "active" , "pending" ]}
})
Aggregate
Execute aggregation pipelines:
# Group users by department
result = client.aggregate( "users" , [
{ "$match" : { "status" : "active" }},
{ "$group" : {
"_id" : "$department" ,
"count" : { "$sum" : 1 },
"avg_salary" : { "$avg" : "$salary" }
}},
{ "$sort" : { "count" : - 1 }}
])
for dept in result:
print ( f " { dept[ '_id' ] } : { dept[ 'count' ] } users" )
Permission filters are automatically injected at the beginning of your aggregation pipeline to ensure data security.
Batch Operations
Batch Create
Create multiple documents in a single request:
# Create multiple users
users = [
{ "name" : "Alice" , "email" : "[email protected] " , "role" : "engineer" },
{ "name" : "Bob" , "email" : "[email protected] " , "role" : "manager" },
{ "name" : "Charlie" , "email" : "[email protected] " , "role" : "engineer" }
]
result = client.batch_create( "users" , users)
print ( f "Created: { len (result[ 'created' ]) } users" )
if result[ 'failed' ]:
print ( f "Failed: { len (result[ 'failed' ]) } users" )
for failure in result[ 'failed' ]:
print ( f " Index { failure[ 'index' ] } : { failure[ 'error' ][ 'message' ] } " )
Batch Update
Update multiple documents at once:
# Update by IDs
result = client.batch_update(
"users" ,
ids = [ "507f1f77bcf86cd799439011" , "507f1f77bcf86cd799439012" ],
update = { "status" : "inactive" }
)
print ( f "Matched: { result[ 'matched' ] } , Modified: { result[ 'modified' ] } " )
# Update by filter
result = client.batch_update(
"users" ,
filter = { "department" : "sales" , "status" : "pending" },
update = { "status" : "active" }
)
Batch Delete
Delete multiple documents:
# Delete by IDs
result = client.batch_delete(
"users" ,
ids = [ "507f1f77bcf86cd799439011" , "507f1f77bcf86cd799439012" ]
)
print ( f "Deleted { result[ 'deleted' ] } documents" )
# Delete by filter
result = client.batch_delete(
"users" ,
filter = { "status" : "archived" , "last_active" : { "$lt" : "2023-01-01" }}
)
Batch operations respect all permission checks. Documents you don’t have permission to modify will be silently skipped.
Error Handling
Exception Types
The SDK raises different exceptions for various error conditions:
from permission_mongo import (
PermissionMongoError,
AuthenticationError,
PermissionDeniedError,
NotFoundError,
ValidationError,
RateLimitError
)
try :
user = client.get( "users" , "invalid-id" )
except NotFoundError:
print ( "User not found" )
except PermissionDeniedError:
print ( "No permission to access this user" )
except AuthenticationError:
print ( "Authentication failed" )
except PermissionMongoError as e:
print ( f "Error: { e } " )
All exceptions include detailed error information:
try :
client.insert( "users" , { "name" : "" })
except ValidationError as e:
print ( f "Status: { e.status_code } " )
print ( f "Code: { e.error_code } " )
print ( f "Message: { e.message } " )
print ( f "Details: { e.details } " )
Advanced Features
Context Manager
Use the client as a context manager for automatic cleanup:
with PermissionMongo( url = "http://localhost:8080" , api_key = "key" ) as client:
users = client.find( "users" , { "status" : "active" })
# Client automatically closes when exiting the context
Async Support
Async/await support for high-performance applications:
from permission_mongo import AsyncPermissionMongo
import asyncio
async def main ():
async with AsyncPermissionMongo(
url = "http://localhost:8080" ,
api_key = "your-api-key"
) as client:
# All operations support async
user = await client.insert( "users" , {
"name" : "Async User" ,
"email" : "[email protected] "
})
users = await client.find( "users" , { "status" : "active" })
async for user in users:
print (user[ "name" ])
asyncio.run(main())
Set custom headers for specific requests:
# Add custom headers to a single request
user = client.get(
"users" ,
"507f1f77bcf86cd799439011" ,
headers = { "X-Request-ID" : "abc-123" }
)
# Update default headers
client.set_headers({
"X-Tenant-ID" : "tenant-123" ,
"X-User-ID" : "user-456"
})
Retry Configuration
Customize retry behavior for resilient applications:
from permission_mongo import RetryConfig
client = PermissionMongo(
url = "http://localhost:8080" ,
api_key = "your-api-key" ,
retry_config = RetryConfig(
max_retries = 5 ,
backoff_factor = 2 , # Exponential backoff: 2, 4, 8, 16 seconds
retry_on_status = [ 408 , 429 , 500 , 502 , 503 , 504 ]
)
)
Best Practices
Connection Pooling
Reuse client instances for better performance:
# Good: Reuse client
client = PermissionMongo( url = "http://localhost:8080" , api_key = "key" )
for i in range ( 100 ):
user = client.get( "users" , user_ids[i])
# Bad: Creating new client for each request
for i in range ( 100 ):
client = PermissionMongo( url = "http://localhost:8080" , api_key = "key" )
user = client.get( "users" , user_ids[i])
Use cursor-based pagination for large result sets:
def fetch_all_users ():
"""Fetch all users using cursor pagination"""
cursor = None
all_users = []
while True :
result = client.find(
"users" ,
{ "status" : "active" },
limit = 100 ,
cursor = cursor
)
all_users.extend(result[ 'data' ])
if not result[ 'pagination' ][ 'has_more' ]:
break
cursor = result[ 'pagination' ][ 'cursor' ]
return all_users
Error Handling
Always handle exceptions appropriately:
import logging
from permission_mongo import PermissionDeniedError, NotFoundError
logger = logging.getLogger( __name__ )
def safe_get_user ( user_id ):
try :
return client.get( "users" , user_id)
except NotFoundError:
logger.warning( f "User { user_id } not found" )
return None
except PermissionDeniedError:
logger.error( f "Permission denied for user { user_id } " )
raise
except Exception as e:
logger.exception( f "Unexpected error fetching user { user_id } " )
raise
Batch Operations
Use batch operations for bulk updates:
# Good: Batch update
client.batch_update(
"users" ,
ids = user_ids,
update = { "last_notification" : datetime.now()}
)
# Bad: Individual updates
for user_id in user_ids:
client.update(user_id, { "last_notification" : datetime.now()})
Examples
Multi-tenant Application
class UserService :
def __init__ ( self , api_key ):
self .client = PermissionMongo(
url = "http://localhost:8080" ,
api_key = api_key
)
def create_user ( self , data ):
"""Create a new user with automatic tenant isolation"""
return self .client.insert( "users" , data)
def get_team_members ( self , team_id ):
"""Get all members of a team"""
return self .client.find( "users" , {
"team_id" : team_id,
"status" : "active"
})
def update_user_role ( self , user_id , new_role ):
"""Update user role with permission checks"""
try :
return self .client.update( "users" , user_id, {
"role" : new_role
})
except PermissionDeniedError:
raise ValueError ( "You don't have permission to change user roles" )
Data Export
import csv
def export_users_to_csv ( filename ):
"""Export all accessible users to CSV"""
with open (filename, 'w' , newline = '' ) as csvfile:
writer = csv.DictWriter(
csvfile,
fieldnames = [ 'id' , 'name' , 'email' , 'department' ]
)
writer.writeheader()
cursor = None
while True :
result = client.find(
"users" ,
{ "status" : "active" },
limit = 1000 ,
cursor = cursor,
fields = [ "name" , "email" , "department" ]
)
for user in result[ 'data' ]:
writer.writerow({
'id' : user[ '_id' ],
'name' : user[ 'name' ],
'email' : user[ 'email' ],
'department' : user.get( 'department' , 'N/A' )
})
if not result[ 'pagination' ][ 'has_more' ]:
break
cursor = result[ 'pagination' ][ 'cursor' ]
Bulk Import
def import_users_from_csv ( filename ):
"""Import users from CSV with batch operations"""
batch = []
batch_size = 100
with open (filename, 'r' ) as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
batch.append({
'name' : row[ 'name' ],
'email' : row[ 'email' ],
'department' : row[ 'department' ],
'status' : 'active'
})
if len (batch) >= batch_size:
result = client.batch_create( "users" , batch)
print ( f "Imported { len (result[ 'created' ]) } users" )
if result[ 'failed' ]:
print ( f "Failed: { len (result[ 'failed' ]) } users" )
batch = []
# Import remaining users
if batch:
result = client.batch_create( "users" , batch)
print ( f "Imported { len (result[ 'created' ]) } users" )
Migration Guide
From PyMongo
Migrating from PyMongo to Permission Mongo SDK:
# Before: PyMongo
from pymongo import MongoClient
mongo = MongoClient( "mongodb://localhost:27017" )
db = mongo[ 'mydb' ]
users = db.users.find({ "status" : "active" })
# After: Permission Mongo SDK
from permission_mongo import PermissionMongo
client = PermissionMongo(
url = "http://localhost:8080" ,
api_key = "your-api-key"
)
users = client.find( "users" , { "status" : "active" })
Key Differences
Automatic Permission Filtering : All queries automatically respect RBAC policies
Authentication Required : Every request requires authentication via API key
HTTP-based : Communication over HTTP/HTTPS instead of MongoDB wire protocol
Field-level Permissions : Automatic field masking and filtering based on user roles
Audit Logging : All operations are automatically logged for compliance
Troubleshooting
Common Issues
Problem : AuthenticationError: Invalid API keySolution :
Verify your API key is correct
Check that the API key hasn’t expired
Ensure you’re using the correct server URL
Verify the API key has the required permissions
Problem : Queries return empty results even though data existsSolution :
Check your RBAC policies - you may not have read permission
Verify you’re querying the correct tenant/organization
Check field-level permissions - fields may be filtered out
Review the when conditions in your policies
Problem : PermissionDeniedError on updates or deletesSolution :
Verify your user role has the required action permission
Check the when condition in your policy
For updates, ensure you’re not modifying deny_write fields
Review hierarchical permissions if using organizational structures
Problem : Requests timing outSolution :
Increase the timeout parameter in client initialization
Check network connectivity to the Permission Mongo server
Verify the server is running and healthy
Consider using connection pooling for high-traffic scenarios
Debug Mode
Enable debug logging to troubleshoot issues:
import logging
logging.basicConfig( level = logging. DEBUG )
client = PermissionMongo(
url = "http://localhost:8080" ,
api_key = "your-api-key"
)
# Now all requests and responses will be logged
API Reference Summary
Core Methods
Method Description Returns insert(collection, document)Create a single document dictget(collection, id)Get document by ID dict or Noneupdate(collection, id, updates)Update a document dictdelete(collection, id)Delete a document dictfind(collection, filter, **options)Query documents list[dict] or paginated resultcount(collection, filter)Count documents intaggregate(collection, pipeline)Execute aggregation list[dict]
Batch Methods
Method Description Returns batch_create(collection, documents)Create multiple documents dict with created and failedbatch_update(collection, ids=None, filter=None, update=None)Update multiple documents dict with countsbatch_delete(collection, ids=None, filter=None)Delete multiple documents dict with count
Support
GitHub Issues Report bugs or request features
Documentation Full API reference
Examples Sample code and tutorials
Community Join our Discord community
Contributing
We welcome contributions to the Python SDK! Check out our contributing guide to get started.
License
The Permission Mongo Python SDK is open source software licensed under the MIT License .