Skip to main content
The Drift simulation engine is a high-performance Python Monte Carlo simulator that runs 100,000+ scenarios in seconds using NumPy vectorization and parallel processing.

Technology Stack

TechnologyPurpose
NumPyVectorized array operations for performance
PandasData analysis and processing
PydanticRuntime type validation and parsing
OpenAIAI-powered goal parsing
MultiprocessingParallel execution across CPU cores

Project Structure

simulation/
├── __init__.py
├── main.py                # CLI entry point
├── monte_carlo.py         # Core Monte Carlo engine
├── models.py             # Pydantic data models
├── goal_parser.py        # AI goal parsing
├── data_processing.py    # Data utilities
├── sensitivity.py        # Sensitivity analysis
├── requirements.txt      # Python dependencies
└── tests/               # Unit tests

Core Engine: monte_carlo.py

Location: simulation/monte_carlo.py

Architecture

The engine uses a multi-worker parallel architecture:
┌─────────────────────────────────────────────────────┐
│              run_monte_carlo()                      │
│  - Split 100k simulations across N workers          │
│  - Generate seeds for reproducibility               │
└──────────────┬──────────────────────────────────────┘

       ┌───────┴────────┐
       │ multiprocessing │
       │   Pool(4)       │
       └───────┬─────────┘

     ┌─────────┴─────────┐
     │                   │
┌────▼────┐         ┌────▼────┐
│ Worker 1│         │ Worker 4│
│ 25k sims│   ...   │ 25k sims│
└────┬────┘         └────┬────┘
     │                   │
     └─────────┬─────────┘

          ┌────▼─────┐
          │ Aggregate │
          │ Results   │
          └───────────┘

Main Simulation Function

def run_monte_carlo(
    request: SimulationRequest,
    n_workers: int = None,
    progress_callback: Optional[Callable[[dict], None]] = None
) -> SimulationResults:
    """
    Run full Monte Carlo simulation with parallel workers.

    Args:
        request: Simulation request with financial profile and goal
        n_workers: Number of parallel workers (defaults to CPU count)
        progress_callback: Optional callback for progress updates

    Returns:
        SimulationResults with probability distributions and statistics
    """
    params = request.simulation_params or SimulationParams()
    n_simulations = params.n_simulations

    if n_workers is None:
        n_workers = min(cpu_count(), 4)

    # Generate seeds for reproducibility
    seeds = np.arange(n_simulations)

    # Split work across workers
    batches = np.array_split(seeds, n_workers)
    batch_args = [(request, batch, i) for i, batch in enumerate(batches)]

    # Run parallel simulations
    if n_workers > 1:
        results_list = []
        completed = 0
        with Pool(n_workers) as pool:
            for balances, batch_id in pool.imap_unordered(run_simulation_batch, batch_args):
                results_list.append(balances)
                completed += len(balances)
                if progress_callback:
                    progress_callback({
                        "type": "progress",
                        "completed": int(completed),
                        "total": int(n_simulations),
                        "percentage": round(completed / n_simulations * 100, 2)
                    })
        final_balances = np.concatenate(results_list)
    else:
        final_balances, _ = run_simulation_batch(batch_args[0])

    # Calculate statistics
    return calculate_results(final_balances, request)

Vectorized Simulation Batch

