def calculate_urgency(self, item: dict) -> float:
    """
    Calculate urgency score for an item.

    Urgency = (time since last update) / (target polling rate)

    Returns:
        0.0 if the item is in backoff cooldown (never urgent),
        float('inf') if the item has never been updated,
        otherwise elapsed_seconds / polling interval (>= 1.0 means overdue).
    """
    # Take a single time snapshot so the cooldown check and the elapsed-time
    # calculation agree — the original called datetime.now() twice, so the
    # two reads could differ within one evaluation.
    now = datetime.now()
    # If in backoff cooldown, urgency is 0 (never urgent)
    if item.get('skip_until') and now < item['skip_until']:
        return 0.0
    if item['last_update'] is None:
        return float('inf')  # Never updated = infinitely urgent
    elapsed = now - item['last_update']
    return elapsed.total_seconds() / item['polling-interval-in-seconds']
Urgency score interpretation:
Score
Meaning
Action
∞
Never executed
Execute immediately
>= 1.0
Overdue (deadline passed)
Execute now
0.5
Halfway to deadline
Wait
0.0
Just updated OR in cooldown
Wait
Example:
item = {
    'market_hash_name': 'AK-47 | Redline (FT)',
    'polling-interval-in-seconds': 60,  # Target: every 60 seconds
    'last_update': datetime(2024, 3, 3, 14, 0, 0),  # Last updated at 14:00:00
}
async def run(self) -> None:
    """
    Main scheduler loop using the urgency-based algorithm.

    Algorithm:
        1. Calculate urgency for all items
        2. If max_urgency >= 1.0, execute that item and loop
        3. If max_urgency < 1.0, sleep until next item is overdue
        4. Repeat forever

    Runs forever; the Steam client and DB writer are held open for the
    whole lifetime of the loop.
    """
    async with SteamAPIClient(rate_limiter=self.rate_limiter) as client, SQLinserts() as wizard:
        self.steam_client = client
        self.data_wizard = wizard
        while True:
            # Execute ALL items that are overdue (urgency >= 1.0) —
            # not just the single most urgent one. Urgency is evaluated
            # per item, in order, so executions earlier in the pass can
            # affect the wall-clock time seen by later ones.
            executed_any = False
            for item in self.live_items:
                urgency = self.calculate_urgency(item)
                if urgency >= 1.0:
                    await self.execute_item(item)
                    executed_any = True
            # If nothing was urgent, sleep until the next item becomes urgent
            # (or exits its 429 cooldown).
            if not executed_any:
                sleep_duration = self.calculate_min_sleep_duration()
                await asyncio.sleep(sleep_duration)
Key insight: The loop executes all overdue items before sleeping, not just the most urgent one.
The scheduler calculates the minimum time until any item becomes actionable:
snoozerScheduler.py:96-127
def calculate_min_sleep_duration(self) -> float:
    """
    Calculate MINIMUM sleep time until ANY item becomes actionable.

    Checks all items and returns the shortest time until any item:
      - Reaches urgency 1.0 (overdue), OR
      - Exits 429 cooldown (skip_until reached)

    Returns:
        Seconds to sleep; 0 if every item is already actionable.
    """
    # Single time snapshot: the cooldown membership test and the
    # remaining-time computation must use the same "now", otherwise the
    # two datetime.now() reads can straddle the skip_until boundary.
    now = datetime.now()
    min_sleep = float('inf')
    for item in self.live_items:
        # Check if item is in 429 cooldown
        if item.get('skip_until') and now < item['skip_until']:
            # Time until cooldown ends
            time_until_cooldown_ends = (item['skip_until'] - now).total_seconds()
            min_sleep = min(min_sleep, time_until_cooldown_ends)
        else:
            # Normal urgency calculation
            urgency = self.calculate_urgency(item)
            if urgency < 1.0:  # Only consider items that aren't already overdue
                # Time until this item becomes urgent (urgency = 1.0)
                time_until_urgent = (1.0 - urgency) * item['polling-interval-in-seconds']
                min_sleep = min(min_sleep, time_until_urgent)
    # If all items are overdue, don't sleep
    return min_sleep if min_sleep != float('inf') else 0
async def execute_item(self, item: dict) -> None:
    """
    Execute one API fetch for *item* and persist the result.

    On success: resets the item's backoff state and stamps last_update.
    On transient failure (429 / 5xx): applies exponential backoff.
    """
    # Check cooldown — a previous backoff set skip_until in the future.
    if item.get('skip_until') and datetime.now() < item['skip_until']:
        return # Silently skip
    try:
        # Route to correct API endpoint
        # NOTE(review): there is no `case _:` default — an unrecognized
        # apiid would leave `result` unbound and raise NameError at the
        # store_data call below; confirm apiid is validated upstream.
        match item['apiid']:
            case 'priceoverview':
                result = await self.steam_client.fetch_price_overview(...)
            case 'itemordershistogram':
                result = await self.steam_client.fetch_orders_histogram(...)
            case 'itemordersactivity':
                result = await self.steam_client.fetch_orders_activity(...)
        # Store to database
        await self.data_wizard.store_data(result, item)
        # SUCCESS: Reset backoff, update timestamp
        item['consecutive_backoffs'] = 0
        item['skip_until'] = None
        item['last_update'] = datetime.now()
    except aiohttp.ClientResponseError as e:
        if e.status == 429 or e.status >= 500:
            # Transient error - exponential backoff
            self.apply_exponential_backoff(item, e.status)
        # NOTE(review): other statuses (e.g. 4xx auth errors) are swallowed
        # silently and last_update is NOT advanced, so the item stays
        # maximally urgent — a persistent 4xx could hot-loop the scheduler;
        # confirm this is intended.
