Overview

The Orchestrator class (cerebro.py) is the backend entry point for the CS2 Market Tracker. It acts as a “choir conductor,” ensuring all schedulers work in harmony without exceeding Steam’s API rate limits.

Class Definition

cerebro.py:21-41
class Orchestrator:
    """
    Coordinates all schedulers with shared rate limiting.

    Think of it as a choir conductor - ensures all parts (schedulers)
    work in harmony without exceeding Steam's API rate limits.
    """

    def __init__(self, config_path: str = "config.yaml"):
        """
        Initialize orchestrator with configuration.

        Args:
            config_path: Path to YAML configuration file
        """
        self.config_path = config_path
        self.config: Optional[dict] = None
        self.rate_limiter: Optional[RateLimiter] = None
        self.snoozerScheduler: Optional[snoozerScheduler] = None
        self.clockworkScheduler: Optional[ClockworkScheduler] = None
        self.shutdown_event = asyncio.Event()

Lifecycle Phases

1. Configuration Loading & Validation

The orchestrator performs comprehensive validation before starting:
cerebro.py:43-58
def load_and_validate_config(self):
    """Load config and validate feasibility."""
    print("hello world!")
    self.config = load_config_from_yaml(self.config_path)

    # Extract config values
    rate_limit = self.config['LIMITS']['REQUESTS']
    window_seconds = self.config['LIMITS']['WINDOW_SECONDS']
    tracking_items = self.config['TRACKING_ITEMS']

    print(f"  I see you have a rate limit: {rate_limit} requests per {window_seconds} seconds")

    # Validate required fields exist before checking feasibility
    self.validate_required_fields(tracking_items)

    self.validate_config_feasibility(rate_limit, window_seconds, tracking_items)

Required Field Validation

Each item must have specific fields based on its endpoint type:
cerebro.py:60-110
def validate_required_fields(self, items: list):
    """
    Validate that each item has all required fields.
    
    Required fields:
    - All items: market_hash_name, apiid, polling-interval-in-seconds, appid
    - histogram/activity: item_nameid (additional)
    """
    valid_apiids = {'priceoverview', 'itemordershistogram', 'itemordersactivity', 'pricehistory'}
    
    # Popular Steam app IDs for helpful error messages
    popular_appids = {
        730: "Counter-Strike 2 (CS2)",
        570: "Dota 2",
        440: "Team Fortress 2",
        252490: "Rust",
        753: "Steam (trading cards, backgrounds, emoticons)"
    }
Validation rules:
  1. Universal fields: all items require market_hash_name, apiid, polling-interval-in-seconds, appid
  2. Endpoint-specific fields: itemordershistogram and itemordersactivity additionally require item_nameid
  3. Valid API IDs: only priceoverview, itemordershistogram, itemordersactivity, and pricehistory are accepted
Example error output:
 ❌ CONFIG ERROR: Item 3 missing required field 'item_nameid'
   Item: AK-47 | Redline (Field-Tested)
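The rules above can be sketched as a standalone checker. This is a hypothetical helper for illustration only: the real validate_required_fields in cerebro.py prints errors such as the example above rather than returning them, and its internals may differ.

```python
# Illustrative sketch of the validation rules; field names mirror the
# config keys documented above, everything else is hypothetical.
VALID_APIIDS = {'priceoverview', 'itemordershistogram',
                'itemordersactivity', 'pricehistory'}
UNIVERSAL_FIELDS = ('market_hash_name', 'apiid',
                    'polling-interval-in-seconds', 'appid')

def find_config_errors(items: list[dict]) -> list[str]:
    """Return human-readable config errors (empty list means valid)."""
    errors = []
    for i, item in enumerate(items, start=1):
        name = item.get('market_hash_name', f'<item {i}>')
        # Rule 1: universal fields on every item
        for field in UNIVERSAL_FIELDS:
            if field not in item:
                errors.append(f"Item {i} missing required field '{field}' ({name})")
        # Rule 3: apiid must be one of the known endpoints
        apiid = item.get('apiid')
        if apiid is not None and apiid not in VALID_APIIDS:
            errors.append(f"Item {i} has invalid apiid '{apiid}' ({name})")
        # Rule 2: histogram/activity endpoints additionally need item_nameid
        if apiid in ('itemordershistogram', 'itemordersactivity') and 'item_nameid' not in item:
            errors.append(f"Item {i} missing required field 'item_nameid' ({name})")
    return errors
```

Returning a list instead of exiting makes the rules easy to unit-test; the orchestrator itself fails fast at startup instead.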

Feasibility Validation

The orchestrator calculates whether the configuration can satisfy all polling requirements:
cerebro.py:112-139
def validate_config_feasibility(self, rate_limit: int, window_seconds: int, items: list):
    """
    Validate that config is feasible given rate limits.

    Calculates maximum requests per window assuming worst-case (all items synchronized).
    Real usage will typically be lower due to urgency-based scheduling spreading requests.
    """
    total_reqs = 0

    for item in items:
        reqs_per_window = window_seconds // item['polling-interval-in-seconds']
        total_reqs += reqs_per_window

    if total_reqs > rate_limit:
        print(f"\n❌ CONFIG ERROR: Infeasible configuration")
        print(f"   Calculated: {total_reqs} requests per {window_seconds}s")
        print(f"   Limit: {rate_limit} requests per {window_seconds}s")
        print(f"   Adjust polling intervals or reduce tracked items")
        exit(1)

    # Success - config is feasible
    utilization = (total_reqs / rate_limit) * 100
    print(f"  ✓ Config feasible: {total_reqs} req/{window_seconds}s ({utilization:.1f}% capacity)")

    # Warn about startup burst
    if len(items) > rate_limit:
        print(f"  ⚠ Startup: {len(items)} items will fire initially (rate limiter will queue them)")
Calculation logic:
# Config:
LIMITS:
  REQUESTS: 100
  WINDOW_SECONDS: 300  # 5 minutes

TRACKING_ITEMS:
  - polling-interval-in-seconds: 60   # 300/60 = 5 req/window
  - polling-interval-in-seconds: 120  # 300/120 = 2.5, floored to 2 req/window
  - polling-interval-in-seconds: 30   # 300/30 = 10 req/window

# Total: 5 + 2 + 10 = 17 requests per 300s window
# Utilization: 17/100 = 17%
This is a worst-case calculation assuming all items are perfectly synchronized. In practice, the urgency-based scheduler spreads requests naturally over time, resulting in lower peak utilization.
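The worked example above can be reproduced in a few lines (a standalone sketch that mirrors the floor-division logic in validate_config_feasibility; the function name is illustrative):

```python
# Each item contributes floor(window / interval) requests per window;
# the config is feasible if the worst-case total stays under the limit.
def feasibility(rate_limit: int, window_seconds: int, intervals: list[int]):
    total = sum(window_seconds // interval for interval in intervals)
    utilization = 100 * total / rate_limit
    return total, utilization, total <= rate_limit

total, util, ok = feasibility(100, 300, [60, 120, 30])
# total == 17 (5 + 2 + 10), util == 17.0, ok == True
```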

2. Scheduler Setup

After validation, the orchestrator creates schedulers with a shared rate limiter:
cerebro.py:145-179
def setup_schedulers(self):
    """Create scheduler instances with shared rate limiter."""
    print("\n")

    # Create single shared rate limiter (CRITICAL for API compliance)
    rate_limit = self.config['LIMITS']['REQUESTS']
    window_seconds = self.config['LIMITS']['WINDOW_SECONDS']
    self.rate_limiter = RateLimiter(max_requests=rate_limit, window_seconds=window_seconds)
    print(f"  ✓ Shared RateLimiter created ({rate_limit} req/{window_seconds}s)")
    print("  ✓ Database: SQLite at data/market_data.db")

    # Filter items by type: live items (not pricehistory) vs history items
    live_items = []
    history_items = []

    for item in self.config['TRACKING_ITEMS']:
        if item['apiid'] == 'pricehistory':
            history_items.append(item)
        else:
            live_items.append(item)

    # Create schedulers with shared rate limiter
    if live_items:
        self.snoozerScheduler = snoozerScheduler(
            live_items=live_items,
            rate_limiter=self.rate_limiter  # SHARED
        )
        print(f"  ✓ Started HIGH frequency tracking on ({len(live_items)} items)")

    if history_items:
        self.clockworkScheduler = ClockworkScheduler(
            items=history_items,
            rate_limiter=self.rate_limiter  # SHARED
        )
        print(f"  ✓ Started ARCHIVAL work + all known historical snapshots available right now on ({len(history_items)} items)")
Critical Design Decision: Both schedulers share the same RateLimiter instance. This is essential for API compliance: without sharing, each scheduler would independently allow N requests, effectively doubling the rate limit.
Item routing logic:

  Condition                  Scheduler            Purpose
  apiid == 'pricehistory'    ClockworkScheduler   Historical data, updated hourly
  apiid != 'pricehistory'    snoozerScheduler     Real-time data (priceoverview, histogram, activity)
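Why sharing matters can be shown with a minimal sliding-window limiter. This is a sketch under the documented limits, not the project's actual RateLimiter class (whose API and internals may differ):

```python
import asyncio

class SharedLimiter:
    """Sketch: at most max_requests acquisitions per window_seconds,
    enforced across ALL callers that hold the same instance."""
    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._timestamps: list[float] = []
        self._lock = asyncio.Lock()

    async def acquire(self):
        while True:
            async with self._lock:
                now = asyncio.get_running_loop().time()
                # Drop acquisitions that have aged out of the window
                self._timestamps = [t for t in self._timestamps
                                    if now - t < self.window_seconds]
                if len(self._timestamps) < self.max_requests:
                    self._timestamps.append(now)
                    return
                wait = self.window_seconds - (now - self._timestamps[0])
            await asyncio.sleep(wait)

async def demo():
    limiter = SharedLimiter(max_requests=2, window_seconds=0.2)

    # Two "schedulers" draw from the SAME limiter, so only 2 requests
    # total can start per window, not 2 per scheduler.
    async def scheduler(n):
        for _ in range(n):
            await limiter.acquire()

    t0 = asyncio.get_running_loop().time()
    await asyncio.gather(scheduler(2), scheduler(2))
    return asyncio.get_running_loop().time() - t0

elapsed = asyncio.run(demo())
print(f"4 acquisitions took {elapsed:.2f}s (at least one full 0.2s window of queueing)")
```

With two independent limiters the same four acquisitions would all start immediately, which is exactly the doubled-limit failure the shared instance prevents.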

3. Signal Handling

Graceful shutdown is critical for data integrity:
cerebro.py:181-194
def setup_signal_handlers(self):
    """Setup signal handlers for graceful shutdown."""
    loop = asyncio.get_event_loop()

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(
            sig,
            lambda: asyncio.create_task(self.shutdown())
        )

async def shutdown(self):
    """Handle graceful shutdown."""
    print("\n\nShutdown signal received. Stopping schedulers...")
    self.shutdown_event.set()
Handled signals:
  • SIGINT: Ctrl+C in terminal
  • SIGTERM: Kubernetes/Docker stop, kill command
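The same pattern can be compressed into a self-contained demo (Unix only: add_signal_handler is not implemented on Windows's default event loop). The self-delivered signal via os.kill stands in for an external kill command:

```python
import asyncio
import os
import signal

async def main():
    shutdown_event = asyncio.Event()
    loop = asyncio.get_running_loop()

    # Same handler wiring as the orchestrator, minus the class wrapper
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, shutdown_event.set)

    # Simulate an external `kill` arriving shortly after startup
    loop.call_later(0.05, os.kill, os.getpid(), signal.SIGTERM)

    await shutdown_event.wait()  # blocks here until a signal fires
    return "stopped gracefully"

print(asyncio.run(main()))
```

Because the handler only sets an event, the event loop stays free to finish in-flight work before the run() coroutine observes the shutdown.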

4. Main Execution Loop

The orchestrator runs both schedulers concurrently:
cerebro.py:196-264
async def run(self):
    """
    Main orchestrator loop.

    Runs all schedulers concurrently until shutdown signal or config change.
    """
    # Load and validate configuration
    self.load_and_validate_config()

    # Setup schedulers with shared rate limiter
    self.setup_schedulers()

    # Setup signal handlers for graceful shutdown
    self.setup_signal_handlers()

    print("\n")
    print("GO TIME!")
    print("\n")
    print(f"Press Ctrl+C to stop")
    print("="*60 + "\n")

    # Create tasks for all schedulers
    tasks = []

    if self.snoozerScheduler:
        tasks.append(asyncio.create_task(
            self.snoozerScheduler.run(),
            name="live"
        ))

    if self.clockworkScheduler:
        tasks.append(asyncio.create_task(
            self.clockworkScheduler.run(),
            name="clockwork"
        ))

    if not tasks:
        print("Warning: No schedulers configured. Exiting.")
        return

    # Run all schedulers concurrently until shutdown
    try:
        # Wait for shutdown event or any task to fail
        shutdown_task = asyncio.create_task(self.shutdown_event.wait(), name="shutdown")
        done, pending = await asyncio.wait(
            tasks + [shutdown_task],
            return_when=asyncio.FIRST_COMPLETED
        )

        # Cancel remaining tasks
        for task in pending:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

        # Check if any scheduler task failed
        for task in done:
            if task.get_name() != "shutdown" and not task.cancelled():
                exc = task.exception()
                if exc:
                    print(f"Scheduler {task.get_name()} failed with error: {exc}")
                    raise exc

    except asyncio.CancelledError:
        print("Orchestrator cancelled")

    print("\n✓ All schedulers stopped gracefully")
Execution strategy:
  1. Create tasks: both schedulers are wrapped in asyncio.create_task() with descriptive names
  2. Concurrent execution: asyncio.wait() runs all tasks simultaneously, returning when any task completes
  3. Shutdown handling: when the shutdown event fires or a task fails:
     1. Cancel all pending tasks
     2. Await cancellation (ensures cleanup)
     3. Check for exceptions in completed tasks
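The wait/cancel pattern can be exercised in isolation with a self-contained sketch (the 10-second workers stand in for the schedulers; names and delays are illustrative):

```python
import asyncio

async def worker(name: str, delay: float):
    await asyncio.sleep(delay)
    return name

async def run_until_first(stop_after: float):
    """Run tasks concurrently, then cancel whatever is still pending
    once one completes (here a 'shutdown' timer finishes first)."""
    tasks = [
        asyncio.create_task(worker("live", 10.0), name="live"),
        asyncio.create_task(worker("clockwork", 10.0), name="clockwork"),
    ]
    shutdown = asyncio.create_task(asyncio.sleep(stop_after), name="shutdown")

    done, pending = await asyncio.wait(tasks + [shutdown],
                                       return_when=asyncio.FIRST_COMPLETED)
    cancelled = []
    for task in pending:
        task.cancel()
        try:
            await task  # ensures cleanup code inside the task runs
        except asyncio.CancelledError:
            cancelled.append(task.get_name())
    return {t.get_name() for t in done}, sorted(cancelled)

done, cancelled = asyncio.run(run_until_first(0.01))
print(done, cancelled)  # {'shutdown'} ['clockwork', 'live']
```

Awaiting each cancelled task (instead of just calling cancel()) is what guarantees context managers and finally blocks inside the schedulers get to run before the process exits.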

Entry Point

cerebro.py:267-280
async def main():
    """Entry point for the backend."""
    orchestrator = Orchestrator(config_path="config.yaml")
    await orchestrator.run()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nKeyboard interrupt received. Exiting...")
    except Exception as e:
        print(f"\nFatal error: {e}")
        raise
Usage:
python cerebro.py

Example Output

hello world!
  I see you have a rate limit: 100 requests per 300 seconds
  ✓ Config feasible: 87 req/300s (87.0% capacity)


  ✓ Shared RateLimiter created (100 req/300s)
  ✓ Database: SQLite at data/market_data.db
  ✓ Started HIGH frequency tracking on (15 items)
  ✓ Started ARCHIVAL work + all known historical snapshots available right now on (3 items)


GO TIME!

Press Ctrl+C to stop
============================================================

Running initial price history fetch...
[2026-03-03 14:30:00] Executing hourly price history updates
 AK-47 | Redline (Field-Tested): $8.42
 AWP | Asiimov (Field-Tested): 234 orders
  Historical collector sleeping until 15:30:00 UTC (3600 seconds)

Design Principles

The orchestrator only coordinates. It does not:
  • Make API calls
  • Store data
  • Calculate urgency
These concerns are delegated to schedulers and clients.

Configuration errors are caught at startup, not during execution:
# Bad config kills the process immediately
if total_reqs > rate_limit:
    exit(1)
This prevents wasted resources and partial data collection.

The only shared state is the RateLimiter instance. This is intentional:
  • Ensures API compliance
  • Enables centralized rate monitoring
  • Simplifies debugging (single source of truth)

Common Scenarios

What happens if a scheduler crashes?

The orchestrator detects the failure in asyncio.wait() and:
  1. Cancels the other scheduler
  2. Logs the exception
  3. Re-raises the error (crashing the entire process)
This ensures data consistency - partial operation is worse than no operation.
Can I add tracked items without restarting?

No. Scheduler creation is coupled to configuration loading. To add items:
  1. Edit config.yaml
  2. Restart the orchestrator (Ctrl+C, then re-run)
Future versions may support hot-reloading.
What happens when I press Ctrl+C?

When you press Ctrl+C:
  1. Signal handler sets shutdown_event
  2. asyncio.wait() returns immediately
  3. Both scheduler tasks are cancelled
  4. Cleanup code in schedulers runs (context managers exit)
  5. Process exits cleanly
In-flight API requests may complete or be abandoned depending on timing.

Rate Limiter

Learn how the shared rate limiter enforces API limits

Schedulers

Deep dive into urgency-based and fixed-interval scheduling

Configuration

Complete configuration reference
