## Overview

Athena ERP uses Cloudflare R2 for object storage, providing:

- **Cost-effective storage** - No egress fees
- **S3-compatible API** - Works with existing S3 tools
- **Global distribution** - Fast access worldwide
- **Scalable storage** - Unlimited capacity

R2 stores:

- Student documents and files
- Academic reports (PDF)
- School media assets
- User profile images
- Communication attachments
## Service Implementation

Athena implements R2 integration via the `R2Service` class:

```python
# app/utils/r2.py
import boto3
from botocore.config import Config

from app.config import settings  # adjust to your settings module


class R2Service:
    def __init__(self):
        if not settings.r2_access_key_id or not settings.r2_secret_access_key:
            self.s3_client = None
        else:
            self.s3_client = boto3.client(
                "s3",
                endpoint_url=settings.r2_endpoint_url,
                aws_access_key_id=settings.r2_access_key_id,
                aws_secret_access_key=settings.r2_secret_access_key,
                config=Config(signature_version="s3v4"),
                region_name="auto",
            )
        self.bucket = settings.r2_bucket_name


r2_service = R2Service()
```
### Key Features

- **S3 Compatibility**: Uses boto3 with a custom endpoint
- **Lazy Initialization**: Only creates a client if credentials are provided
- **Signature v4**: Required for R2 authentication
- **Auto Region**: R2 handles the region automatically
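Because of the lazy initialization, callers should check for a missing client before attempting storage operations. A minimal sketch of that guard (`StorageUnavailable` is a hypothetical exception name, not part of the codebase):

```python
class StorageUnavailable(Exception):
    """Raised when R2 credentials are not configured."""


def require_storage(service) -> None:
    # R2Service.s3_client is None when credentials were absent at startup
    if service.s3_client is None:
        raise StorageUnavailable(
            "R2 is not configured; set the R2_* environment variables"
        )
```

Routes that depend on storage can call this guard up front and return a clear error instead of failing deep inside a boto3 call.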
## Configuration

### Environment Variables

Add R2 credentials to your `.env` file:

```bash
# Cloudflare R2 Configuration
R2_ENDPOINT_URL=https://[account-id].r2.cloudflarestorage.com
R2_ACCESS_KEY_ID=your-r2-access-key
R2_SECRET_ACCESS_KEY=your-r2-secret-key
R2_BUCKET_NAME=athena-docs
```
### Obtaining R2 Credentials

1. **Create R2 Bucket**:
   - Navigate to Cloudflare Dashboard → R2
   - Click "Create bucket"
   - Name: `athena-docs` (or your preferred name)

2. **Generate API Token**:
   - R2 → Manage R2 API Tokens
   - Create an API Token with permissions:
     - Object Read & Write
     - Bucket Read (optional)
   - Save the Access Key ID and Secret Access Key

3. **Get Endpoint URL**:
   - Format: `https://[account-id].r2.cloudflarestorage.com`
   - Find your account ID in the Cloudflare Dashboard

Store R2 credentials securely. Never commit them to version control.
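The endpoint URL is derived directly from your account ID; a tiny helper makes the format explicit:

```python
def r2_endpoint(account_id: str) -> str:
    # R2's S3-compatible endpoint embeds the Cloudflare account ID as a subdomain
    return f"https://{account_id}.r2.cloudflarestorage.com"
```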
## Core Operations

### Upload File

Upload files, optionally setting the content type:

```python
async def upload_file(
    self,
    file: BinaryIO,
    filename: str,
    content_type: str | None = None,
) -> str:
    """
    Uploads a file to R2 and returns the file path.
    """
    if not self.s3_client:
        raise ValueError("R2 client not initialized")

    extra_args = {}
    if content_type:
        extra_args["ContentType"] = content_type

    self.s3_client.upload_fileobj(
        file,
        self.bucket,
        filename,
        ExtraArgs=extra_args,
    )
    return filename
```
**Usage Example:**

```python
import uuid

from app.utils.r2 import r2_service


@router.post("/upload")
async def upload_document(file: UploadFile):
    # Generate a unique filename
    filename = f"documents/{uuid.uuid4()}_{file.filename}"

    # Upload to R2
    file_path = await r2_service.upload_file(
        file.file,
        filename,
        content_type=file.content_type,
    )
    return {"file_path": file_path}
```
### Generate Signed URL

Create temporary URLs for secure file access:

```python
async def get_file_url(
    self,
    filename: str,
    expires_in_seconds: int = 600,
    response_content_disposition: str | None = None,
) -> str:
    """
    Returns a temporary signed URL to access a file in R2.
    """
    if not self.s3_client:
        raise ValueError("R2 client not initialized")

    params = {"Bucket": self.bucket, "Key": filename}
    if response_content_disposition:
        params["ResponseContentDisposition"] = response_content_disposition

    return self.s3_client.generate_presigned_url(
        "get_object",
        Params=params,
        ExpiresIn=expires_in_seconds,
    )
```
**Usage Example:**

```python
@router.get("/documents/{document_id}/download")
async def download_document(document_id: str):
    # Retrieve document metadata from the database
    document = await get_document(document_id)

    # Generate a temporary download URL (valid for 10 minutes)
    url = await r2_service.get_file_url(
        document.file_path,
        expires_in_seconds=600,
        response_content_disposition=f'attachment; filename="{document.filename}"',
    )
    return {"download_url": url}
```

Signed URLs expire after the specified time. The default is 10 minutes (600 seconds).
### Download File

Retrieve file contents directly:

```python
async def get_file(self, filename: str) -> dict:
    """
    Downloads a file from R2 and returns its payload and metadata.
    """
    if not self.s3_client:
        raise ValueError("R2 client not initialized")

    obj = self.s3_client.get_object(Bucket=self.bucket, Key=filename)
    body = obj["Body"].read()
    return {
        "body": body,
        "content_type": obj.get("ContentType") or "application/octet-stream",
        "content_length": obj.get("ContentLength"),
    }
```
**Usage Example:**

```python
from fastapi import Response


@router.get("/documents/{document_id}/preview")
async def preview_document(document_id: str):
    document = await get_document(document_id)

    # Download the file from R2
    file_data = await r2_service.get_file(document.file_path)

    return Response(
        content=file_data["body"],
        media_type=file_data["content_type"],
    )
```
### Delete File

Remove files from storage:

```python
async def delete_file(self, filename: str):
    """
    Deletes a file from R2.
    """
    if not self.s3_client:
        raise ValueError("R2 client not initialized")

    self.s3_client.delete_object(Bucket=self.bucket, Key=filename)
```
**Usage Example:**

```python
@router.delete("/documents/{document_id}")
async def delete_document(document_id: str, db: AsyncSession = Depends(get_db)):
    document = await get_document(document_id)

    # Delete from R2
    await r2_service.delete_file(document.file_path)

    # Delete from the database
    await db.delete(document)
    await db.commit()

    return {"message": "Document deleted"}
```
## File Organization

### Naming Conventions

Organize files by type and school:

```text
# Document structure
documents/{school_id}/{year}/{uuid}_{filename}
reports/{school_id}/{type}/{year}/{uuid}.pdf
profiles/{user_id}/avatar_{uuid}.jpg
media/{school_id}/logo.png
attachments/{message_id}/{uuid}_{filename}
```
**Example Implementation:**

```python
import uuid
from datetime import datetime


def generate_document_path(school_id: str, filename: str) -> str:
    year = datetime.now().year
    file_uuid = uuid.uuid4()
    return f"documents/{school_id}/{year}/{file_uuid}_{filename}"


def generate_report_path(school_id: str, report_type: str) -> str:
    year = datetime.now().year
    file_uuid = uuid.uuid4()
    return f"reports/{school_id}/{report_type}/{year}/{file_uuid}.pdf"
```
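Client-supplied filenames can contain path separators or other unsafe characters, so it is worth normalizing them before embedding them in an R2 key. A rough sketch (the safe character set here is an assumption, not a project rule):

```python
import re


def sanitize_filename(name: str) -> str:
    # Keep only the basename, then replace anything outside a safe set
    name = name.replace("\\", "/").rsplit("/", 1)[-1]
    return re.sub(r"[^A-Za-z0-9._-]", "_", name) or "file"
```

Calling this on `file.filename` before `generate_document_path` prevents keys like `documents/.../../etc/passwd`.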
Store file metadata in PostgreSQL:

```python
import uuid
from datetime import datetime

from sqlalchemy import BigInteger, Column, DateTime, ForeignKey, String
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship


class Document(Base):
    __tablename__ = "documents"

    id = Column(UUID, primary_key=True, default=uuid.uuid4)
    school_id = Column(UUID, ForeignKey("schools.id"), nullable=False)
    filename = Column(String(255), nullable=False)
    file_path = Column(String(500), nullable=False)  # R2 path
    content_type = Column(String(100))
    file_size = Column(BigInteger)
    uploaded_by = Column(UUID, ForeignKey("users.id"))
    uploaded_at = Column(DateTime, default=datetime.utcnow)

    # Relationships
    school = relationship("School")
    uploader = relationship("User")
```
## Security

### Access Control

Implement permission checks before file operations:

```python
from app.deps import require_permissions


@router.post("/documents/upload")
async def upload_document(
    file: UploadFile,
    auth: AuthContext = Depends(require_permissions("documents:write")),
    db: AsyncSession = Depends(get_db),
):
    # Verify school context
    if not auth.school_id:
        raise HTTPException(status_code=403, detail="School context required")

    # Generate a secure path
    file_path = generate_document_path(str(auth.school_id), file.filename)

    # Upload the file
    await r2_service.upload_file(file.file, file_path, file.content_type)

    # Save metadata
    document = Document(
        school_id=auth.school_id,
        filename=file.filename,
        file_path=file_path,
        content_type=file.content_type,
        file_size=file.size,
        uploaded_by=auth.user.id,
    )
    db.add(document)
    await db.commit()

    return document
```
### File Validation

Validate uploaded files before accepting them:

```python
from fastapi import HTTPException, UploadFile

ALLOWED_MIME_TYPES = [
    "application/pdf",
    "image/jpeg",
    "image/png",
    "application/msword",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
]

MAX_FILE_SIZE = 10 * 1024 * 1024  # 10 MB


def validate_file(file: UploadFile):
    # Check the file size
    if file.size > MAX_FILE_SIZE:
        raise HTTPException(status_code=413, detail="File too large")

    # Check the MIME type
    if file.content_type not in ALLOWED_MIME_TYPES:
        raise HTTPException(status_code=415, detail="Unsupported file type")

    return True
```
### Signed URL Best Practices

- Use short expiration times (5-15 minutes)
- Generate URLs on demand; don't store them
- Include `Content-Disposition` for downloads
- Log URL generation for audit trails
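The audit-trail point can be as simple as logging each issued URL's key, expiry, and requester; a sketch (the logger name is arbitrary):

```python
import logging

audit_logger = logging.getLogger("athena.r2.audit")


def log_signed_url(key: str, expires_in: int, user_id: str) -> str:
    # Record who requested access to which object, and for how long
    message = f"signed url issued: key={key} expires_in={expires_in}s user={user_id}"
    audit_logger.info(message)
    return message
```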
## Async Considerations

Boto3 is synchronous. In production, consider using aiobotocore for true async operations:

```python
import aiobotocore.session


class AsyncR2Service:
    def __init__(self):
        self.session = aiobotocore.session.get_session()
        self.config = {
            "endpoint_url": settings.r2_endpoint_url,
            "aws_access_key_id": settings.r2_access_key_id,
            "aws_secret_access_key": settings.r2_secret_access_key,
            "region_name": "auto",
        }

    async def upload_file(self, file: BinaryIO, filename: str):
        async with self.session.create_client("s3", **self.config) as client:
            await client.put_object(
                Bucket=settings.r2_bucket_name,
                Key=filename,
                Body=file,
            )
```
## Caching

Cache signed URLs for repeated access:

```python
import time

url_cache: dict[str, str] = {}


async def get_cached_file_url(filename: str, ttl: int = 300) -> str:
    # Bucket requests into ttl-sized time windows so the same URL is reused
    cache_key = f"{filename}:{int(time.time() / ttl)}"

    if cache_key in url_cache:
        return url_cache[cache_key]

    # Sign for twice the window so a URL issued early in one window
    # is still valid when reused near the window's end
    url = await r2_service.get_file_url(filename, expires_in_seconds=ttl * 2)
    url_cache[cache_key] = url
    return url
```
## Multipart Uploads

For large files (>100MB), use multipart uploads:

```python
from boto3.s3.transfer import TransferConfig


def upload_large_file(file_path: str, key: str):
    r2_service.s3_client.upload_file(
        file_path,
        settings.r2_bucket_name,
        key,
        Config=TransferConfig(
            multipart_threshold=25 * 1024 * 1024,  # 25 MB
            max_concurrency=10,
            multipart_chunksize=25 * 1024 * 1024,
            use_threads=True,
        ),
    )
```
## Public Access (Optional)

### Bucket Policies

To serve public assets (logos, etc.), configure a bucket policy:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": "*",
      "Action": "s3:GetObject",
      "Resource": "arn:aws:s3:::athena-docs/public/*"
    }
  ]
}
```
### Custom Domains

Connect a custom domain to R2:

1. Add the domain in Cloudflare R2 settings
2. Configure a DNS CNAME record
3. Access files via `https://cdn.yourdomain.com/file.jpg`
## Monitoring and Costs

### Usage Tracking

Monitor R2 usage via the Cloudflare Dashboard:

- Total storage size
- Number of objects
- API request count
- Bandwidth usage

### Cost Optimization

- **Storage**: $0.015/GB-month (first 10GB free)
- **Class A operations**: $4.50/million requests
- **Class B operations**: $0.36/million requests
- **Egress**: $0 (a major advantage over S3)

**Best Practices:**

- Delete unused files regularly
- Implement lifecycle policies
- Use appropriate file naming for organization
- Monitor API request patterns
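R2 supports object lifecycle rules through its S3-compatible API. As a sketch, a rule that expires objects under a hypothetical `tmp/` prefix after 30 days looks roughly like this (the shape matches the S3 `PutBucketLifecycleConfiguration` payload; the rule ID and prefix are illustrative):

```json
{
  "Rules": [
    {
      "ID": "expire-temp-uploads",
      "Filter": { "Prefix": "tmp/" },
      "Status": "Enabled",
      "Expiration": { "Days": 30 }
    }
  ]
}
```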
## Troubleshooting

### Connection Errors

**Problem**: `EndpointConnectionError`

**Solution**:

- Verify the `R2_ENDPOINT_URL` format
- Check the account ID in the endpoint URL
- Ensure network connectivity

### Authentication Errors

**Problem**: `SignatureDoesNotMatch`

**Solution**:

- Verify `R2_ACCESS_KEY_ID` and `R2_SECRET_ACCESS_KEY`
- Ensure the signature version is `s3v4`
- Check token permissions

### File Not Found

**Problem**: `NoSuchKey` error

**Solution**:

- Verify the file path matches the database record
- Check the bucket name configuration
- Ensure the file was successfully uploaded
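A quick way to catch the path-mismatch case is to validate stored paths against the naming convention before querying R2. A sketch for the document layout described earlier (the regex assumes hyphenated hex UUIDs for both IDs):

```python
import re

# Matches documents/{school_id}/{year}/{uuid}_{filename} as generated above
DOC_PATH_RE = re.compile(r"^documents/[0-9a-f-]{36}/\d{4}/[0-9a-f-]{36}_.+$")


def looks_like_document_path(path: str) -> bool:
    # A failing match suggests the DB record drifted from the convention
    return DOC_PATH_RE.match(path) is not None
```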