Overview

Junkie uses Agno’s Team orchestration pattern where a leader agent coordinates specialized sub-agents. Each user gets their own team instance cached in memory with LRU eviction.

Team Creation

Teams are created per-user by create_team_for_user() in agent_factory.py:167-329:
def create_team_for_user(user_id: str, client=None):
    """
    Create a full AI Team for a specific user.
    
    Returns:
        tuple: (model, team)
    """
    model = create_model(user_id)
    
    # Create specialized sub-agents
    agents = [perplexity_agent, compound_agent, code_agent, context_qna_agent]
    
    # Create team leader
    team = Team(
        name="Hero Team",
        model=model,
        db=db,
        members=agents,
        tools=[BioTools(client=client), CalculatorTools()],
        instructions=get_prompt(),
        num_history_runs=AGENT_HISTORY_RUNS,
        enable_user_memories=True,
        memory_manager=memory_manager,
    )
    
    return model, team

Team Leader Pattern

The Team Leader is the orchestrator that:
  • Receives the user’s request
  • Decides which sub-agents to delegate to
  • Coordinates multi-agent workflows
  • Synthesizes responses
  • Manages conversation history and user memories

Leader Configuration

team = Team(
    name="Hero Team",
    model=model,  # Main model (GPT-5, Claude, etc.)
    db=db,  # AsyncPostgresDb for session storage
    members=agents,  # List of specialized agents
    tools=[BioTools(client=client), CalculatorTools()],
    instructions=get_prompt(),  # System prompt from Phoenix
    num_history_runs=AGENT_HISTORY_RUNS,
    add_datetime_to_context=True,
    timezone_identifier="Asia/Kolkata",
    markdown=True,
    retries=AGENT_RETRIES,
    debug_mode=DEBUG_MODE,
    debug_level=DEBUG_LEVEL,
    enable_user_memories=True,
    memory_manager=memory_manager,  # Groq model for memory processing
)
From agent_factory.py:310-327

System Prompt Management

The team leader’s instructions come from Phoenix with fallback:
def get_prompt() -> str:
    """Return system prompt content pulled from Phoenix or fallback."""
    prompt_name = "herocomp"
    
    try:
        fetched = client.prompts.get(prompt_identifier=prompt_name, tag="production")
        if hasattr(fetched, "format"):
            formatted = fetched.format()
        else:
            formatted = fetched
    except Exception as e:
        print("Phoenix prompt fetch error:", e)
        return get_system_prompt()  # Fallback
    
    messages = getattr(formatted, "messages", None)
    if not messages:
        return get_system_prompt()
    
    content = messages[0].get("content")
    return content or get_system_prompt()
From agent_factory.py:126-147
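The extraction logic at the end (prefer `messages[0]["content"]`, otherwise fall back) can be exercised in isolation. A minimal stdlib sketch of the same pattern — `extract_prompt` and the test objects here are illustrative, not names from the codebase:

```python
from types import SimpleNamespace

def extract_prompt(formatted, fallback: str) -> str:
    # Mirror get_prompt()'s tail: take the first message's content,
    # falling back if messages are missing or content is empty.
    messages = getattr(formatted, "messages", None)
    if not messages:
        return fallback
    return messages[0].get("content") or fallback
```

This makes the fallback chain explicit: missing `messages` attribute, empty list, and empty `content` all resolve to the local default.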

Memory System

The team uses a dedicated memory model for fast memory processing:
# Groq for fast memory processing
memory_model = OpenAILike(
    id="openai/gpt-oss-120b",
    base_url="https://api.groq.com/openai/v1",
    api_key=GROQ_API_KEY,
)

memory_manager = MemoryManager(
    model=memory_model,
    db=db,  # Same AsyncPostgresDb
)
From agent_factory.py:153-161

Features:
  • User-specific memories stored in PostgreSQL
  • Fast memory updates with Groq’s 120B model
  • Shared across all conversations for a user
  • Enables personalization and context retention

Team Cache (LRU)

Teams are cached per-user with Least Recently Used eviction:
from collections import OrderedDict

_user_teams = OrderedDict()  # LRU cache
_user_team_locks: Dict[str, asyncio.Lock] = {}  # Per-user locks
_user_team_locks_guard = asyncio.Lock()  # Lock creation lock
_user_teams_lock = asyncio.Lock()  # Cache access lock
From agent_factory.py:335-339
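The eviction behavior follows directly from OrderedDict semantics. A standalone sketch with toy values (MAX_AGENTS lowered to 2 for illustration):

```python
from collections import OrderedDict

MAX_AGENTS = 2  # lowered for illustration

cache = OrderedDict()
cache["alice"] = "team-a"
cache["bob"] = "team-b"

# Accessing "alice" marks her team as recently used
cache.move_to_end("alice")

# A third user forces eviction of the least recently used entry
if len(cache) >= MAX_AGENTS:
    evicted_user, evicted_team = cache.popitem(last=False)
cache["carol"] = "team-c"
# evicted_user is "bob"; "alice" survives because she was touched last
```

`move_to_end()` on hit and `popitem(last=False)` when full are all an OrderedDict needs to behave as an LRU cache.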

Cache Operations

Get or Create Team:
async def get_or_create_team(user_id: str, client=None):
    user_lock = await _get_user_team_lock(user_id)
    async with user_lock:
        oldest_user, oldest_team = None, None  # set only if an eviction happens
        async with _user_teams_lock:
            if user_id in _user_teams:
                # Move to end (mark as recently used)
                _user_teams.move_to_end(user_id)
                return _user_teams[user_id]
            
            # If cache full, evict oldest team
            if len(_user_teams) >= MAX_AGENTS:
                oldest_user, oldest_team = _user_teams.popitem(last=False)
        
        # Cleanup evicted team resources
        if oldest_team is not None:
            logger.info(f"[TeamCache] Evicting team for user {oldest_user}")
            await _remove_user_team_lock(oldest_user)
            # ... cleanup MCP connections ...
        
        # Create new team
        _, team = create_team_for_user(user_id, client=client)
        
        async with _user_teams_lock:
            _user_teams[user_id] = team
            cache_size = len(_user_teams)
        
        logger.info(f"[TeamCache] Created new team for user {user_id} (cache size: {cache_size}/{MAX_AGENTS})")
        return team
From agent_factory.py:358-424

Why LRU Cache?

  1. Performance - Avoid recreating teams for active users
  2. Statefulness - Teams maintain conversation history
  3. Resource Management - Limit memory usage with MAX_AGENTS
  4. Fair Eviction - Inactive users evicted first

Resource Cleanup

When a team is evicted, all resources are cleaned up:
try:
    # Cleanup MCP connections if any
    if hasattr(oldest_team, 'members'):
        for member in oldest_team.members:
            if hasattr(member, 'tools'):
                for tool in member.tools:
                    if hasattr(tool, 'close'):
                        if hasattr(tool.close, '__await__'):
                            await tool.close()
                        else:
                            tool.close()
except Exception as e:
    logger.error(f"[TeamCache] Error during team cleanup: {e}", exc_info=True)
From agent_factory.py:385-408
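The `hasattr(tool.close, '__await__')` check distinguishes coroutine-returning `close()` methods from plain ones. An equivalent, arguably clearer alternative (not what the source does) is to call `close()` and inspect the result — sketched here with hypothetical tool classes:

```python
import asyncio
import inspect

class SyncTool:
    def close(self):
        return "closed-sync"

class AsyncTool:
    async def close(self):
        return "closed-async"

async def close_tool(tool):
    # Call close() unconditionally; await the result only if it
    # is awaitable (i.e. close() was an async def).
    result = tool.close()
    if inspect.isawaitable(result):
        result = await result
    return result
```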

Session Management

Each Discord channel gets its own session:
user_id = str(message.author.id)
session_id = str(message.channel.id)  # Channel = session

reply = await async_ask_junkie(
    prompt, user_id=user_id, session_id=session_id, images=images
)
From chat_handler.py:163-177

Session Storage:
  • Stored in PostgreSQL table agent_sessions
  • Maintains conversation flow per channel
  • Enables multi-channel support per user

Database Configuration

Teams use async PostgreSQL for better throughput:
if POSTGRES_URL:
    async_db_url = convert_to_async_url(POSTGRES_URL)
    db = AsyncPostgresDb(
        db_url=async_db_url,
        session_table="agent_sessions",  # Session/history storage
        memory_table="user_memories",    # User memory storage
    )
    logger.info("[DB] Using AsyncPostgresDb for session & memory storage")
else:
    db = None
    logger.warning("[DB] No POSTGRES_URL configured - sessions and memories will not persist!")
From agent_factory.py:87-97

URL Conversion

PostgreSQL URLs are converted to async format:
def convert_to_async_url(db_url: str) -> str:
    """Convert a postgresql:// URL to postgresql+asyncpg:// format."""
    if "+asyncpg" in db_url or "+psycopg_async" in db_url:
        return db_url
    
    try:
        parsed = make_url(db_url)
        async_url = f"postgresql+asyncpg://{parsed.username}:{parsed.password}@{parsed.host}"
        if parsed.port:
            async_url += f":{parsed.port}"
        async_url += f"/{parsed.database}"
        return async_url
    except Exception as e:
        logger.warning(f"Failed to convert URL: {e}")
        return db_url
From agent_factory.py:60-81
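The same transformation can be done without SQLAlchemy. A dependency-free sketch using urllib.parse — a simplification of the make_url-based version above (it preserves credentials, host, port, and database, but ignores query parameters):

```python
from urllib.parse import urlsplit

def to_async_url(db_url: str) -> str:
    # Already targets an async driver: leave it untouched
    if "+asyncpg" in db_url or "+psycopg_async" in db_url:
        return db_url
    parts = urlsplit(db_url)
    # netloc carries user:pass@host:port verbatim; path carries /dbname
    return f"postgresql+asyncpg://{parts.netloc}{parts.path}"
```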

Configuration Variables

  • MAX_AGENTS - Maximum teams in cache (default: varies by deployment)
  • AGENT_HISTORY_RUNS - Number of previous runs to include in context
  • AGENT_RETRIES - Max retries for failed agent calls
  • DEBUG_MODE - Enable debug logging
  • DEBUG_LEVEL - Debug verbosity level
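These are typically read from the environment at startup. A sketch of how that might look — the default values here are hypothetical, since the real defaults are deployment-specific and not shown in the source:

```python
import os

# Defaults below are placeholders, not the project's actual values
MAX_AGENTS = int(os.getenv("MAX_AGENTS", "100"))
AGENT_HISTORY_RUNS = int(os.getenv("AGENT_HISTORY_RUNS", "5"))
AGENT_RETRIES = int(os.getenv("AGENT_RETRIES", "2"))
DEBUG_MODE = os.getenv("DEBUG_MODE", "false").lower() == "true"
DEBUG_LEVEL = int(os.getenv("DEBUG_LEVEL", "1"))
```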
