The Orchestrator class (cerebro.py) is the backend entry point for the CS2 Market Tracker. It acts as a “choir conductor,” ensuring all schedulers work in harmony without exceeding Steam’s API rate limits.
```python
class Orchestrator:
    """
    Coordinates all schedulers with shared rate limiting.

    Think of it as a choir conductor - ensures all parts (schedulers)
    work in harmony without exceeding Steam's API rate limits.
    """

    def __init__(self, config_path: str = "config.yaml"):
        """
        Initialize orchestrator with configuration.

        Args:
            config_path: Path to YAML configuration file
        """
        self.config_path = config_path
        self.config: Optional[dict] = None
        self.rate_limiter: Optional[RateLimiter] = None
        self.snoozerScheduler: Optional[snoozerScheduler] = None
        self.clockworkScheduler: Optional[ClockworkScheduler] = None
        self.shutdown_event = asyncio.Event()
```
This is a worst-case calculation assuming all items are perfectly synchronized. In practice, the urgency-based scheduler spreads requests naturally over time, resulting in lower peak utilization.
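The worst-case bound can be sketched as follows. This is an illustrative calculation only, not the project's actual feasibility check; the `interval_seconds` field and item structure are assumptions for the example.

```python
def worst_case_requests(items: list[dict], window_seconds: int) -> int:
    """Upper bound on requests per window if every item fires as often as possible."""
    total = 0
    for item in items:
        # In the worst case each item contributes ceil(window / interval) requests,
        # i.e. all of its polls land inside the same rate-limit window.
        interval = item["interval_seconds"]
        total += -(-window_seconds // interval)  # ceiling division
    return total

# Hypothetical tracking items polled every 60s and 120s
items = [
    {"name": "AK-47 | Redline (Field-Tested)", "interval_seconds": 60},
    {"name": "AWP | Asiimov (Field-Tested)", "interval_seconds": 120},
]
print(worst_case_requests(items, window_seconds=300))  # 5 + 3 = 8
```

Comparing this bound against the configured limit (e.g. 100 req/300s) is what makes a "Config feasible" verdict possible before any scheduler starts.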
After validation, the orchestrator creates schedulers with a shared rate limiter:
cerebro.py:145-179
```python
def setup_schedulers(self):
    """Create scheduler instances with shared rate limiter."""
    print("\n")

    # Create single shared rate limiter (CRITICAL for API compliance)
    rate_limit = self.config['LIMITS']['REQUESTS']
    window_seconds = self.config['LIMITS']['WINDOW_SECONDS']
    self.rate_limiter = RateLimiter(max_requests=rate_limit, window_seconds=window_seconds)
    print(f"  ✓ Shared RateLimiter created ({rate_limit} req/{window_seconds}s)")
    print("  ✓ Database: SQLite at data/market_data.db")

    # Filter items by type: live items (not pricehistory) vs history items
    live_items = []
    history_items = []
    for item in self.config['TRACKING_ITEMS']:
        if item['apiid'] == 'pricehistory':
            history_items.append(item)
        else:
            live_items.append(item)

    # Create schedulers with shared rate limiter
    if live_items:
        self.snoozerScheduler = snoozerScheduler(
            live_items=live_items,
            rate_limiter=self.rate_limiter  # SHARED
        )
        print(f"  ✓ Started HIGH frequency tracking on ({len(live_items)} items)")

    if history_items:
        self.clockworkScheduler = ClockworkScheduler(
            items=history_items,
            rate_limiter=self.rate_limiter  # SHARED
        )
        print(f"  ✓ Started ARCHIVAL work + all known historical snapshots available right now on ({len(history_items)} items)")
```
Critical Design Decision: Both schedulers share the same `RateLimiter` instance. This is essential for API compliance - without sharing, each scheduler would independently allow N requests, effectively doubling the rate limit.
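A minimal sliding-window limiter illustrates why sharing one instance matters: every `acquire()` call, from either scheduler, draws from the same timestamp window. This is a sketch only; the project's actual `RateLimiter` interface may differ.

```python
import asyncio
import time
from collections import deque

class RateLimiter:
    """Sliding-window async rate limiter (illustrative sketch)."""

    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._timestamps: deque = deque()  # send times of recent requests
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        """Block until a request slot is free within the sliding window."""
        while True:
            async with self._lock:
                now = time.monotonic()
                # Evict timestamps that have fallen out of the window
                while self._timestamps and now - self._timestamps[0] >= self.window_seconds:
                    self._timestamps.popleft()
                if len(self._timestamps) < self.max_requests:
                    self._timestamps.append(now)
                    return
                # Sleep until the oldest timestamp expires, then retry
                wait = self.window_seconds - (now - self._timestamps[0])
            await asyncio.sleep(wait)

async def demo():
    limiter = RateLimiter(max_requests=2, window_seconds=0.5)
    await limiter.acquire()  # immediate
    await limiter.acquire()  # immediate
    await limiter.acquire()  # blocks ~0.5s until the window slides

asyncio.run(demo())
```

Because both schedulers `await` on the same instance, the combined request stream can never exceed `max_requests` per window, regardless of how many schedulers exist.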
Item routing logic:
| Condition | Scheduler | Purpose |
|-----------|-----------|---------|
| `apiid == 'pricehistory'` | `ClockworkScheduler` | Historical data updates hourly |
| `apiid != 'pricehistory'` | `snoozerScheduler` | Real-time data (`priceoverview`, histogram, activity) |
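The routing condition can be exercised in isolation. The item names and `apiid` values below are made up for illustration; only the `pricehistory` split comes from the source.

```python
# Hypothetical TRACKING_ITEMS entries
items = [
    {"name": "AK-47 | Redline (Field-Tested)", "apiid": "priceoverview"},
    {"name": "AWP | Asiimov (Field-Tested)", "apiid": "histogram"},
    {"name": "AK-47 | Redline (Field-Tested)", "apiid": "pricehistory"},
]

# Same predicate as setup_schedulers(): pricehistory goes to the
# hourly archival scheduler, everything else to the live scheduler.
history_items = [i for i in items if i["apiid"] == "pricehistory"]
live_items = [i for i in items if i["apiid"] != "pricehistory"]

print(len(live_items), len(history_items))  # 2 1
```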
The orchestrator runs both schedulers concurrently:
cerebro.py:196-264
```python
async def run(self):
    """
    Main orchestrator loop.

    Runs all schedulers concurrently until shutdown signal or config change.
    """
    # Load and validate configuration
    self.load_and_validate_config()

    # Setup schedulers with shared rate limiter
    self.setup_schedulers()

    # Setup signal handlers for graceful shutdown
    self.setup_signal_handlers()

    print("\n")
    print("GO TIME!")
    print("\n")
    print(f"Press Ctrl+C to stop")
    print("="*60 + "\n")

    # Create tasks for all schedulers
    tasks = []
    if self.snoozerScheduler:
        tasks.append(asyncio.create_task(
            self.snoozerScheduler.run(),
            name="live"
        ))
    if self.clockworkScheduler:
        tasks.append(asyncio.create_task(
            self.clockworkScheduler.run(),
            name="clockwork"
        ))

    if not tasks:
        print("Warning: No schedulers configured. Exiting.")
        return

    # Run all schedulers concurrently until shutdown
    try:
        # Wait for shutdown event or any task to fail
        shutdown_task = asyncio.create_task(self.shutdown_event.wait(), name="shutdown")
        done, pending = await asyncio.wait(
            tasks + [shutdown_task],
            return_when=asyncio.FIRST_COMPLETED
        )

        # Cancel remaining tasks
        for task in pending:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

        # Check if any scheduler task failed
        for task in done:
            if task.get_name() != "shutdown" and not task.cancelled():
                exc = task.exception()
                if exc:
                    print(f"Scheduler {task.get_name()} failed with error: {exc}")
                    raise exc
    except asyncio.CancelledError:
        print("Orchestrator cancelled")

    print("\n✓ All schedulers stopped gracefully")
```
Execution strategy:
1. **Create Tasks** - Both schedulers are wrapped in `asyncio.create_task()` with descriptive names
2. **Concurrent Execution** - `asyncio.wait()` runs all tasks simultaneously, returning when any task completes
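The "run until the first task completes, then cancel the rest" pattern from `run()` can be reduced to a standalone sketch. The worker coroutines below are placeholders, not the real schedulers.

```python
import asyncio

async def worker(name: str, seconds: float) -> str:
    """Placeholder for a scheduler's run() coroutine."""
    await asyncio.sleep(seconds)
    return name

async def main() -> str:
    tasks = [
        asyncio.create_task(worker("live", 0.01), name="live"),
        asyncio.create_task(worker("clockwork", 10.0), name="clockwork"),
    ]
    # Return as soon as ANY task finishes (or fails)
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    # Cancel the still-running tasks and absorb their CancelledError
    for task in pending:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    return next(iter(done)).result()

print(asyncio.run(main()))  # live
```

Awaiting each cancelled task before returning is what makes the shutdown graceful: it gives each scheduler's `finally` blocks a chance to run before the orchestrator exits.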
Sample startup output:

```
hello world! I see you have a rate limit: 100 requests per 300 seconds
  ✓ Config feasible: 87 req/300s (87.0% capacity)
  ✓ Shared RateLimiter created (100 req/300s)
  ✓ Database: SQLite at data/market_data.db
  ✓ Started HIGH frequency tracking on (15 items)
  ✓ Started ARCHIVAL work + all known historical snapshots available right now on (3 items)

GO TIME!

Press Ctrl+C to stop
============================================================

Running initial price history fetch...
[2026-03-03 14:30:00] Executing hourly price history updates
  ✓ AK-47 | Redline (Field-Tested): $8.42
  ✓ AWP | Asiimov (Field-Tested): 234 orders
Historical collector sleeping until 15:30:00 UTC (3600 seconds)
```