
Overview

The backfill system ensures Junkie has access to historical messages for every channel. It automatically fetches and caches message history when the bot starts, enabling context-aware conversations even after restarts.

Backfill Strategy

The system works in up to three phases:
  1. Catch-Up Phase: Fetch newer messages (after the latest cached message)
  2. Initial Fetch: Perform a full fetch when a channel has no cached data at all
  3. Deepen Phase: Iteratively fetch older messages (before the oldest cached message)
This ensures both recent updates and deep history are captured.

Core Function

The backfill_channel function orchestrates the entire backfill process:
async def backfill_channel(channel, target_limit: int = CONTEXT_AGENT_MAX_MESSAGES):
    """
    Backfill message history for a channel if DB count is low.
    Thread-safe with per-channel locking.
    """
    channel_id = channel.id
    
    # Acquire per-channel lock to prevent race conditions
    if channel_id not in _backfill_locks:
        _backfill_locks[channel_id] = asyncio.Lock()
    
    async with _backfill_locks[channel_id]:
        try:
            current_count = await get_message_count(channel_id)
            channel_name = getattr(channel, "name", "DM")
            
            # If we have enough messages (e.g. > 90% of target), skip backfill
            if current_count >= target_limit * 0.9:
                logger.info(f"[Backfill] ✓ Channel {channel_name}: {current_count}/{target_limit} messages (≥90%). Skipping backfill.")
                return
Source: discord_bot/backfill.py:14-33

Thread Safety

Per-channel asyncio.Lock instances prevent two concurrent backfill runs from racing on the same channel:
# Per-channel locks to prevent race conditions during concurrent backfill
_backfill_locks = {}

# Acquire per-channel lock to prevent race conditions
if channel_id not in _backfill_locks:
    _backfill_locks[channel_id] = asyncio.Lock()

async with _backfill_locks[channel_id]:
    # Backfill logic...
Source: discord_bot/backfill.py:11-25
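The pattern can be exercised in isolation. This self-contained sketch uses dict.setdefault (an equivalent one-liner for the check-then-create above, safe here because there is no await between the check and the assignment) and shows that two concurrent backfills of the same channel are serialized:

```python
import asyncio

_locks: dict[int, asyncio.Lock] = {}
events: list[str] = []

async def fake_backfill(channel_id: int, label: str) -> None:
    # setdefault collapses the check-then-create pattern into one call
    lock = _locks.setdefault(channel_id, asyncio.Lock())
    async with lock:
        events.append(f"{label}:start")
        await asyncio.sleep(0.01)  # simulate API fetches
        events.append(f"{label}:end")

async def main() -> None:
    # Both tasks target the same channel, so the lock serializes them
    await asyncio.gather(fake_backfill(1, "a"), fake_backfill(1, "b"))

asyncio.run(main())
print(events)  # → ['a:start', 'a:end', 'b:start', 'b:end'] — runs never interleave
```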

Phase 1: Catch-Up

The catch-up phase fetches messages newer than the latest cached message:
if latest_id:
    # 1. Catch Up: Fetch messages newer than the latest stored message
    logger.info(f"[Backfill] ↑ Catching up {channel_name} (after ID {latest_id})...")
    try:
        after_obj = discord.Object(id=latest_id)
        new_messages = await fetch_and_cache_from_api(channel, limit=target_limit, after_message=after_obj)
        fetched_count += len(new_messages)
        logger.info(f"[Backfill] ✓ Caught up {len(new_messages)} new messages. Total: {current_count + len(new_messages)}/{target_limit}")
    except Exception as e:
        logger.error(f"[Backfill] Error catching up: {e}")

    # Re-check count after catch-up
    current_count = await get_message_count(channel_id)
    oldest_id = await get_oldest_message_id(channel_id)  # Update oldest_id
Source: discord_bot/backfill.py:43-56

Why Catch Up First?

  • Ensures recent messages are available immediately
  • More important for active conversations
  • Newer messages have a higher chance of being relevant

Phase 2: Initial Fetch

If no data exists, perform a full initial fetch:
else:
    # No data, full fetch
    logger.info(f"[Backfill] ⚡ No existing data for {channel_name}. Performing initial fetch...")
    fetched_count = len(await fetch_and_cache_from_api(channel, limit=target_limit))
    current_count = await get_message_count(channel_id)
    oldest_id = await get_oldest_message_id(channel_id)  # Update oldest_id after fetch
    
    # Only mark as fully backfilled if we fetched ZERO messages (reached end of history)
    if fetched_count == 0:
        logger.info(f"[Backfill] No messages fetched for {channel_name}. Marking as fully backfilled.")
        await mark_channel_fully_backfilled(channel_id, True)
Source: discord_bot/backfill.py:57-69

Phase 3: Deepen (Iterative Backfill)

The deepening phase iteratively fetches older messages until the target is reached:
# 2. Deepen: If still below target, fetch older messages iteratively with parallel batching
max_deepen_iterations = int(os.getenv("BACKFILL_MAX_ITERATIONS", "10"))
parallel_batches = int(os.getenv("BACKFILL_PARALLEL_BATCHES", "3"))  # Fetch N batches at once
deepen_iteration = 0

while current_count < target_limit and deepen_iteration < max_deepen_iterations:
    # Check if we already fully backfilled this channel
    if await is_channel_fully_backfilled(channel_id):
        logger.info(f"[Backfill] Channel {channel_name} is marked as fully backfilled. Stopping deepen.")
        break
    
    if not oldest_id:
        # Update oldest_id in case it wasn't set
        oldest_id = await get_oldest_message_id(channel_id)
        if not oldest_id:
            logger.warning(f"[Backfill] No oldest_id found for {channel_name}, cannot deepen further.")
            break
    
    needed = target_limit - current_count
    logger.info(f"[Backfill] ↓ {channel_name} iteration {deepen_iteration + 1}: {current_count}/{target_limit} (need {needed} more)")
Source: discord_bot/backfill.py:71-91

Iterative Strategy

The system fetches in batches to handle large histories:
try:
    # Determine how many parallel batches to fetch
    batch_size = min(needed // parallel_batches, 500) if needed > parallel_batches else needed
    num_batches = min(parallel_batches, (needed + batch_size - 1) // batch_size)
    
    # Create parallel fetch tasks
    async def fetch_batch(batch_oldest_id, batch_limit):
        """Fetch a single batch of messages."""
        try:
            before_obj = discord.Object(id=batch_oldest_id)
            return await fetch_and_cache_from_api(channel, limit=batch_limit, before_message=before_obj)
        except Exception as e:
            logger.error(f"[Backfill] Error in parallel batch: {e}")
            return []
    
    # First batch uses current oldest_id
    tasks = [fetch_batch(oldest_id, batch_size)]
    
    # Execute first batch
    results = await asyncio.gather(*tasks, return_exceptions=True)
Source: discord_bot/backfill.py:94-112
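The batch-size arithmetic above can be checked with a small standalone helper. This mirrors the expressions in the excerpt; `plan_batches` itself is illustrative and not a function in backfill.py:

```python
def plan_batches(needed: int, parallel_batches: int = 3) -> tuple[int, int]:
    """Mirror the batch_size / num_batches arithmetic from the excerpt above."""
    if needed > parallel_batches:
        batch_size = min(needed // parallel_batches, 500)  # capped at 500 per batch
    else:
        batch_size = needed
    # Ceiling division: enough batches to cover `needed`, up to the configured max
    num_batches = min(parallel_batches, (needed + batch_size - 1) // batch_size)
    return batch_size, num_batches

print(plan_batches(1000))  # → (333, 3): three batches of up to 333 messages
print(plan_batches(2000))  # → (500, 3): batch size hits the 500 cap
print(plan_batches(2))     # → (2, 1): a small remainder fits in one batch
```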

Completion Detection

The system detects when no more history exists:
total_fetched = 0
for result in results:
    if isinstance(result, list):
        total_fetched += len(result)

logger.info(f"[Backfill]   → Fetched {total_fetched} older messages")

# If we fetched 0 messages, we've reached the beginning
if total_fetched == 0:
    logger.info(f"[Backfill] No older messages found for {channel_name}. Marking as fully backfilled.")
    await mark_channel_fully_backfilled(channel_id, True)
    break
Source: discord_bot/backfill.py:119-130

Progress Tracking

# Update counters for next iteration
prev_count = current_count
current_count = await get_message_count(channel_id)
oldest_id = await get_oldest_message_id(channel_id)
deepen_iteration += 1

progress_pct = int((current_count / target_limit) * 100)
logger.info(f"[Backfill]   ✓ Progress: {current_count}/{target_limit} ({progress_pct}%)")

# Small delay to avoid hammering the API
if deepen_iteration < max_deepen_iterations and current_count < target_limit:
    await asyncio.sleep(0.5)
Source: discord_bot/backfill.py:133-143

Multi-Channel Backfill

The start_backfill_task function coordinates backfill across all channels:
async def start_backfill_task(channels):
    """
    Start background backfill for a list of channels with concurrency control.
    Handles failures gracefully without cancelling other tasks.
    """
    # Default to 2 concurrent channels to be safe with rate limits
    concurrency = int(os.getenv("BACKFILL_CONCURRENCY", "2"))
    sem = asyncio.Semaphore(concurrency)
    
    logger.info(f"[Backfill] Starting background backfill for {len(channels)} channels with concurrency {concurrency}.")
Source: discord_bot/backfill.py:156-165

Concurrency Control

Semaphores limit concurrent operations to respect Discord rate limits:
async def bound_backfill(channel):
    async with sem:
        try:
            await backfill_channel(channel)
        except Exception as e:
            channel_name = getattr(channel, "name", "DM")
            logger.error(f"[Backfill] Failed for channel {channel_name} ({channel.id}): {e}", exc_info=True)
        finally:
            # Small sleep to be nice to API even with semaphore
            await asyncio.sleep(1)

# Create tasks for all channels
tasks = [bound_backfill(c) for c in channels]

# Use return_exceptions=True to prevent one failure from cancelling all others
results = await asyncio.gather(*tasks, return_exceptions=True)
Source: discord_bot/backfill.py:167-182
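The effect of the semaphore can be demonstrated in isolation. This sketch tracks the peak number of simultaneously active workers and confirms it never exceeds the configured limit:

```python
import asyncio

async def run_demo(num_channels: int = 6, concurrency: int = 2) -> int:
    sem = asyncio.Semaphore(concurrency)
    active = 0
    peak = 0

    async def bound_worker(_: int) -> None:
        nonlocal active, peak
        async with sem:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # simulate one channel's backfill
            active -= 1

    await asyncio.gather(*(bound_worker(i) for i in range(num_channels)))
    return peak

print(asyncio.run(run_demo()))  # → 2: never more than `concurrency` at once
```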

Summary Reporting

# Log summary
errors = [r for r in results if isinstance(r, Exception)]
successes = len(results) - len(errors)
logger.info(f"[Backfill] ═══════════════════════════════════════")
logger.info(f"[Backfill] Summary: {successes}/{len(channels)} channels successful, {len(errors)} failed")
logger.info(f"[Backfill] ═══════════════════════════════════════")
Source: discord_bot/backfill.py:184-189

Configuration

Environment Variables

# Backfill behavior
BACKFILL_CONCURRENCY=2              # Concurrent channels (default: 2)
BACKFILL_MAX_ITERATIONS=10          # Max deepen iterations (default: 10)
BACKFILL_PARALLEL_BATCHES=3         # Parallel batches per iteration (default: 3)
BACKFILL_MAX_FETCH_LIMIT=1000       # Max messages per fetch (default: 1000)

# Post-backfill sync
MESSAGE_SYNC_LIMIT=200              # Messages to sync after backfill (default: 200)

Key Parameters

  • BACKFILL_CONCURRENCY: Number of channels to backfill simultaneously
  • BACKFILL_MAX_ITERATIONS: Maximum deepen loops per channel
  • BACKFILL_PARALLEL_BATCHES: Planned parallel batch support (currently sequential)
  • BACKFILL_MAX_FETCH_LIMIT: Safety cap on fetch size

Integration with Bot Startup

Backfill runs automatically when the bot starts:
@bot.event
async def on_ready():
    # ... initialization ...
    
    # Start Backfill Task
    text_channels = [
        c for c in bot.bot.get_all_channels() 
        if isinstance(c, (discord.TextChannel, discord.DMChannel, discord.GroupChannel))
    ]
    
    logger.info(f"[on_ready] Found {len(text_channels)} channels to backfill")
    
    # Start backfill task with error handling and post-sync
    async def run_backfill_and_sync():
        try:
            logger.info("[on_ready] Starting backfill task...")
            await start_backfill_task(text_channels)
            logger.info("[on_ready] Backfill task completed")
            
            # Sync recent messages to catch offline edits/deletes
            from discord_bot.message_sync import sync_all_channels
            import os
            sync_limit = int(os.getenv("MESSAGE_SYNC_LIMIT", "200"))
            logger.info(f"[on_ready] Starting post-backfill message sync (last {sync_limit} messages)...")
            await sync_all_channels(text_channels, sync_limit=sync_limit)
            logger.info("[on_ready] Message sync completed")
            
        except Exception as e:
            logger.error(f"[on_ready] Backfill/sync task failed: {e}", exc_info=True)
    
    asyncio.create_task(run_backfill_and_sync())
Source: discord_bot/chat_handler.py:63-95

Performance Optimizations

Skip Fully Cached Channels

Channels with sufficient messages are skipped:
# If we have enough messages (e.g. > 90% of target), skip backfill
if current_count >= target_limit * 0.9:
    logger.info(f"[Backfill] ✓ Channel {channel_name}: {current_count}/{target_limit} messages (≥90%). Skipping backfill.")
    return
Source: discord_bot/backfill.py:31-33

Rate Limit Protection

# Small delay to avoid hammering the API
if deepen_iteration < max_deepen_iterations and current_count < target_limit:
    await asyncio.sleep(0.5)
Source: discord_bot/backfill.py:142-143

Graceful Failure Handling

# Use return_exceptions=True to prevent one failure from cancelling all others
results = await asyncio.gather(*tasks, return_exceptions=True)
Source: discord_bot/backfill.py:182

Logging

The backfill system provides detailed logging:
  • Channel selection: Found {n} channels to backfill
  • Catch-up phase: ↑ Catching up {channel} (after ID {id})
  • Initial fetch: ⚡ No existing data for {channel}. Performing initial fetch...
  • Deepen phase: ↓ {channel} iteration {n}: {current}/{target} (need {needed} more)
  • Progress: ✓ Progress: {current}/{target} ({pct}%)
  • Completion: ✓ Completed {channel}: {count}/{target} ({pct}%) - Fetched {n} messages
  • Summary: Summary: {successes}/{total} channels successful, {errors} failed

Best Practices

  1. Target Limit: Set CONTEXT_AGENT_MAX_MESSAGES to match your context needs
    • Too low: Limited conversation context
    • Too high: Slower backfill, higher database usage
  2. Concurrency: Adjust BACKFILL_CONCURRENCY based on bot load
    • Higher values: Faster backfill
    • Lower values: Better rate limit compliance
  3. Monitoring: Watch backfill logs on startup to detect issues
    • Look for repeated failures on specific channels
    • Check for rate limit errors
  4. Database Performance: Ensure PostgreSQL is optimized
    • Index on channel_id and message_id
    • Regular VACUUM operations

Post-Backfill Sync

After backfill completes, a sync operation catches offline edits/deletions:
# Sync recent messages to catch offline edits/deletes
from discord_bot.message_sync import sync_all_channels
sync_limit = int(os.getenv("MESSAGE_SYNC_LIMIT", "200"))
await sync_all_channels(text_channels, sync_limit=sync_limit)
This ensures the cache reflects any changes that occurred while the bot was offline.
