
Overview

Pro-Cloud is a private hosted layer that wraps the Timepoint Pro open-source engine with production concerns. It adds persistent storage, job queuing, authentication, and usage tracking while keeping the core simulation engine unchanged.

Architecture

Pro-Cloud includes the open-source Pro engine as a git submodule and adds:
  • Persistent storage - Postgres replaces SQLite; results survive deploys
  • Job queue - Celery + Redis for cancellable, deploy-surviving jobs
  • Auth - JWT + API key with Postgres persistence
  • Budget enforcement - Pre-submission budget checks, per-run cost tracking
  • Usage tracking - UsageRecord table records every run with cost and token counts
  • Railway deployment - Internal networking to Billing, Auth, and Web services
The cloud layer adds no runtime dependencies back into the core engine. The engine remains standalone and forkable.

Core Engine Isolation

The open-source Pro engine has zero cloud dependencies:
timepoint-pro/              (public repo)
├── simulation engine       (standalone, SQLite)
├── LLM calls → OpenRouter  (direct, no proxy)
└── data → local files      (no cloud storage)

timepoint-pro-cloud/        (private repo)
├── git submodule: timepoint-pro
├── wrapper layer:
│   ├── Postgres adapter
│   ├── Celery worker
│   ├── JWT auth
│   ├── budget enforcement
│   └── usage tracking
└── Railway config

Pro-Cloud Boundary

The Pro-Cloud wrapper adds these endpoints that are not in the open-source engine:
  • /api/usage - Usage statistics and quotas
  • /api/budget - Budget enforcement and cost tracking
These endpoints live in the Pro-Cloud wrapper and:
  1. Track usage locally in the UsageRecord table
  2. Optionally forward to the shared Billing service when BILLING_SERVICE_URL is configured
The open-source engine exposes only simulation data endpoints (e.g., /api/data-export/{run_id}).

Database Schema

Postgres Tables

Pro-Cloud uses Postgres with these tables:
-- Core run tracking (from open-source engine)
CREATE TABLE runs (
    run_id VARCHAR PRIMARY KEY,
    template_id VARCHAR,
    status VARCHAR,
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    duration_seconds FLOAT,
    cost_usd DECIMAL(10,4),
    tokens_used INTEGER,
    llm_calls INTEGER,
    entities_created INTEGER,
    timepoints_created INTEGER,
    error_message TEXT
);

-- Entity states (tensor compression)
CREATE TABLE entity_states (
    entity_id VARCHAR PRIMARY KEY,
    run_id VARCHAR REFERENCES runs(run_id),
    tensor_state JSONB,
    created_at TIMESTAMP
);

-- Dialog history
CREATE TABLE dialog_turns (
    turn_id VARCHAR PRIMARY KEY,
    run_id VARCHAR REFERENCES runs(run_id),
    timepoint_id VARCHAR,
    speaker_id VARCHAR,
    content TEXT,
    generation_params JSONB,
    created_at TIMESTAMP
);

-- Usage tracking (Pro-Cloud specific)
CREATE TABLE usage_records (
    record_id SERIAL PRIMARY KEY,
    user_id VARCHAR,
    run_id VARCHAR REFERENCES runs(run_id),
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    cost_usd DECIMAL(10,4),
    tokens_used INTEGER,
    status VARCHAR
);

-- API keys (Pro-Cloud auth)
CREATE TABLE api_keys (
    key_id VARCHAR PRIMARY KEY,
    user_id VARCHAR,
    key_hash VARCHAR,
    created_at TIMESTAMP,
    expires_at TIMESTAMP,
    last_used_at TIMESTAMP
);
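The later Python snippets query these tables through ORM models (`Run`, `UsageRecord`, `APIKey`) that are never shown. A minimal SQLAlchemy sketch of two of them; the column names follow the DDL above, while the class names and the smoke test against in-memory SQLite (Postgres in production) are illustrative assumptions:

```python
# Hypothetical SQLAlchemy models mirroring the runs and usage_records DDL above.
from sqlalchemy import (Column, DateTime, ForeignKey, Integer, Numeric,
                        String, create_engine)
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Run(Base):
    __tablename__ = "runs"
    run_id = Column(String, primary_key=True)
    template_id = Column(String)
    status = Column(String)
    started_at = Column(DateTime)
    completed_at = Column(DateTime)
    cost_usd = Column(Numeric(10, 4))
    tokens_used = Column(Integer)

class UsageRecord(Base):
    __tablename__ = "usage_records"
    record_id = Column(Integer, primary_key=True, autoincrement=True)
    user_id = Column(String)
    run_id = Column(String, ForeignKey("runs.run_id"))
    started_at = Column(DateTime)
    completed_at = Column(DateTime)
    cost_usd = Column(Numeric(10, 4))
    tokens_used = Column(Integer)
    status = Column(String)

# Smoke test against in-memory SQLite; production uses DATABASE_URL.
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
with Session(engine) as session:
    session.add(Run(run_id="run-1", status="completed"))
    session.add(UsageRecord(user_id="u1", run_id="run-1",
                            tokens_used=1200, status="completed"))
    session.commit()
    total = session.query(UsageRecord).count()
```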

Environment Variables

Core Engine (from open-source)

# Required: OpenRouter API Key
OPENROUTER_API_KEY=sk-or-v1-...

# Optional: Groq for ultra-fast inference
GROQ_API_KEY=gsk_...

# LLM Service
LLM_SERVICE_ENABLED=true
LLM_MODEL=meta-llama/llama-3.1-70b-instruct

Pro-Cloud Additions

# Database
DATABASE_URL=postgresql://user:pass@postgres:5432/timepoint_pro

# Redis (Celery broker)
REDIS_URL=redis://redis:6379/0
CELERY_BROKER_URL=redis://redis:6379/0
CELERY_RESULT_BACKEND=redis://redis:6379/1

# JWT Auth
JWT_SECRET_KEY=your-secret-key-here
JWT_ALGORITHM=HS256
JWT_ACCESS_TOKEN_EXPIRE_MINUTES=60

# API Keys
API_KEY_SALT=your-salt-here

# Budget Enforcement
DEFAULT_USER_BUDGET_USD=10.00
BUDGET_CHECK_ENABLED=true

# Optional: Forward to shared Billing service
BILLING_SERVICE_URL=https://billing.timepointai.com
BILLING_SERVICE_TOKEN=your-billing-token

# Railway Internal Networking
RAILWAY_ENVIRONMENT=production
RAILWAY_SERVICE_NAME=timepoint-pro-cloud
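One way to consume these variables is a single typed settings object built at startup, so misconfiguration fails fast. A standard-library sketch; `CloudSettings` and `load_settings` are hypothetical names, and the defaults mirror the example values listed above:

```python
import os
from dataclasses import dataclass
from typing import Optional

@dataclass(frozen=True)
class CloudSettings:
    """Pro-Cloud wrapper settings, read from the environment variables above."""
    database_url: str
    redis_url: str
    jwt_secret_key: str
    default_user_budget_usd: float
    budget_check_enabled: bool
    billing_service_url: Optional[str]  # None disables Billing forwarding

def load_settings(env=os.environ) -> CloudSettings:
    # Required keys raise KeyError immediately; optional keys get defaults.
    return CloudSettings(
        database_url=env["DATABASE_URL"],
        redis_url=env.get("REDIS_URL", "redis://redis:6379/0"),
        jwt_secret_key=env["JWT_SECRET_KEY"],
        default_user_budget_usd=float(env.get("DEFAULT_USER_BUDGET_USD", "10.00")),
        budget_check_enabled=env.get("BUDGET_CHECK_ENABLED", "true").lower() == "true",
        billing_service_url=env.get("BILLING_SERVICE_URL"),
    )

# Example with only the required keys set:
settings = load_settings({
    "DATABASE_URL": "postgresql://user:pass@localhost/timepoint_pro",
    "JWT_SECRET_KEY": "dev-secret",
})
```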

Celery Job Queue

Worker Configuration

# celery_config.py
import os

from celery import Celery

app = Celery(
    'timepoint_pro_cloud',
    broker=os.getenv('CELERY_BROKER_URL'),
    backend=os.getenv('CELERY_RESULT_BACKEND')
)

app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    task_track_started=True,
    task_acks_late=True,
    worker_prefetch_multiplier=1,
)

Task Definition

# tasks.py
from celery_config import app
from timepoint_pro.simulation import run_simulation

@app.task(bind=True, name='run_simulation_task')
def run_simulation_task(self, run_id: str, config: dict):
    """Run simulation as background job."""
    try:
        # Update run status
        update_run_status(run_id, 'running')
        
        # Run core engine simulation
        result = run_simulation(config)
        
        # Track usage
        record_usage(run_id, result.cost_usd, result.tokens_used)
        
        # Update run status
        update_run_status(run_id, 'completed', result)
        
        return {'status': 'completed', 'run_id': run_id}
    except Exception as e:
        update_run_status(run_id, 'failed', error=str(e))
        raise

Job Submission

# Submit simulation job
from tasks import run_simulation_task

task = run_simulation_task.apply_async(
    args=[run_id, config],
    task_id=run_id  # reuse run_id as the task_id so status can be looked up later
)

# Check status
result = run_simulation_task.AsyncResult(run_id)
print(result.state)  # PENDING, STARTED, SUCCESS, FAILURE

# Cancel job
result.revoke(terminate=True)

JWT Authentication

Token Generation

import os
from datetime import datetime, timedelta

import jwt  # PyJWT

def create_access_token(user_id: str) -> str:
    expire = datetime.utcnow() + timedelta(
        minutes=int(os.getenv('JWT_ACCESS_TOKEN_EXPIRE_MINUTES', 60))
    )
    
    payload = {
        'sub': user_id,
        'exp': expire,
        'iat': datetime.utcnow()
    }
    
    return jwt.encode(
        payload,
        os.getenv('JWT_SECRET_KEY'),
        algorithm=os.getenv('JWT_ALGORITHM', 'HS256')
    )
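The decode side is symmetric. A sketch of the verification counterpart using the same PyJWT library; `verify_token` and the hard-coded `SECRET` are illustrative stand-ins for the env-configured `JWT_SECRET_KEY`:

```python
from datetime import datetime, timedelta, timezone

import jwt  # PyJWT

SECRET = "your-secret-key-here"  # stands in for os.getenv('JWT_SECRET_KEY')

def create_token(user_id: str, minutes: int = 60) -> str:
    # Same claims as create_access_token above; PyJWT serializes the datetimes.
    now = datetime.now(timezone.utc)
    return jwt.encode(
        {"sub": user_id, "iat": now, "exp": now + timedelta(minutes=minutes)},
        SECRET,
        algorithm="HS256",
    )

def verify_token(token: str) -> str:
    """Return the user_id; raises jwt.InvalidTokenError on a bad or expired token."""
    payload = jwt.decode(token, SECRET, algorithms=["HS256"])
    return payload["sub"]

user = verify_token(create_token("user-42"))
```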

API Key Validation

import hashlib
import os
from datetime import datetime

from fastapi import Depends, HTTPException, Security
from fastapi.security import APIKeyHeader

api_key_header = APIKeyHeader(name='X-API-Key')

async def validate_api_key(api_key: str = Security(api_key_header)):
    """Validate API key from database."""
    key_hash = hashlib.sha256(
        (api_key + os.getenv('API_KEY_SALT')).encode()
    ).hexdigest()
    
    # Check database
    stored_key = db.query(APIKey).filter(
        APIKey.key_hash == key_hash,
        APIKey.expires_at > datetime.utcnow()
    ).first()
    
    if not stored_key:
        raise HTTPException(status_code=403, detail='Invalid API key')
    
    # Update last used
    stored_key.last_used_at = datetime.utcnow()
    db.commit()
    
    return stored_key.user_id
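Key issuance is the mirror image of the validation above: generate a random key, hand it to the user once, and store only its salted SHA-256 hash in the `api_keys` table. A standard-library sketch; the `tp_` prefix and function names are assumptions, not the wrapper's actual scheme:

```python
import hashlib
import secrets

def generate_api_key(salt: str):
    """Create a new API key and its salted SHA-256 hash.

    Only the hash is stored; the plaintext key is shown to the user once.
    """
    api_key = "tp_" + secrets.token_urlsafe(32)  # "tp_" prefix is illustrative
    key_hash = hashlib.sha256((api_key + salt).encode()).hexdigest()
    return api_key, key_hash

def matches(api_key: str, key_hash: str, salt: str) -> bool:
    """Same hashing as validate_api_key, with a constant-time comparison."""
    candidate = hashlib.sha256((api_key + salt).encode()).hexdigest()
    return secrets.compare_digest(candidate, key_hash)

key, stored_hash = generate_api_key(salt="your-salt-here")
```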

Budget Enforcement

Pre-Submission Check

async def check_budget(user_id: str, estimated_cost: float):
    """Check if user has budget for run."""
    # Get user's current usage
    usage = db.query(func.sum(UsageRecord.cost_usd)).filter(
        UsageRecord.user_id == user_id,
        UsageRecord.started_at >= first_day_of_month()
    ).scalar() or 0.0
    
    # Get user's budget
    budget = get_user_budget(user_id)
    
    remaining = budget - usage
    
    if estimated_cost > remaining:
        raise HTTPException(
            status_code=402,
            detail=f'Insufficient budget. Remaining: ${remaining:.2f}, Required: ${estimated_cost:.2f}'
        )
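Stripped of the database query, the check reduces to simple arithmetic: sum the month's recorded costs, subtract from the budget, and compare against the estimate. A self-contained sketch with plain floats:

```python
def remaining_budget(budget_usd: float, month_costs) -> float:
    """Budget left this month: monthly budget minus the sum of recorded run costs."""
    return budget_usd - sum(month_costs)

def can_submit(budget_usd: float, month_costs, estimated_cost: float) -> bool:
    """Pre-submission check: reject when the estimate exceeds the remainder."""
    return estimated_cost <= remaining_budget(budget_usd, month_costs)

# A $10.00 budget with $3.00 + $4.25 already spent leaves $2.75:
# a $2.50 run passes the check, a $3.00 run is rejected with HTTP 402.
```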

Post-Run Tracking

def record_usage(run_id: str, cost_usd: float, tokens_used: int):
    """Record usage after run completes."""
    run = db.query(Run).filter(Run.run_id == run_id).first()
    
    usage = UsageRecord(
        user_id=run.user_id,
        run_id=run_id,
        started_at=run.started_at,
        completed_at=run.completed_at,
        cost_usd=cost_usd,
        tokens_used=tokens_used,
        status=run.status
    )
    
    db.add(usage)
    db.commit()
    
    # Optionally forward to Billing service
    if os.getenv('BILLING_SERVICE_URL'):
        forward_to_billing(usage)

API Endpoints

Pro-Cloud Specific Endpoints

from fastapi import APIRouter, Depends

router = APIRouter(prefix='/api')

@router.get('/usage')
async def get_usage(user_id: str = Depends(validate_api_key)):
    """Get user's usage statistics."""
    usage = db.query(
        func.sum(UsageRecord.cost_usd).label('total_cost'),
        func.sum(UsageRecord.tokens_used).label('total_tokens'),
        func.count(UsageRecord.record_id).label('total_runs')
    ).filter(
        UsageRecord.user_id == user_id,
        UsageRecord.started_at >= first_day_of_month()
    ).first()
    
    budget = get_user_budget(user_id)
    
    return {
        'user_id': user_id,
        'period': 'current_month',
        'budget_usd': budget,
        'used_usd': float(usage.total_cost or 0),
        'remaining_usd': budget - float(usage.total_cost or 0),
        'total_tokens': int(usage.total_tokens or 0),
        'total_runs': int(usage.total_runs or 0)
    }

@router.get('/budget')
async def get_budget(user_id: str = Depends(validate_api_key)):
    """Get user's budget information."""
    budget = get_user_budget(user_id)
    usage = get_current_month_usage(user_id)
    
    return {
        'user_id': user_id,
        'budget_usd': budget,
        'used_usd': usage,
        'remaining_usd': budget - usage,
        'reset_date': first_day_of_next_month().isoformat()
    }

Deployment

See the Railway Deployment guide for detailed deployment instructions using Railway’s internal networking.

Monitoring

Celery Monitoring

# Monitor worker status
celery -A tasks inspect active
celery -A tasks inspect stats

# Monitor queue length
celery -A tasks inspect reserved

# Purge failed tasks
celery -A tasks purge

Database Monitoring

-- Active runs
SELECT status, COUNT(*) 
FROM runs 
GROUP BY status;

-- Usage by user (current month)
SELECT user_id, 
       SUM(cost_usd) as total_cost,
       COUNT(*) as total_runs
FROM usage_records
WHERE started_at >= date_trunc('month', CURRENT_DATE)
GROUP BY user_id
ORDER BY total_cost DESC;

-- Budget status
SELECT u.user_id,
       u.budget_usd,
       COALESCE(SUM(ur.cost_usd), 0) as used_usd,
       u.budget_usd - COALESCE(SUM(ur.cost_usd), 0) as remaining_usd
FROM users u
LEFT JOIN usage_records ur ON u.user_id = ur.user_id
  AND ur.started_at >= date_trunc('month', CURRENT_DATE)
GROUP BY u.user_id, u.budget_usd;

Integration with Billing Service

Optional Forwarding

import httpx

def forward_to_billing(usage: UsageRecord):
    """Forward usage record to shared Billing service."""
    url = os.getenv('BILLING_SERVICE_URL')
    token = os.getenv('BILLING_SERVICE_TOKEN')
    
    if not url:
        return  # Billing forwarding not configured
    
    try:
        response = httpx.post(
            f'{url}/api/usage',
            json={
                'user_id': usage.user_id,
                'service': 'timepoint-pro-cloud',
                'run_id': usage.run_id,
                'cost_usd': float(usage.cost_usd),
                'tokens_used': usage.tokens_used,
                'started_at': usage.started_at.isoformat(),
                'completed_at': usage.completed_at.isoformat()
            },
            headers={'Authorization': f'Bearer {token}'},
            timeout=5.0
        )
        response.raise_for_status()
    except Exception as e:
        logger.warning(f'Failed to forward usage to Billing: {e}')
