Technology Stack
| Technology | Purpose |
|---|---|
| NumPy | Vectorized array operations for performance |
| Pandas | Data analysis and processing |
| Pydantic | Runtime type validation and parsing |
| OpenAI | AI-powered goal parsing |
| Multiprocessing | Parallel execution across CPU cores |
Project Structure
simulation/
├── __init__.py
├── main.py # CLI entry point
├── monte_carlo.py # Core Monte Carlo engine
├── models.py # Pydantic data models
├── goal_parser.py # AI goal parsing
├── data_processing.py # Data utilities
├── sensitivity.py # Sensitivity analysis
├── requirements.txt # Python dependencies
└── tests/ # Unit tests
Core Engine: monte_carlo.py
Location:simulation/monte_carlo.py
Architecture
The engine uses a multi-worker parallel architecture:┌─────────────────────────────────────────────────────┐
│ run_monte_carlo() │
│ - Split 100k simulations across N workers │
│ - Generate seeds for reproducibility │
└──────────────┬──────────────────────────────────────┘
│
┌───────┴────────┐
│ multiprocessing │
│ Pool(4) │
└───────┬─────────┘
│
┌─────────┴─────────┐
│ │
┌────▼────┐ ┌────▼────┐
│ Worker 1│ │ Worker 4│
│ 25k sims│ ... │ 25k sims│
└────┬────┘ └────┬────┘
│ │
└─────────┬─────────┘
│
┌────▼─────┐
│ Aggregate │
│ Results │
└───────────┘
Main Simulation Function
def run_monte_carlo(
request: SimulationRequest,
n_workers: int = None,
progress_callback: Optional[Callable[[dict], None]] = None
) -> SimulationResults:
"""
Run full Monte Carlo simulation with parallel workers.
Args:
request: Simulation request with financial profile and goal
n_workers: Number of parallel workers (defaults to CPU count)
progress_callback: Optional callback for progress updates
Returns:
SimulationResults with probability distributions and statistics
"""
params = request.simulation_params or SimulationParams()
n_simulations = params.n_simulations
if n_workers is None:
n_workers = min(cpu_count(), 4)
# Generate seeds for reproducibility
seeds = np.arange(n_simulations)
# Split work across workers
batches = np.array_split(seeds, n_workers)
batch_args = [(request, batch, i) for i, batch in enumerate(batches)]
# Run parallel simulations
if n_workers > 1:
results_list = []
completed = 0
with Pool(n_workers) as pool:
for balances, batch_id in pool.imap_unordered(run_simulation_batch, batch_args):
results_list.append(balances)
completed += len(balances)
if progress_callback:
progress_callback({
"type": "progress",
"completed": int(completed),
"total": int(n_simulations),
"percentage": round(completed / n_simulations * 100, 2)
})
final_balances = np.concatenate(results_list)
else:
final_balances, _ = run_simulation_batch(batch_args[0])
# Calculate statistics
return calculate_results(final_balances, request)
Vectorized Simulation Batch
The core simulation uses NumPy vectorization for massive performance gains:def run_simulation_batch(args: Tuple[SimulationRequest, np.ndarray, int]) -> Tuple[np.ndarray, int]:
"""
Run a batch of Monte Carlo simulations using NumPy vectorization.
This function processes thousands of simulations simultaneously using
vectorized array operations instead of Python loops.
"""
request, seeds, batch_id = args
params = request.simulation_params or SimulationParams()
n_sims = len(seeds)
months = request.goal.timeline_months
# Initialize random state
rng = np.random.default_rng(seeds[0])
# Starting balance
starting_balance = (
request.financial_profile.liquid_assets -
request.financial_profile.credit_debt
)
# Pre-generate ALL random numbers for efficiency
# Shape: (n_sims, months) - this is key to vectorization!
income_noise = rng.normal(1.0, params.income_volatility, (n_sims, months))
spending_noise = rng.normal(1.0, params.expense_volatility, (n_sims, months))
emergency_events = rng.random((n_sims, months)) < params.emergency_probability
emergency_amounts = rng.uniform(
params.emergency_min,
params.emergency_max,
(n_sims, months)
)
# Monthly market returns
monthly_return_mean = params.annual_return_mean / 12
monthly_return_std = params.annual_return_std / np.sqrt(12)
market_returns = rng.normal(monthly_return_mean, monthly_return_std, (n_sims, months))
# Initialize balance array
balances = np.zeros((n_sims, months + 1))
balances[:, 0] = starting_balance
# Track income/spending multipliers for raises/promotions
income_multiplier = np.ones(n_sims)
spending_multiplier = np.ones(n_sims)
# Simulate month-by-month
for month in range(months):
# Apply inflation (compounds monthly)
monthly_inflation = rng.normal(
params.inflation_rate / 12,
params.inflation_volatility / 12,
n_sims
)
spending_multiplier *= (1 + monthly_inflation)
# Apply annual raises (every 12 months)
if month % 12 == 11:
annual_raises = rng.normal(
params.annual_raise_mean,
params.annual_raise_volatility,
n_sims
)
income_multiplier *= (1 + annual_raises)
# Apply semi-annual promotion chances (every 6 months)
if month % 6 == 5:
promotion_events = rng.random(n_sims) < params.promotion_probability
promotion_raises = rng.normal(
params.promotion_raise_mean,
params.promotion_raise_volatility,
n_sims
)
income_multiplier[promotion_events] *= (1 + promotion_raises[promotion_events])
# Calculate cash flows (vectorized across all simulations)
base_income = request.user_inputs.monthly_income
base_spending = request.financial_profile.monthly_spending
income = base_income * income_multiplier * income_noise[:, month]
spending = base_spending * spending_multiplier * spending_noise[:, month]
emergencies = emergency_events[:, month] * emergency_amounts[:, month]
# Investment returns (only on balance above emergency fund)
emergency_fund_target = base_spending * 6
investable_balance = np.maximum(balances[:, month] - emergency_fund_target, 0)
returns = investable_balance * market_returns[:, month]
# Update balances (vectorized!)
balances[:, month + 1] = (
balances[:, month] +
income -
spending -
emergencies -
request.financial_profile.monthly_loan_payments +
returns
)
return balances[:, -1], batch_id # Return final balances
The key to performance is pre-generating all random numbers as 2D arrays
(n_sims, months) and using vectorized operations instead of nested loops.Performance Benchmarks
100,000 simulations over 36 months:| Workers | Time | Sims/Second |
|---|---|---|
| 1 | 8.2s | 12,195 |
| 2 | 4.5s | 22,222 |
| 4 | 2.3s | 43,478 |
Account-Aware Simulation
The engine supports detailed per-account modeling:Credit Card Interest Modeling
if params.use_account_aware_simulation and params.credit_cards:
# Initialize per-card balances
n_cards = len(params.credit_cards)
card_balances = np.zeros((n_sims, n_cards))
card_aprs = np.zeros(n_cards)
for i, card in enumerate(params.credit_cards):
card_balances[:, i] = card.balance
card_aprs[i] = card.apr / 100.0 # Convert to decimal
# During each month:
for i in range(n_cards):
# Monthly interest
monthly_rate = card_aprs[i] / 12
interest = card_balances[:, i] * monthly_rate
# Add interest to balance
card_balances[:, i] += interest
# Minimum payment
payment = np.minimum(card_min_payments[i], card_balances[:, i])
card_balances[:, i] -= payment
# Deduct from cash flow
balances[:, month + 1] -= (interest + payment)
Loan Amortization
if params.loans:
for i, loan in enumerate(params.loans):
# Monthly interest
monthly_rate = loan.interest_rate / 100.0 / 12
interest = loan_balances[:, i] * monthly_rate
# Fixed payment covers interest + principal
payment = np.minimum(loan.monthly_payment, loan_balances[:, i] + interest)
principal_payment = np.maximum(0, payment - interest)
# Update loan balance
loan_balances[:, i] = np.maximum(0, loan_balances[:, i] - principal_payment)
# Deduct payment from cash flow
balances[:, month + 1] -= payment
Pydantic Models
Location:simulation/models.py
Core Models
from pydantic import BaseModel, field_validator
from typing import Optional, Dict, List, Literal
class FinancialProfile(BaseModel):
liquid_assets: float
credit_debt: float = 0.0
loan_debt: float = 0.0
monthly_loan_payments: float = 0.0
monthly_spending: float
spending_by_category: Dict[str, float] = {}
spending_volatility: float = 0.15
class UserInputs(BaseModel):
monthly_income: float
age: int
risk_tolerance: Literal["low", "medium", "high"]
class Goal(BaseModel):
target_amount: float
timeline_months: int
goal_type: str = "custom"
goal_text: Optional[str] = None
confidence: float = 1.0
@field_validator('timeline_months')
@classmethod
def validate_timeline(cls, v):
if v <= 0:
raise ValueError(f'timeline_months must be positive, got {v}')
return v
class SimulationParams(BaseModel):
n_simulations: int = 100000
income_volatility: float = 0.05
expense_volatility: float = 0.15
emergency_probability: float = 0.08
emergency_min: float = 500
emergency_max: float = 3000
annual_return_mean: float = 0.07
annual_return_std: float = 0.15
inflation_rate: float = 0.025
inflation_volatility: float = 0.01
annual_raise_mean: float = 0.03
annual_raise_volatility: float = 0.015
promotion_probability: float = 0.15
promotion_raise_mean: float = 0.08
promotion_raise_volatility: float = 0.03
# Account-aware simulation
credit_cards: List[CreditCardParams] = []
loans: List[LoanParams] = []
use_account_aware_simulation: bool = False
@staticmethod
def from_risk_tolerance(
risk_tolerance: Literal["low", "medium", "high"],
base_params: Optional['SimulationParams'] = None
) -> 'SimulationParams':
"""Adjust return expectations based on risk tolerance."""
params = base_params or SimulationParams()
risk_profiles = {
"low": {"annual_return_mean": 0.04, "annual_return_std": 0.08},
"medium": {"annual_return_mean": 0.07, "annual_return_std": 0.15},
"high": {"annual_return_mean": 0.10, "annual_return_std": 0.20},
}
profile = risk_profiles[risk_tolerance]
params.annual_return_mean = profile["annual_return_mean"]
params.annual_return_std = profile["annual_return_std"]
return params
class SimulationRequest(BaseModel):
financial_profile: FinancialProfile
user_inputs: UserInputs
goal: Goal
simulation_params: Optional[SimulationParams] = None
class Percentiles(BaseModel):
p10: float
p25: float
p50: float
p75: float
p90: float
class Assumptions(BaseModel):
"""Simulation assumptions for transparency."""
annual_return_mean: float
annual_return_std: float
inflation_rate: float
inflation_volatility: float
annual_raise_mean: float
annual_raise_frequency: str
promotion_probability_semi_annual: float
promotion_raise_mean: float
emergency_probability_monthly: float
emergency_amount_range: str
income_volatility: float
expense_volatility: float
class SimulationResults(BaseModel):
success_probability: float
median_outcome: float
percentiles: Percentiles
mean: float
std: float
worst_case: float
best_case: float
simulations_run: int
workers_used: int
assumptions: Optional[Assumptions] = None
CLI Entry Point
Location:simulation/main.py
#!/usr/bin/env python3
import argparse
import json
import sys
from models import SimulationRequest
from monte_carlo import run_monte_carlo
from sensitivity import run_sensitivity_analysis
def main():
parser = argparse.ArgumentParser(description='Monte Carlo Financial Simulation')
parser.add_argument('--mode', choices=['simulate', 'sensitivity', 'benchmark'],
default='simulate')
parser.add_argument('--input', type=str, help='JSON input string')
parser.add_argument('--workers', type=int, default=4)
parser.add_argument('--output-format', choices=['json', 'pretty'], default='json')
args = parser.parse_args()
# Read input
if args.input:
input_json = args.input
else:
input_json = sys.stdin.read()
# Parse request
request = parse_request_from_json(input_json)
# Execute
if args.mode == 'simulate':
results = run_monte_carlo(request, n_workers=args.workers)
output = results_to_camel_case(results)
elif args.mode == 'sensitivity':
results = run_sensitivity_analysis(request)
output = results_to_camel_case(results)
# Output
print(json.dumps(output, indent=2 if args.output_format == 'pretty' else None))
if __name__ == '__main__':
main()
Usage Examples
From Command Line:python main.py --mode simulate --input '{"financialProfile": {...}, "goal": {...}}'
const { spawn } = require('child_process')
const process = spawn('python3', [
'main.py',
'--mode', 'simulate',
'--input', JSON.stringify(request)
])
process.stdout.on('data', (data) => {
const results = JSON.parse(data.toString())
console.log(results)
})
Sensitivity Analysis
Location:simulation/sensitivity.py
def run_sensitivity_analysis(request: SimulationRequest) -> SensitivityAnalysis:
"""
Test impact of parameter changes on success probability.
"""
# Run base simulation
base_results = run_monte_carlo(request)
base_prob = base_results.success_probability
sensitivities = {}
# Test income +10%
modified_request = copy.deepcopy(request)
modified_request.user_inputs.monthly_income *= 1.1
results = run_monte_carlo(modified_request)
sensitivities['income_plus_10'] = {
'delta': results.success_probability - base_prob,
'new_probability': results.success_probability,
'impact': abs(results.success_probability - base_prob)
}
# Test spending -10%
modified_request = copy.deepcopy(request)
modified_request.financial_profile.monthly_spending *= 0.9
results = run_monte_carlo(modified_request)
sensitivities['spending_minus_10'] = {
'delta': results.success_probability - base_prob,
'new_probability': results.success_probability,
'impact': abs(results.success_probability - base_prob)
}
# ... more scenarios
# Find most impactful
most_impactful = max(sensitivities.items(), key=lambda x: x[1]['impact'])[0]
# Generate recommendations
recommendations = generate_recommendations(sensitivities, base_prob)
return SensitivityAnalysis(
base_probability=base_prob,
sensitivities=sensitivities,
most_impactful=most_impactful,
recommendations=recommendations
)
AI Goal Parsing
Location:simulation/goal_parser.py
Uses OpenAI to parse natural language goals:
import openai
from models import Goal
def parse_goal_from_text(goal_text: str) -> Goal:
"""
Parse natural language goal into structured Goal object.
Example:
"Save $50,000 for a down payment in 3 years"
-> Goal(target_amount=50000, timeline_months=36, goal_type="home_purchase")
"""
response = openai.ChatCompletion.create(
model="gpt-4",
messages=[{
"role": "system",
"content": "Extract financial goal details. Return JSON with targetAmount, timelineMonths, goalType."
}, {
"role": "user",
"content": goal_text
}]
)
parsed = json.loads(response.choices[0].message.content)
return Goal(
target_amount=parsed['targetAmount'],
timeline_months=parsed['timelineMonths'],
goal_type=parsed.get('goalType', 'custom'),
goal_text=goal_text,
confidence=parsed.get('confidence', 0.9)
)
Key Performance Techniques
1. NumPy Vectorization
Bad (Python loops):for sim in range(n_sims):
for month in range(months):
income = base_income * (1 + random.gauss(0, 0.05))
# ... process one month at a time
# Generate all random numbers at once
income_noise = rng.normal(1.0, 0.05, (n_sims, months))
# Apply to all simulations simultaneously
income = base_income * income_noise # Operates on entire array
2. Multiprocessing
with Pool(n_workers) as pool:
results = pool.imap_unordered(run_simulation_batch, batch_args)
final_balances = np.concatenate(list(results))
3. Pre-allocation
# Pre-allocate arrays
balances = np.zeros((n_sims, months + 1))
income_multiplier = np.ones(n_sims)
# Faster than growing arrays dynamically
Installation
cd simulation
python -m venv venv
source venv/bin/activate
pip install -r requirements.txt
Dependencies
File:requirements.txt
numpy>=1.24.0
pandas>=2.0.0
pydantic>=2.0.0
openai>=1.0.0
python-dotenv>=1.0.0
Testing
pytest tests/
import pytest
from monte_carlo import run_monte_carlo
from models import SimulationRequest, FinancialProfile, UserInputs, Goal
def test_simulation_convergence():
"""Test that 100k simulations produce stable results."""
request = SimulationRequest(
financial_profile=FinancialProfile(
liquid_assets=50000,
credit_debt=0,
monthly_spending=3000
),
user_inputs=UserInputs(
monthly_income=5000,
age=30,
risk_tolerance="medium"
),
goal=Goal(
target_amount=100000,
timeline_months=60
)
)
results = run_monte_carlo(request, n_workers=4)
assert 0 <= results.success_probability <= 1
assert results.percentiles.p50 < results.percentiles.p75
assert results.worst_case < results.best_case
Next Steps
Backend API
See how the Express API calls the Python engine
Architecture Overview
Return to the high-level architecture