The core simulation uses NumPy vectorization for massive performance gains:
def run_simulation_batch(args: Tuple[SimulationRequest, np.ndarray, int]) -> Tuple[np.ndarray, int]:
    """
    Run a batch of Monte Carlo simulations using NumPy vectorization.
    
    This function processes thousands of simulations simultaneously using
    vectorized array operations instead of Python loops.
    """
    request, seeds, batch_id = args
    params = request.simulation_params or SimulationParams()
    n_sims = len(seeds)
    months = request.goal.timeline_months

    # Initialize random state
    rng = np.random.default_rng(seeds[0])

    # Starting balance
    starting_balance = (
        request.financial_profile.liquid_assets -
        request.financial_profile.credit_debt
    )

    # Pre-generate ALL random numbers for efficiency
    # Shape: (n_sims, months) - this is key to vectorization!
    income_noise = rng.normal(1.0, params.income_volatility, (n_sims, months))
    spending_noise = rng.normal(1.0, params.expense_volatility, (n_sims, months))
    emergency_events = rng.random((n_sims, months)) < params.emergency_probability
    emergency_amounts = rng.uniform(
        params.emergency_min,
        params.emergency_max,
        (n_sims, months)
    )

    # Monthly market returns
    monthly_return_mean = params.annual_return_mean / 12
    monthly_return_std = params.annual_return_std / np.sqrt(12)
    market_returns = rng.normal(monthly_return_mean, monthly_return_std, (n_sims, months))

    # Initialize balance array
    balances = np.zeros((n_sims, months + 1))
    balances[:, 0] = starting_balance

    # Track income/spending multipliers for raises/promotions
    income_multiplier = np.ones(n_sims)
    spending_multiplier = np.ones(n_sims)

    # Simulate month-by-month
    for month in range(months):
        # Apply inflation (compounds monthly)
        monthly_inflation = rng.normal(
            params.inflation_rate / 12,
            params.inflation_volatility / 12,
            n_sims
        )
        spending_multiplier *= (1 + monthly_inflation)

        # Apply annual raises (every 12 months)
        if month % 12 == 11:
            annual_raises = rng.normal(
                params.annual_raise_mean,
                params.annual_raise_volatility,
                n_sims
            )
            income_multiplier *= (1 + annual_raises)

        # Apply semi-annual promotion chances (every 6 months)
        if month % 6 == 5:
            promotion_events = rng.random(n_sims) < params.promotion_probability
            promotion_raises = rng.normal(
                params.promotion_raise_mean,
                params.promotion_raise_volatility,
                n_sims
            )
            income_multiplier[promotion_events] *= (1 + promotion_raises[promotion_events])

        # Calculate cash flows (vectorized across all simulations)
        base_income = request.user_inputs.monthly_income
        base_spending = request.financial_profile.monthly_spending

        income = base_income * income_multiplier * income_noise[:, month]
        spending = base_spending * spending_multiplier * spending_noise[:, month]
        emergencies = emergency_events[:, month] * emergency_amounts[:, month]

        # Investment returns (only on balance above emergency fund)
        emergency_fund_target = base_spending * 6
        investable_balance = np.maximum(balances[:, month] - emergency_fund_target, 0)
        returns = investable_balance * market_returns[:, month]

        # Update balances (vectorized!)
        balances[:, month + 1] = (
            balances[:, month] +
            income -
            spending -
            emergencies -
            request.financial_profile.monthly_loan_payments +
            returns
        )

    return balances[:, -1], batch_id  # Return final balances
The key to performance is pre-generating all random numbers as 2D arrays (n_sims, months) and using vectorized operations instead of nested loops.

Performance Benchmarks

100,000 simulations over 36 months:
WorkersTimeSims/Second
18.2s12,195
24.5s22,222
42.3s43,478
Speedup: 3.6x with 4 workers (near-linear scaling)

Account-Aware Simulation

The engine supports detailed per-account modeling:

Credit Card Interest Modeling

if params.use_account_aware_simulation and params.credit_cards:
    # Initialize per-card balances
    n_cards = len(params.credit_cards)
    card_balances = np.zeros((n_sims, n_cards))
    card_aprs = np.zeros(n_cards)
    
    for i, card in enumerate(params.credit_cards):
        card_balances[:, i] = card.balance
        card_aprs[i] = card.apr / 100.0  # Convert to decimal

    # During each month:
    for i in range(n_cards):
        # Monthly interest
        monthly_rate = card_aprs[i] / 12
        interest = card_balances[:, i] * monthly_rate
        
        # Add interest to balance
        card_balances[:, i] += interest
        
        # Minimum payment
        payment = np.minimum(card_min_payments[i], card_balances[:, i])
        card_balances[:, i] -= payment
        
        # Deduct from cash flow
        balances[:, month + 1] -= (interest + payment)

Loan Amortization

if params.loans:
    for i, loan in enumerate(params.loans):
        # Monthly interest
        monthly_rate = loan.interest_rate / 100.0 / 12
        interest = loan_balances[:, i] * monthly_rate
        
        # Fixed payment covers interest + principal
        payment = np.minimum(loan.monthly_payment, loan_balances[:, i] + interest)
        principal_payment = np.maximum(0, payment - interest)
        
        # Update loan balance
        loan_balances[:, i] = np.maximum(0, loan_balances[:, i] - principal_payment)
        
        # Deduct payment from cash flow
        balances[:, month + 1] -= payment

Pydantic Models

Location: simulation/models.py

Core Models

from pydantic import BaseModel, field_validator
from typing import Optional, Dict, List, Literal

class FinancialProfile(BaseModel):
    liquid_assets: float
    credit_debt: float = 0.0
    loan_debt: float = 0.0
    monthly_loan_payments: float = 0.0
    monthly_spending: float
    spending_by_category: Dict[str, float] = {}
    spending_volatility: float = 0.15

class UserInputs(BaseModel):
    monthly_income: float
    age: int
    risk_tolerance: Literal["low", "medium", "high"]

