
Overview

The Permission Mongo Python SDK lets Python applications interact with Permission Mongo. It handles authentication and automatic permission filtering, and exposes a MongoDB-like interface for all operations.
The Python SDK is currently in development. This documentation describes the planned API and features.

Installation

Install the SDK using pip:
pip install permission-mongo
For development installations:
git clone https://github.com/permission-mongo/python-sdk.git
cd python-sdk
pip install -e .

Quick Start

from permission_mongo import PermissionMongo

# Initialize the client
client = PermissionMongo(
    url="http://localhost:8080",
    api_key="your-api-key"
)

# Create a document with automatic permission checks
user = client.insert("users", {
    "name": "John Doe",
    "email": "john.doe@example.com",
    "role": "engineer"
})

print(f"Created user: {user['_id']}")

# Query with automatic permission filtering
active_users = client.find("users", {"status": "active"})
for user in active_users:
    print(user["name"])

Client Initialization

Basic Configuration

from permission_mongo import PermissionMongo

client = PermissionMongo(
    url="http://localhost:8080",
    api_key="your-api-key"
)

Advanced Configuration

client = PermissionMongo(
    url="http://localhost:8080",
    api_key="your-api-key",
    timeout=30,  # Request timeout in seconds
    max_retries=3,  # Number of retries on failure
    verify_ssl=True,  # SSL certificate verification
    headers={
        "X-Custom-Header": "value"
    }
)

Configuration Parameters

  • url (string, required): Base URL of your Permission Mongo server
  • api_key (string, required): API key for authentication
  • timeout (int, default: 30): Request timeout in seconds
  • max_retries (int, default: 3): Maximum number of retry attempts for failed requests
  • verify_ssl (bool, default: True): Enable SSL certificate verification
  • headers (dict): Additional HTTP headers to include with every request
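One way to keep these settings out of source code is to read them from the environment and pass the result to the constructor as keyword arguments. The sketch below only builds the dictionary; the PERMISSION_MONGO_* variable names are hypothetical and not something the SDK defines.

```python
import os

def client_kwargs_from_env(env=None):
    """Build keyword arguments for PermissionMongo(...) from environment variables.

    The PERMISSION_MONGO_* names are hypothetical, chosen for this sketch.
    """
    env = os.environ if env is None else env
    return {
        "url": env["PERMISSION_MONGO_URL"],          # required
        "api_key": env["PERMISSION_MONGO_API_KEY"],  # required
        "timeout": int(env.get("PERMISSION_MONGO_TIMEOUT", "30")),
        "max_retries": int(env.get("PERMISSION_MONGO_MAX_RETRIES", "3")),
        "verify_ssl": env.get("PERMISSION_MONGO_VERIFY_SSL", "true").lower() != "false",
    }

# A plain dict stands in for os.environ so the sketch runs anywhere
example = client_kwargs_from_env({
    "PERMISSION_MONGO_URL": "http://localhost:8080",
    "PERMISSION_MONGO_API_KEY": "your-api-key",
    "PERMISSION_MONGO_TIMEOUT": "10",
})
print(example)
```

The resulting dictionary could then be unpacked as PermissionMongo(**example), assuming the constructor accepts the parameters listed above.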

Document Operations

Create Document

Create a single document in a collection:
# Create a new user
user = client.insert("users", {
    "name": "Jane Smith",
    "email": "jane.smith@example.com",
    "department": "engineering",
    "status": "active"
})

print(f"Created user ID: {user['_id']}")
The created_by, created_at, and updated_at fields are automatically added based on your authentication context.

Get Document

Retrieve a single document by ID:
# Get user by ID
user = client.get("users", "507f1f77bcf86cd799439011")

if user:
    print(f"User: {user['name']}")
else:
    print("User not found or no permission")

Update Document

Update an existing document:
# Update user status
updated_user = client.update("users", "507f1f77bcf86cd799439011", {
    "status": "inactive",
    "last_active": "2024-03-04T10:00:00Z"
})

print(f"Updated user: {updated_user['_id']}")
The update operation automatically checks field-level permissions. An update that writes to a field marked deny_write in your policy is rejected.
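If you already know which fields your policy denies, a client-side pre-check can flag a doomed update before it is sent. This is only a sketch: the server remains the authority, and the deny_write set below is a hypothetical example rather than something the SDK exposes.

```python
def find_denied_fields(updates, deny_write):
    """Return the update keys that appear in the deny_write set, sorted."""
    return sorted(set(updates) & set(deny_write))

deny_write = {"role", "salary"}  # hypothetical policy fields
updates = {"status": "inactive", "role": "admin"}

denied = find_denied_fields(updates, deny_write)
if denied:
    print(f"These fields would be rejected: {denied}")
```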

Delete Document

Delete a document by ID:
# Delete a user
result = client.delete("users", "507f1f77bcf86cd799439011")

if result["deleted"]:
    print(f"Deleted user: {result['id']}")

Query Operations

Find Documents

Query documents with a filter:
# Find active users
users = client.find("users", {
    "status": "active",
    "department": "engineering"
})

for user in users:
    print(f"{user['name']} - {user['email']}")

Find with Pagination

# Paginated query
result = client.find(
    "users",
    {"status": "active"},
    limit=20,
    sort=["name"],
    fields=["name", "email", "department"]
)

print(f"Found {len(result['data'])} users")

# Get next page using cursor
if result['pagination']['has_more']:
    next_result = client.find(
        "users",
        {"status": "active"},
        limit=20,
        cursor=result['pagination']['cursor']
    )

Query Parameters

  • collection (string, required): Name of the collection to query
  • filter (dict, required): MongoDB-style filter query
  • limit (int, default: 20): Maximum number of documents to return (max 10,000)
  • sort (list): List of fields to sort by. Prefix with - for descending order. Example: ["name", "-created_at"]
  • cursor (string): Pagination cursor from previous response
  • fields (list): List of fields to include in the response. If not specified, all allowed fields are returned.
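To make the sort convention concrete, the sketch below converts the prefixed field names into MongoDB-style (field, direction) pairs, where 1 means ascending and -1 means descending. The parse_sort helper is illustrative only and is not part of the SDK.

```python
def parse_sort(sort_fields):
    """Convert ["name", "-created_at"] into [("name", 1), ("created_at", -1)]."""
    pairs = []
    for field in sort_fields:
        if field.startswith("-"):
            pairs.append((field[1:], -1))  # leading "-" means descending
        else:
            pairs.append((field, 1))       # no prefix means ascending
    return pairs

print(parse_sort(["name", "-created_at"]))
```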

Count Documents

Count documents matching a filter:
# Count active users
count = client.count("users", {"status": "active"})
print(f"Active users: {count}")

# Count all users in department
dept_count = client.count("users", {
    "department": "engineering",
    "status": {"$in": ["active", "pending"]}
})

Aggregate

Execute aggregation pipelines:
# Group users by department
result = client.aggregate("users", [
    {"$match": {"status": "active"}},
    {"$group": {
        "_id": "$department",
        "count": {"$sum": 1},
        "avg_salary": {"$avg": "$salary"}
    }},
    {"$sort": {"count": -1}}
])

for dept in result:
    print(f"{dept['_id']}: {dept['count']} users")
Permission filters are automatically injected at the beginning of your aggregation pipeline to ensure data security.
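Conceptually, this injection amounts to prepending a $match stage to the pipeline you supply. The following sketch shows the idea with a hypothetical tenant filter; the actual filter is derived from your policies on the server and may look different.

```python
def inject_permission_filter(pipeline, permission_filter):
    """Return a new pipeline with a $match stage for the permission filter first."""
    return [{"$match": permission_filter}] + list(pipeline)

user_pipeline = [
    {"$match": {"status": "active"}},
    {"$group": {"_id": "$department", "count": {"$sum": 1}}},
]

# Hypothetical filter derived from the caller's policy
secured = inject_permission_filter(user_pipeline, {"tenant_id": "tenant-123"})
for stage in secured:
    print(stage)
```

Because the permission stage runs first, later stages such as $group only ever see documents the caller is allowed to read.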

Batch Operations

Batch Create

Create multiple documents in a single request:
# Create multiple users
users = [
    {"name": "Alice", "email": "alice@example.com", "role": "engineer"},
    {"name": "Bob", "email": "bob@example.com", "role": "manager"},
    {"name": "Charlie", "email": "charlie@example.com", "role": "engineer"}
]

result = client.batch_create("users", users)

print(f"Created: {len(result['created'])} users")
if result['failed']:
    print(f"Failed: {len(result['failed'])} users")
    for failure in result['failed']:
        print(f"  Index {failure['index']}: {failure['error']['message']}")

Batch Update

Update multiple documents at once:
# Update by IDs
result = client.batch_update(
    "users",
    ids=["507f1f77bcf86cd799439011", "507f1f77bcf86cd799439012"],
    update={"status": "inactive"}
)

print(f"Matched: {result['matched']}, Modified: {result['modified']}")

# Update by filter
result = client.batch_update(
    "users",
    filter={"department": "sales", "status": "pending"},
    update={"status": "active"}
)

Batch Delete

Delete multiple documents:
# Delete by IDs
result = client.batch_delete(
    "users",
    ids=["507f1f77bcf86cd799439011", "507f1f77bcf86cd799439012"]
)

print(f"Deleted {result['deleted']} documents")

# Delete by filter
result = client.batch_delete(
    "users",
    filter={"status": "archived", "last_active": {"$lt": "2023-01-01"}}
)
Batch operations respect all permission checks. Documents you don’t have permission to modify will be silently skipped.
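Since skipped documents raise no error, one way to notice them is to compare the number of IDs you sent with the matched count in the result. The sketch assumes that matched counts only the documents you were allowed to modify, which this page does not state explicitly; the result dictionary is a stand-in for a real response.

```python
def count_skipped(requested_ids, result):
    """Number of requested IDs that the batch result did not match."""
    return len(set(requested_ids)) - result["matched"]

ids = ["507f1f77bcf86cd799439011", "507f1f77bcf86cd799439012"]

# Stand-in for the dict returned by client.batch_update(...)
sample_result = {"matched": 1, "modified": 1}

skipped = count_skipped(ids, sample_result)
if skipped:
    print(f"{skipped} document(s) were skipped (missing or not permitted)")
```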

Error Handling

Exception Types

The SDK raises different exceptions for various error conditions:
from permission_mongo import (
    PermissionMongoError,
    AuthenticationError,
    PermissionDeniedError,
    NotFoundError,
    ValidationError,
    RateLimitError
)

try:
    user = client.get("users", "invalid-id")
except NotFoundError:
    print("User not found")
except PermissionDeniedError:
    print("No permission to access this user")
except AuthenticationError:
    print("Authentication failed")
except PermissionMongoError as e:
    print(f"Error: {e}")

Error Response Format

All exceptions include detailed error information:
try:
    client.insert("users", {"name": ""})
except ValidationError as e:
    print(f"Status: {e.status_code}")
    print(f"Code: {e.error_code}")
    print(f"Message: {e.message}")
    print(f"Details: {e.details}")

Advanced Features

Context Manager

Use the client as a context manager for automatic cleanup:
with PermissionMongo(url="http://localhost:8080", api_key="key") as client:
    users = client.find("users", {"status": "active"})
    # Client automatically closes when exiting the context

Async Support

Use AsyncPermissionMongo for async/await support in high-concurrency applications:
from permission_mongo import AsyncPermissionMongo
import asyncio

async def main():
    async with AsyncPermissionMongo(
        url="http://localhost:8080",
        api_key="your-api-key"
    ) as client:
        # All operations support async
        user = await client.insert("users", {
            "name": "Async User",
            "email": "async.user@example.com"
        })
        
        users = await client.find("users", {"status": "active"})
        async for user in users:
            print(user["name"])

asyncio.run(main())

Custom Headers

Set custom headers for specific requests:
# Add custom headers to a single request
user = client.get(
    "users",
    "507f1f77bcf86cd799439011",
    headers={"X-Request-ID": "abc-123"}
)

# Update default headers
client.set_headers({
    "X-Tenant-ID": "tenant-123",
    "X-User-ID": "user-456"
})

Retry Configuration

Customize retry behavior for resilient applications:
from permission_mongo import RetryConfig

client = PermissionMongo(
    url="http://localhost:8080",
    api_key="your-api-key",
    retry_config=RetryConfig(
        max_retries=5,
        backoff_factor=2,  # Exponential backoff: 2, 4, 8, 16 seconds
        retry_on_status=[408, 429, 500, 502, 503, 504]
    )
)
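The delays in the comment above can be reproduced with a simple doubling schedule. The formula below, backoff_factor * 2 ** (attempt - 1), is an assumption that happens to match the listed values; the SDK's real schedule may differ, for example by adding jitter or a cap.

```python
def backoff_delays(max_retries, backoff_factor):
    """Delay in seconds before each retry, doubling from backoff_factor."""
    return [
        backoff_factor * 2 ** (attempt - 1)
        for attempt in range(1, max_retries + 1)
    ]

delays = backoff_delays(max_retries=5, backoff_factor=2)
print(delays)       # [2, 4, 8, 16, 32]
print(sum(delays))  # worst-case total wait in seconds: 62
```

Under this assumption, five retries can add about a minute of waiting to a failing call, which is worth weighing against the request timeout.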

Best Practices

Connection Pooling

Reuse client instances for better performance:
# Good: Reuse client
client = PermissionMongo(url="http://localhost:8080", api_key="key")

for i in range(100):
    user = client.get("users", user_ids[i])

# Bad: Creating new client for each request
for i in range(100):
    client = PermissionMongo(url="http://localhost:8080", api_key="key")
    user = client.get("users", user_ids[i])

Pagination

Use cursor-based pagination for large result sets:
def fetch_all_users():
    """Fetch all users using cursor pagination"""
    cursor = None
    all_users = []
    
    while True:
        result = client.find(
            "users",
            {"status": "active"},
            limit=100,
            cursor=cursor
        )
        
        all_users.extend(result['data'])
        
        if not result['pagination']['has_more']:
            break
            
        cursor = result['pagination']['cursor']
    
    return all_users
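When the result set is too large to hold in a list, the same cursor loop can be written as a generator that yields documents page by page. In this sketch, iter_documents takes the find callable as an argument, and fake_find is a stand-in that serves two fixed pages in the response shape shown above, so the example runs without a server.

```python
def iter_documents(find, collection, query, page_size=100):
    """Yield documents one at a time, following pagination cursors."""
    cursor = None
    while True:
        result = find(collection, query, limit=page_size, cursor=cursor)
        yield from result["data"]
        if not result["pagination"]["has_more"]:
            return
        cursor = result["pagination"]["cursor"]

# Stand-in for client.find: two fixed pages keyed by cursor
def fake_find(collection, query, limit, cursor=None):
    pages = {
        None: {"data": [{"name": "Alice"}, {"name": "Bob"}],
               "pagination": {"has_more": True, "cursor": "page-2"}},
        "page-2": {"data": [{"name": "Charlie"}],
                   "pagination": {"has_more": False, "cursor": None}},
    }
    return pages[cursor]

names = [doc["name"] for doc in iter_documents(fake_find, "users", {"status": "active"})]
print(names)
```

With a real client you would pass client.find in place of fake_find.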

Error Handling

Always handle exceptions appropriately:
import logging
from permission_mongo import PermissionDeniedError, NotFoundError

logger = logging.getLogger(__name__)

def safe_get_user(user_id):
    try:
        return client.get("users", user_id)
    except NotFoundError:
        logger.warning(f"User {user_id} not found")
        return None
    except PermissionDeniedError:
        logger.error(f"Permission denied for user {user_id}")
        raise
    except Exception:
        logger.exception(f"Unexpected error fetching user {user_id}")
        raise

Batch Operations

Use batch operations for bulk updates:
from datetime import datetime

# Good: Batch update (one request for all IDs)
client.batch_update(
    "users",
    ids=user_ids,
    update={"last_notification": datetime.now()}
)

# Bad: Individual updates (one request per ID)
for user_id in user_ids:
    client.update("users", user_id, {"last_notification": datetime.now()})

Examples

Multi-tenant Application

from permission_mongo import PermissionMongo, PermissionDeniedError

class UserService:
    def __init__(self, api_key):
        self.client = PermissionMongo(
            url="http://localhost:8080",
            api_key=api_key
        )
    
    def create_user(self, data):
        """Create a new user with automatic tenant isolation"""
        return self.client.insert("users", data)
    
    def get_team_members(self, team_id):
        """Get all members of a team"""
        return self.client.find("users", {
            "team_id": team_id,
            "status": "active"
        })
    
    def update_user_role(self, user_id, new_role):
        """Update user role with permission checks"""
        try:
            return self.client.update("users", user_id, {
                "role": new_role
            })
        except PermissionDeniedError:
            raise ValueError("You don't have permission to change user roles")

Data Export

import csv

def export_users_to_csv(filename):
    """Export all accessible users to CSV"""
    with open(filename, 'w', newline='') as csvfile:
        writer = csv.DictWriter(
            csvfile,
            fieldnames=['id', 'name', 'email', 'department']
        )
        writer.writeheader()
        
        cursor = None
        while True:
            result = client.find(
                "users",
                {"status": "active"},
                limit=1000,
                cursor=cursor,
                fields=["name", "email", "department"]
            )
            
            for user in result['data']:
                writer.writerow({
                    'id': user['_id'],
                    'name': user['name'],
                    'email': user['email'],
                    'department': user.get('department', 'N/A')
                })
            
            if not result['pagination']['has_more']:
                break
            
            cursor = result['pagination']['cursor']

Bulk Import

import csv

def import_users_from_csv(filename):
    """Import users from CSV with batch operations"""
    batch = []
    batch_size = 100
    
    with open(filename, 'r') as csvfile:
        reader = csv.DictReader(csvfile)
        
        for row in reader:
            batch.append({
                'name': row['name'],
                'email': row['email'],
                'department': row['department'],
                'status': 'active'
            })
            
            if len(batch) >= batch_size:
                result = client.batch_create("users", batch)
                print(f"Imported {len(result['created'])} users")
                
                if result['failed']:
                    print(f"Failed: {len(result['failed'])} users")
                
                batch = []
        
        # Import remaining users
        if batch:
            result = client.batch_create("users", batch)
            print(f"Imported {len(result['created'])} users")

            if result['failed']:
                print(f"Failed: {len(result['failed'])} users")
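The batching logic in the import function can also be factored into a small reusable helper. The chunked function below is an illustrative utility, not part of the SDK; it splits any iterable into lists of at most a given size, so no separate step is needed for the final partial batch.

```python
from itertools import islice

def chunked(iterable, size):
    """Yield successive lists of at most `size` items from `iterable`."""
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch

rows = [{"name": f"user-{i}"} for i in range(5)]
batches = list(chunked(rows, 2))
print([len(batch) for batch in batches])
```

Each batch could then be passed to client.batch_create("users", batch) in a single loop.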

Migration Guide

From PyMongo

Migrating from PyMongo to Permission Mongo SDK:
# Before: PyMongo
from pymongo import MongoClient

mongo = MongoClient("mongodb://localhost:27017")
db = mongo['mydb']
users = db.users.find({"status": "active"})

# After: Permission Mongo SDK
from permission_mongo import PermissionMongo

client = PermissionMongo(
    url="http://localhost:8080",
    api_key="your-api-key"
)
users = client.find("users", {"status": "active"})

Key Differences

  1. Automatic Permission Filtering: All queries automatically respect RBAC policies
  2. Authentication Required: Every request requires authentication via API key
  3. HTTP-based: Communication over HTTP/HTTPS instead of MongoDB wire protocol
  4. Field-level Permissions: Automatic field masking and filtering based on user roles
  5. Audit Logging: All operations are automatically logged for compliance

Troubleshooting

Common Issues

Problem: AuthenticationError: Invalid API key
Solution:
  • Verify your API key is correct
  • Check that the API key hasn’t expired
  • Ensure you’re using the correct server URL
  • Verify the API key has the required permissions
Problem: Queries return empty results even though data exists
Solution:
  • Check your RBAC policies - you may not have read permission
  • Verify you’re querying the correct tenant/organization
  • Check field-level permissions - fields may be filtered out
  • Review the when conditions in your policies
Problem: PermissionDeniedError on updates or deletes
Solution:
  • Verify your user role has the required action permission
  • Check the when condition in your policy
  • For updates, ensure you’re not modifying deny_write fields
  • Review hierarchical permissions if using organizational structures
Problem: Requests timing out
Solution:
  • Increase the timeout parameter in client initialization
  • Check network connectivity to the Permission Mongo server
  • Verify the server is running and healthy
  • Consider using connection pooling for high-traffic scenarios

Debug Mode

Enable debug logging to troubleshoot issues:
import logging

logging.basicConfig(level=logging.DEBUG)

client = PermissionMongo(
    url="http://localhost:8080",
    api_key="your-api-key"
)

# Now all requests and responses will be logged
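Setting the root logger to DEBUG also turns on debug output from every other library. A narrower option is to raise the level only for the SDK's own logger. The sketch assumes the SDK logs under a logger named permission_mongo, following the common convention of naming loggers after the package; this page does not confirm that name.

```python
import logging

# Keep other libraries at WARNING
logging.basicConfig(level=logging.WARNING)

# Assumption: the SDK logs under a logger named after its package
sdk_logger = logging.getLogger("permission_mongo")
sdk_logger.setLevel(logging.DEBUG)
```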

API Reference Summary

Core Methods

| Method | Description | Returns |
| --- | --- | --- |
| insert(collection, document) | Create a single document | dict |
| get(collection, id) | Get document by ID | dict or None |
| update(collection, id, updates) | Update a document | dict |
| delete(collection, id) | Delete a document | dict |
| find(collection, filter, **options) | Query documents | list[dict] or paginated result |
| count(collection, filter) | Count documents | int |
| aggregate(collection, pipeline) | Execute aggregation | list[dict] |

Batch Methods

| Method | Description | Returns |
| --- | --- | --- |
| batch_create(collection, documents) | Create multiple documents | dict with created and failed |
| batch_update(collection, ids=None, filter=None, update=None) | Update multiple documents | dict with counts |
| batch_delete(collection, ids=None, filter=None) | Delete multiple documents | dict with count |

Support

  • GitHub Issues: Report bugs or request features
  • Documentation: Full API reference
  • Examples: Sample code and tutorials
  • Community: Join our Discord community

Contributing

We welcome contributions to the Python SDK! Check out our contributing guide to get started.

License

The Permission Mongo Python SDK is open source software licensed under the MIT License.