Rate limiting happens transparently in steam_client.fetch_*() via await rate_limiter.acquire_token(). The scheduler is unaware of rate limiting delays — it just sees slower API calls.
def get_next_execution_time(self) -> datetime:
    """
    Return the next execution time: the upcoming :30 past the hour, in UTC.

    Returns:
        Datetime of the next :30-past-the-hour mark (this hour if we have
        not reached :30 yet, otherwise the next hour).
    """
    current = datetime.now(timezone.utc)
    # Anchor a candidate at :30 of the current hour, then roll it forward
    # one hour when that moment has already passed.
    candidate = current.replace(minute=30, second=0, microsecond=0)
    return candidate + timedelta(hours=1) if current.minute >= 30 else candidate
Example calculations:
next_run = 14:30 UTC  # Same hour, not past :30 yet
sleep = 15 minutes
async def run(self) -> None:
    """
    Main clockwork loop.

    Algorithm:
        1. Run pricehistory immediately on startup
        2. Calculate next :30 past the hour
        3. Sleep until that time
        4. Execute all pricehistory items
        5. Repeat from step 2

    Runs forever; client and DB writer stay open for the loop's lifetime.
    """
    async with SteamAPIClient(rate_limiter=self.rate_limiter) as client, SQLinserts() as wizard:
        self.steam_client = client
        self.data_wizard = wizard
        # Run once immediately so data is available at startup instead of
        # waiting up to an hour for the first scheduled slot.
        await self.run_initial_fetch()
        while True:
            next_execution = self.get_next_execution_time()
            sleep_seconds = self.calculate_sleep_duration(next_execution)
            print(f" Historical collector sleeping until {next_execution.strftime('%H:%M:%S')} UTC ({sleep_seconds:.0f} seconds)")
            await asyncio.sleep(sleep_seconds)
            await self.execute_history_items()
Key behaviors:
Why run immediately on startup?
# Run once immediately
await self.run_initial_fetch()
Ensures you have historical data right away instead of waiting up to 60 minutes for the first scheduled run. Example: Start at 14:45 UTC
Without initial fetch: wait until 15:30 (45 min)
With initial fetch: data available in ~10 seconds
Why sleep instead of polling?
Historical data changes once per hour. Polling every minute would:
Waste 59 API calls per hour (98% wasted)
Increase 429 risk
Delay other endpoints (shared rate limiter)
Sleeping is more efficient and respectful of API limits.
async def _fetch_item_with_retry(self, item: dict, max_retries: int = 4) -> None: """ Fetch price history for a single item with retry logic. Retries transient errors (429, 5xx, network) and auth errors (400, 401, 403) with backoff. Auth errors are retried because cookies can be hot-swapped in .env. """ backoff_seconds = [30, 60, 120, 240] # Fixed delays for each retry for attempt in range(max_retries + 1): try: result = await self.steam_client.fetch_price_history(...) await self.data_wizard.store_data(result, item) item['last_update'] = datetime.now() return # Success except aiohttp.ClientResponseError as e: if e.status == 429 or e.status >= 500: # Transient error - retry if attempt < max_retries: delay = backoff_seconds[attempt] print(f" ⏸ {item['market_hash_name']}: {error_type} - retrying in {delay}s (attempt {attempt + 1}/{max_retries})") await asyncio.sleep(delay) elif e.status in (400, 401, 403): # Auth error - retry (cookies can be hot-swapped) if attempt < max_retries: delay = backoff_seconds[attempt] print(f" ⏸ {item['market_hash_name']}: HTTP {e.status} (cookie error?) - update .env, retrying in {delay}s") await asyncio.sleep(delay)
Retry schedule:
Attempt
Wait Before Retry
Cumulative Time
1st (initial)
0s
0s
2nd
30s
30s
3rd
60s
90s
4th
120s
210s
5th (final)
240s
450s (7.5 min)
Why retry auth errors (400, 401, 403)?
# Steam cookies can be hot-swapped while the system runs:
# 1. User updates .env file with new cookies
# 2. Next pricehistory call reads fresh cookies (steamAPIclient.py:198)
# 3. Retry succeeds with new credentials
This enables zero-downtime cookie rotation for long-running deployments.
All items have last_update = None, so urgency = ∞. Example: 50 items configured
All 50 attempt to execute immediately
Rate limiter serializes them (e.g., 100 req/5min = 1 req every 3s)
Takes ~2.5 minutes to complete initial sweep
After startup, items naturally spread based on their polling intervals.
snoozerScheduler: Steady-State Spreading
Items with different polling intervals naturally desynchronize:
Item A (30s): Execute at t=0, t=30, t=60, t=90, ...
Item B (60s): Execute at t=0, t=60, t=120, t=180, ...
Item C (45s): Execute at t=0, t=45, t=90, t=135, ...
Result: Requests spread across time (0, 30, 45, 60, 90, ...)
This is more efficient than fixed scheduling which can cause burst collisions.
ClockworkScheduler: Predictable Load
All history items execute at the same time (:30 past the hour). Example: 10 pricehistory items
All execute between 14:30:00 and 14:30:30
Rate limiter queues them (e.g., 1 req every 3s)
Completes in ~30 seconds
The burst is acceptable because it happens once per hour (low frequency).
Important: polling-interval-in-seconds is ignored by ClockworkScheduler. History items always execute at :30 past the hour, regardless of configured interval. The field is still required for config validation.