class Goal(BaseModel):
    target_amount: float
    timeline_months: int
    goal_type: str = "custom"
    goal_text: Optional[str] = None
    confidence: float = 1.0

    @field_validator('timeline_months')
    @classmethod
    def validate_timeline(cls, v):
        if v <= 0:
            raise ValueError(f'timeline_months must be positive, got {v}')
        return v

class SimulationParams(BaseModel):
    n_simulations: int = 100000
    income_volatility: float = 0.05
    expense_volatility: float = 0.15
    emergency_probability: float = 0.08
    emergency_min: float = 500
    emergency_max: float = 3000
    annual_return_mean: float = 0.07
    annual_return_std: float = 0.15
    inflation_rate: float = 0.025
    inflation_volatility: float = 0.01
    annual_raise_mean: float = 0.03
    annual_raise_volatility: float = 0.015
    promotion_probability: float = 0.15
    promotion_raise_mean: float = 0.08
    promotion_raise_volatility: float = 0.03
    
    # Account-aware simulation
    credit_cards: List[CreditCardParams] = []
    loans: List[LoanParams] = []
    use_account_aware_simulation: bool = False

    @staticmethod
    def from_risk_tolerance(
        risk_tolerance: Literal["low", "medium", "high"],
        base_params: Optional['SimulationParams'] = None
    ) -> 'SimulationParams':
        """Adjust return expectations based on risk tolerance."""
        params = base_params or SimulationParams()
        
        risk_profiles = {
            "low": {"annual_return_mean": 0.04, "annual_return_std": 0.08},
            "medium": {"annual_return_mean": 0.07, "annual_return_std": 0.15},
            "high": {"annual_return_mean": 0.10, "annual_return_std": 0.20},
        }
        
        profile = risk_profiles[risk_tolerance]
        params.annual_return_mean = profile["annual_return_mean"]
        params.annual_return_std = profile["annual_return_std"]
        
        return params

class SimulationRequest(BaseModel):
    financial_profile: FinancialProfile
    user_inputs: UserInputs
    goal: Goal
    simulation_params: Optional[SimulationParams] = None

class Percentiles(BaseModel):
    p10: float
    p25: float
    p50: float
    p75: float
    p90: float

class Assumptions(BaseModel):
    """Simulation assumptions for transparency."""
    annual_return_mean: float
    annual_return_std: float
    inflation_rate: float
    inflation_volatility: float
    annual_raise_mean: float
    annual_raise_frequency: str
    promotion_probability_semi_annual: float
    promotion_raise_mean: float
    emergency_probability_monthly: float
    emergency_amount_range: str
    income_volatility: float
    expense_volatility: float

class SimulationResults(BaseModel):
    success_probability: float
    median_outcome: float
    percentiles: Percentiles
    mean: float
    std: float
    worst_case: float
    best_case: float
    simulations_run: int
    workers_used: int
    assumptions: Optional[Assumptions] = None

CLI Entry Point

Location: simulation/main.py
#!/usr/bin/env python3
import argparse
import json
import sys
from models import SimulationRequest
from monte_carlo import run_monte_carlo
from sensitivity import run_sensitivity_analysis

def main():
    parser = argparse.ArgumentParser(description='Monte Carlo Financial Simulation')
    parser.add_argument('--mode', choices=['simulate', 'sensitivity', 'benchmark'],
                        default='simulate')
    parser.add_argument('--input', type=str, help='JSON input string')
    parser.add_argument('--workers', type=int, default=4)
    parser.add_argument('--output-format', choices=['json', 'pretty'], default='json')

    args = parser.parse_args()

    # Read input
    if args.input:
        input_json = args.input
    else:
        input_json = sys.stdin.read()

    # Parse request
    request = parse_request_from_json(input_json)

    # Execute
    if args.mode == 'simulate':
        results = run_monte_carlo(request, n_workers=args.workers)
        output = results_to_camel_case(results)
    elif args.mode == 'sensitivity':
        results = run_sensitivity_analysis(request)
        output = results_to_camel_case(results)

    # Output
    print(json.dumps(output, indent=2 if args.output_format == 'pretty' else None))

if __name__ == '__main__':
    main()

Usage Examples

From Command Line:
python main.py --mode simulate --input '{"financialProfile": {...}, "goal": {...}}'
From Node.js API:
const { spawn } = require('child_process')

const process = spawn('python3', [
  'main.py',
  '--mode', 'simulate',
  '--input', JSON.stringify(request)
])

process.stdout.on('data', (data) => {
  const results = JSON.parse(data.toString())
  console.log(results)
})

Sensitivity Analysis

Location: simulation/sensitivity.py
def run_sensitivity_analysis(request: SimulationRequest) -> SensitivityAnalysis:
    """
    Test impact of parameter changes on success probability.
    """
    # Run base simulation
    base_results = run_monte_carlo(request)
    base_prob = base_results.success_probability

    sensitivities = {}
    
    # Test income +10%
    modified_request = copy.deepcopy(request)
    modified_request.user_inputs.monthly_income *= 1.1
    results = run_monte_carlo(modified_request)
    sensitivities['income_plus_10'] = {
        'delta': results.success_probability - base_prob,
        'new_probability': results.success_probability,
        'impact': abs(results.success_probability - base_prob)
    }

    # Test spending -10%
    modified_request = copy.deepcopy(request)
    modified_request.financial_profile.monthly_spending *= 0.9
    results = run_monte_carlo(modified_request)
    sensitivities['spending_minus_10'] = {
        'delta': results.success_probability - base_prob,
        'new_probability': results.success_probability,
        'impact': abs(results.success_probability - base_prob)
    }

    # ... more scenarios

    # Find most impactful
    most_impactful = max(sensitivities.items(), key=lambda x: x[1]['impact'])[0]

    # Generate recommendations
    recommendations = generate_recommendations(sensitivities, base_prob)

    return SensitivityAnalysis(
        base_probability=base_prob,
        sensitivities=sensitivities,
        most_impactful=most_impactful,
        recommendations=recommendations
    )

AI Goal Parsing

Location: simulation/goal_parser.py Uses OpenAI to parse natural language goals:
import openai
from models import Goal

def parse_goal_from_text(goal_text: str) -> Goal:
    """
    Parse natural language goal into structured Goal object.
    
    Example:
        "Save $50,000 for a down payment in 3 years"
        -> Goal(target_amount=50000, timeline_months=36, goal_type="home_purchase")
    """
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[{
            "role": "system",
            "content": "Extract financial goal details. Return JSON with targetAmount, timelineMonths, goalType."
        }, {
            "role": "user",
            "content": goal_text
        }]
    )
    
    parsed = json.loads(response.choices[0].message.content)
    
    return Goal(
        target_amount=parsed['targetAmount'],
        timeline_months=parsed['timelineMonths'],
        goal_type=parsed.get('goalType', 'custom'),
        goal_text=goal_text,
        confidence=parsed.get('confidence', 0.9)
    )

Key Performance Techniques

1. NumPy Vectorization

Bad (Python loops):
for sim in range(n_sims):
    for month in range(months):
        income = base_income * (1 + random.gauss(0, 0.05))
        # ... process one month at a time
Good (Vectorized):
# Generate all random numbers at once
income_noise = rng.normal(1.0, 0.05, (n_sims, months))
# Apply to all simulations simultaneously
income = base_income * income_noise  # Operates on entire array
Speedup: ~100x faster

2. Multiprocessing

with Pool(n_workers) as pool:
    results = pool.imap_unordered(run_simulation_batch, batch_args)
    final_balances = np.concatenate(list(results))
Speedup: Linear with CPU cores (3.6x on 4 cores)

3. Pre-allocation

# Pre-allocate arrays
balances = np.zeros((n_sims, months + 1))
income_multiplier = np.ones(n_sims)

# Faster than growing arrays dynamically

Installation

cd simulation
python -m venv venv
source venv/bin/activate
pip install -r requirements.txt

Dependencies

File: requirements.txt
numpy>=1.24.0
pandas>=2.0.0
pydantic>=2.0.0
openai>=1.0.0
python-dotenv>=1.0.0

Testing

pytest tests/
Example test:
import pytest
from monte_carlo import run_monte_carlo
from models import SimulationRequest, FinancialProfile, UserInputs, Goal

def test_simulation_convergence():
    """Test that 100k simulations produce stable results."""
    request = SimulationRequest(
        financial_profile=FinancialProfile(
            liquid_assets=50000,
            credit_debt=0,
            monthly_spending=3000
        ),
        user_inputs=UserInputs(
            monthly_income=5000,
            age=30,
            risk_tolerance="medium"
        ),
        goal=Goal(
            target_amount=100000,
            timeline_months=60
        )
    )
    
    results = run_monte_carlo(request, n_workers=4)
    
    assert 0 <= results.success_probability <= 1
    assert results.percentiles.p50 < results.percentiles.p75
    assert results.worst_case < results.best_case

Next Steps

Backend API

See how the Express API calls the Python engine

Architecture Overview

Return to the high-level architecture

Build docs developers (and LLMs) love