
Overview

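The agent has access to four specialized tools for querying the incident knowledge base. Each tool targets a different query pattern: a known incident ID, a free-text problem description, an application name, or a time window. Keeping the tools narrow makes it easier for the LLM to pick the right one. It also lets each tool use the most direct search for its case, such as a metadata filter instead of vector search for ID lookups.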

Available Tools

All tools are exported from src/copilot/tools/__init__.py:14:
available_tools = [
    lookup_incident_by_id,
    search_similar_incidents,
    get_incidents_by_application,
    get_recent_incidents,
]
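The following framework-free sketch illustrates how an agent might route a model's tool call to one of these functions by name. It assumes tool calls arrive as dicts with "name" and "args" keys, similar in shape to LangChain tool-call dicts. The stub functions, TOOLS_BY_NAME and dispatch are hypothetical stand-ins and are not the project's code.

```python
from typing import Any, Callable, Dict


# Hypothetical stand-ins for two of the real tools; like them, each returns a string.
def lookup_incident_by_id(incident_id: str) -> str:
    return f"lookup:{incident_id}"


def get_recent_incidents(days: int = 7, limit: int = 10) -> str:
    return f"recent:{days}:{limit}"


# Registry keyed by tool name, so a model's tool call can be dispatched by name.
TOOLS_BY_NAME: Dict[str, Callable[..., str]] = {
    fn.__name__: fn for fn in (lookup_incident_by_id, get_recent_incidents)
}


def dispatch(tool_call: Dict[str, Any]) -> str:
    """Run the tool named in tool_call["name"] with keyword arguments from tool_call["args"]."""
    fn = TOOLS_BY_NAME.get(tool_call["name"])
    if fn is None:
        # Return an error string instead of raising, so the model can correct itself.
        return f"Unknown tool: {tool_call['name']}"
    return fn(**tool_call.get("args", {}))


result = dispatch({"name": "lookup_incident_by_id", "args": {"incident_id": "INC-2025-08-24-001"}})
```

With real LangChain tools, a registry like this would normally be keyed on each tool's .name attribute. It would call tool.invoke(args) instead of unpacking keyword arguments.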

1. lookup_incident_by_id

Direct lookup of a specific incident by its unique identifier.
When to use: The user mentions a specific incident ID (e.g., “INC-2025-08-24-001”).
Parameters:
  • incident_id (str): The incident ID to look up (e.g., "INC-2025-08-24-001")
Returns: A formatted incident report, or a message if no incident is found
Implementation (src/copilot/tools/incident_tools.py:89):
@tool
def lookup_incident_by_id(incident_id: str) -> str:
    """Fetch a specific incident by its ID (e.g., INC-2025-08-24-001).
    
    Use this tool when the user mentions a specific incident ID.
    This performs a fast direct lookup without semantic search.
    """
    writer = _get_safe_stream_writer()
    writer({"status": f"Searching for {incident_id}..."})
    
    # Normalize the incident ID (handle unicode hyphens)
    normalized_id = incident_id.replace("‑", "-").strip()
    
    try:
        qdrant_filter = Filter(
            must=[
                FieldCondition(
                    key="metadata.incident_id",
                    match=MatchValue(value=normalized_id),
                )
            ]
        )
        
        docs = _scroll_qdrant_with_filter(qdrant_filter)
        
        if docs:
            incident_ids = set()
            for doc in docs:
                inc_id = _get_metadata_value(doc.metadata, "incident_id")
                if inc_id:
                    incident_ids.add(inc_id)
            writer({"status": f"Found incident {', '.join(incident_ids)}..."})
        else:
            writer({"status": f"No incident found with ID {normalized_id}"})
        
        return format_incidents_response(docs)
        
    except Exception as e:
        logger.error(f"Error in lookup_incident_by_id: {e}")
        return (
            f"An error occurred while looking up incident {incident_id}. "
            "Please try again or contact support if the issue persists."
        )
Example usage:
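# @tool returns a LangChain tool object, so call it with .invoke() and a dict of arguments
result = lookup_incident_by_id.invoke({"incident_id": "INC-2025-08-24-001"})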
Key features:
  • Fast metadata-based lookup (no vector search)
  • Normalizes non-breaking hyphens (U+2011) to ASCII hyphen-minus (U+002D); other dash characters are not converted
  • Returns all document chunks for the incident
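The normalization step could be made more defensive. The sketch below is a hypothetical helper, and normalize_incident_id is not part of the codebase. It maps the non-breaking hyphen handled above to ASCII hyphen-minus. It also maps a few other dash characters that chat clients may substitute, which is an assumed extension. The result is then checked against the INC-YYYY-MM-DD-NNN format, so a malformed ID can be rejected before Qdrant is queried.

```python
import re
from typing import Optional

# Dash-like characters mapped to ASCII "-" (U+002D). The original tool only maps U+2011;
# the other entries are an assumed extension.
_DASH_TRANSLATION = str.maketrans({
    "\u2010": "-",  # hyphen
    "\u2011": "-",  # non-breaking hyphen
    "\u2012": "-",  # figure dash
    "\u2013": "-",  # en dash
    "\u2014": "-",  # em dash
    "\u2212": "-",  # minus sign
})

_INCIDENT_ID_RE = re.compile(r"INC-\d{4}-\d{2}-\d{2}-\d+")


def normalize_incident_id(raw: str) -> Optional[str]:
    """Return the normalized ID, or None if it does not look like INC-YYYY-MM-DD-NNN."""
    # Upper-casing assumes stored IDs always use a capitalized "INC" prefix.
    candidate = raw.translate(_DASH_TRANSLATION).strip().upper()
    return candidate if _INCIDENT_ID_RE.fullmatch(candidate) else None
```

Returning None instead of raising lets the caller choose how to respond. For example, it could ask the user to re-check the ID or fall back to semantic search.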

2. search_similar_incidents

Semantic search across all incident reports based on problem descriptions.
When to use: The user describes a problem, error, or symptom without a specific incident ID.
Parameters:
  • query (str): Problem description, error message, or search query
  • limit (int, optional): Maximum number of documents to return (default: 5)
Returns: Matching incident reports, or a message if none are found
Implementation (src/copilot/tools/incident_tools.py:140):
@tool
def search_similar_incidents(query: str, limit: int = 5) -> str:
    """Search for incidents similar to a description or error message.
    
    Use this tool when the user describes a problem, error, or symptom
    without mentioning a specific incident ID. This performs semantic
    similarity search across all incident reports.
    """
    writer = _get_safe_stream_writer()
    writer({"status": "Searching for Similar Incidents..."})
    
    retriever = _get_retriever()
    vector_store = _get_vector_store()
    
    if retriever is None and vector_store is None:
        logger.error("Knowledge base retriever is not initialized")
        return "The knowledge base is currently unavailable. Please try again later."
    
    try:
        normalized_query = query.replace("‑", "-")
        docs = []
        
        # Try SelfQueryRetriever first (for precision with metadata filters)
        if retriever is not None:
            try:
                docs = retriever.invoke(
                    input=normalized_query,
                    config={"stream": False},
                )
            except Exception as retriever_error:
                logger.warning(f"SelfQueryRetriever failed, falling back to vector search: {retriever_error}")
        
        # Fallback to pure vector search if SelfQueryRetriever returns empty
        if not docs and vector_store is not None:
            logger.info(f"SelfQueryRetriever returned empty, trying pure vector search for: {normalized_query}")
            writer({"status": "Expanding search..."})
            docs = vector_store.similarity_search(
                query=normalized_query,
                k=limit * 2,  # Get more results to dedupe
            )
        
        # Limit results
        docs = docs[:limit] if len(docs) > limit else docs
        
        if docs:
            incident_ids = set()
            for doc in docs:
                inc_id = _get_metadata_value(doc.metadata, "incident_id")
                if inc_id:
                    incident_ids.add(inc_id)
            writer({"status": f"Found {len(incident_ids)} relevant incidents..."})
        else:
            writer({"status": "No similar incidents found"})
        
        return format_incidents_response(docs)
        
    except Exception as e:
        logger.error(f"Error in search_similar_incidents: {e}")
        return (
            "An error occurred while searching for incidents. "
            "Please try rephrasing your query or contact support if the issue persists."
        )
Example usage:
result = search_similar_incidents.invoke({"query": "HTTP 403 forbidden errors in PayU", "limit": 3})
Search strategy:
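  1. Primary: SelfQueryRetriever, which can translate parts of the query into metadata filters
  2. Fallback: pure vector similarity search (k = limit * 2), used when the retriever is unavailable, raises an error, or returns nothing
  3. Truncation: results are truncated to at most limit documents. No per-incident deduplication is applied, so several chunks of one incident can count toward the limit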
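If per-incident deduplication is wanted, one option is to over-fetch and keep only the first chunk seen for each incident ID, preserving rank order. This is a framework-free sketch with stated assumptions. Documents are modeled as a small Doc dataclass with a metadata dict. The primary and fallback parameters are hypothetical callables that stand in for the SelfQueryRetriever and vector_store.similarity_search.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class Doc:
    """Minimal stand-in for a LangChain Document."""
    page_content: str
    metadata: Dict[str, str] = field(default_factory=dict)


def dedupe_by_incident(docs: List[Doc], limit: int) -> List[Doc]:
    """Keep the first (highest-ranked) chunk per incident_id, up to `limit` incidents."""
    seen = set()
    unique: List[Doc] = []
    for doc in docs:
        inc_id = doc.metadata.get("incident_id")
        if not inc_id or inc_id in seen:
            continue  # Skip chunks without an ID and repeat chunks of a seen incident.
        seen.add(inc_id)
        unique.append(doc)
        if len(unique) >= limit:
            break
    return unique


def search_with_fallback(
    query: str,
    limit: int,
    primary: Callable[[str], List[Doc]],
    fallback: Callable[[str, int], List[Doc]],
) -> List[Doc]:
    """Try the primary retriever; fall back to similarity search if it fails or is empty."""
    try:
        docs = primary(query)
    except Exception:
        docs = []  # Treat a failing primary retriever like an empty result.
    if not docs:
        # Over-fetch to give deduplication more candidates for `limit` distinct incidents.
        docs = fallback(query, limit * 2)
    return dedupe_by_incident(docs, limit)
```

Over-fetching by a factor of two does not guarantee that the limit is reached. If incidents are split into many chunks, a larger factor may be needed.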

3. get_incidents_by_application

Find incidents affecting a specific application or system.
When to use: The user asks about incidents for a particular application, service, or system.
Parameters:
  • app_name (str): The application or system name (e.g., "PayU Core", "Settlement & Reporting")
  • limit (int, optional): Maximum number of incidents to return (default: 5)
Returns: Incident reports for the application, or a message if none are found
Implementation (src/copilot/tools/incident_tools.py:213):
@tool
def get_incidents_by_application(app_name: str, limit: int = 5) -> str:
    """Find incidents affecting a specific application or system.
    
    Use this tool when the user asks about incidents for a particular
    application, service, or system (e.g., 'PayU Core', 'Settlement & Reporting').
    """
    writer = _get_safe_stream_writer()
    writer({"status": f"Searching incidents for {app_name}..."})
    
    try:
        # Exact keyword match on the application name (MatchValue is case-sensitive)
        qdrant_filter = Filter(
            must=[
                FieldCondition(
                    key="metadata.impacted_application",
                    match=MatchValue(value=app_name),
                )
            ]
        )
        
        docs = _scroll_qdrant_with_filter(qdrant_filter, limit=limit * 10)
        
        # Deduplicate by incident_id and limit
        seen_incidents = set()
        unique_docs = []
        for doc in docs:
            inc_id = _get_metadata_value(doc.metadata, "incident_id")
            if inc_id and inc_id not in seen_incidents:
                seen_incidents.add(inc_id)
                unique_docs.append(doc)
                if len(unique_docs) >= limit:
                    break
        
        if unique_docs:
            writer({"status": f"Found {len(unique_docs)} incidents for {app_name}..."})
        else:
            # Try a partial match using semantic search as fallback
            writer({"status": f"Trying broader search for {app_name}..."})
            retriever = _get_retriever()
            if retriever:
                broader_query = f"incidents affecting {app_name}"
                docs = retriever.invoke(
                    input=broader_query,
                    config={"stream": False},
                )
                unique_docs = docs[:limit] if len(docs) > limit else docs
        
        return format_incidents_response(unique_docs)
        
    except Exception as e:
        logger.error(f"Error in get_incidents_by_application: {e}")
        return (
            f"An error occurred while searching for incidents affecting {app_name}. "
            "Please try again or contact support if the issue persists."
        )
Example usage:
result = get_incidents_by_application.invoke({"app_name": "PayU Core Payments", "limit": 5})
Search strategy:
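  1. Primary: exact, case-sensitive match on the metadata.impacted_application field, deduplicated by incident ID
  2. Fallback: if nothing matches exactly, the SelfQueryRetriever is queried with "incidents affecting {app_name}" and the results are truncated to at most limit documents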
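Because MatchValue is an exact keyword match, the application name must match the stored value character for character, including case. A common workaround is to store a normalized copy of the field at ingest time and normalize the query the same way before filtering. This sketch assumes you control ingestion. The impacted_application_key field and the helper names are hypothetical, and plain dicts stand in for Qdrant payloads.

```python
from typing import Dict, List


def app_key(name: str) -> str:
    """Normalize an application name for matching: casefold and collapse whitespace."""
    return " ".join(name.casefold().split())


def add_match_key(payload: Dict[str, str]) -> Dict[str, str]:
    """At ingest time, store a normalized copy alongside the original field."""
    enriched = dict(payload)
    enriched["impacted_application_key"] = app_key(payload.get("impacted_application", ""))
    return enriched


def filter_by_app(payloads: List[Dict[str, str]], app_name: str) -> List[Dict[str, str]]:
    """Exact match on the normalized key, mimicking a keyword filter on that field."""
    wanted = app_key(app_name)
    return [p for p in payloads if p.get("impacted_application_key") == wanted]


stored = [
    add_match_key({"incident_id": "INC-1", "impacted_application": "PayU Core"}),
    add_match_key({"incident_id": "INC-2", "impacted_application": "Settlement & Reporting"}),
]
matches = filter_by_app(stored, "payu  core")  # Finds INC-1 despite case and spacing.
```

This approach still requires the whole name to match. For partial matches, another option is Qdrant's MatchText condition on a field that has a full-text index.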

4. get_recent_incidents

Retrieve incidents from the last N days.
When to use: The user asks about recent incidents or incidents within a timeframe.
Parameters:
  • days (int, optional): Number of days to look back (default: 7)
  • limit (int, optional): Maximum number of incidents to return (default: 10)
Returns: Recent incident reports, or a message if none are found
Implementation (src/copilot/tools/incident_tools.py:286):
@tool
def get_recent_incidents(days: int = 7, limit: int = 10) -> str:
    """Get incidents from the last N days.
    
    Use this tool when the user asks about recent incidents or
    incidents within a specific timeframe (e.g., 'last week', 'past month').
    """
    writer = _get_safe_stream_writer()
    writer({"status": f"Searching incidents from the last {days} days..."})
    
    try:
        # Calculate the cutoff date
        cutoff_date = datetime.now() - timedelta(days=days)
        cutoff_str = cutoff_date.strftime("%Y-%m-%d")
        
        # Get all incidents and filter by date parsed from incident_id
        # Incident ID format: INC-YYYY-MM-DD-NNN
        vector_store = _get_vector_store()
        all_docs: List[Document] = []
        next_page = None
        seen_points = set()
        
        # Scroll through all incidents (no filter - we'll filter by date in Python)
        while len(all_docs) < 1000:  # Safety limit
            points, next_page = vector_store.client.scroll(
                collection_name=vector_store.collection_name,
                with_payload=True,
                with_vectors=False,
                limit=64,
                offset=next_page,
            )
            
            if not points:
                break
            
            for point in points:
                if point.id in seen_points:
                    continue
                seen_points.add(point.id)
                
                payload = point.payload or {}
                metadata = payload.get("metadata", {})
                page_content = payload.get("page_content", "")
                all_docs.append(
                    Document(
                        page_content=page_content,
                        metadata=metadata,
                    )
                )
            
            if next_page is None:
                break
        
        # Filter by date from incident_id (format: INC-YYYY-MM-DD-NNN)
        date_pattern = r"INC-(\d{4}-\d{2}-\d{2})-(\d+)"
        recent_docs = []
        seen_incidents = set()
        
        for doc in all_docs:
            inc_id = _get_metadata_value(doc.metadata, "incident_id")
            if not inc_id or inc_id in seen_incidents:
                continue
            
            match = re.match(date_pattern, inc_id)
            if match:
                incident_date_str = match.group(1)
                try:
                    incident_date = datetime.strptime(incident_date_str, "%Y-%m-%d")
                    if incident_date >= cutoff_date:
                        seen_incidents.add(inc_id)
                        recent_docs.append(doc)
                except ValueError:
                    continue
        
        # Sort by date (most recent first), then keep the newest `limit` incidents.
        # Truncating after sorting (rather than stopping the loop early) ensures the
        # newest incidents are returned, not whichever ones the scroll yielded first.
        recent_docs.sort(key=lambda doc: get_incident_date(doc), reverse=True)
        recent_docs = recent_docs[:limit]
        
        if recent_docs:
            incident_ids = set()
            for doc in recent_docs:
                inc_id = _get_metadata_value(doc.metadata, "incident_id")
                if inc_id:
                    incident_ids.add(inc_id)
            writer({"status": f"Found {len(incident_ids)} incidents from the last {days} days..."})
        else:
            writer({"status": f"No incidents found in the last {days} days"})
        
        return format_incidents_response(recent_docs)
        
    except Exception as e:
        logger.error(f"Error in get_recent_incidents: {e}")
        return (
            "An error occurred while searching for recent incidents. "
            "Please try again or contact support if the issue persists."
        )
Example usage:
# Last 7 days (default)
result = get_recent_incidents.invoke({})

# Last 30 days
result = get_recent_incidents.invoke({"days": 30, "limit": 20})
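Date extraction: The tool parses each incident's date from its ID using the pattern INC-YYYY-MM-DD-NNN, so incidents whose IDs do not follow this pattern are skipped. Matching incidents are sorted by date (most recent first). The scroll stops after roughly 1,000 document chunks (a safety limit), so in larger collections some incidents are never examined.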
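The date-window logic can be written as a small pure function, which makes the ordering explicit. The function collects every in-window incident, sorts newest first, and only then applies the limit. This is a standard-library sketch, not the project's code. The name recent_incident_ids is hypothetical, and the function works on bare IDs rather than Documents. It compares calendar dates, so the cutoff day itself is included. That rule differs slightly from the tool's comparison against datetime.now().

```python
import re
from datetime import date, timedelta
from typing import Dict, Iterable, List, Optional, Tuple

_ID_DATE_RE = re.compile(r"INC-(\d{4})-(\d{2})-(\d{2})-(\d+)")


def recent_incident_ids(
    ids: Iterable[str], days: int = 7, limit: int = 10, today: Optional[date] = None
) -> List[str]:
    """Return up to `limit` unique IDs dated within the last `days` days, newest first."""
    if today is None:
        today = date.today()
    cutoff = today - timedelta(days=days)  # Calendar-date cutoff; the cutoff day is included.
    dated: Dict[str, Tuple[date, int]] = {}
    for inc_id in ids:
        m = _ID_DATE_RE.match(inc_id)
        if m is None or inc_id in dated:
            continue  # Skip non-conforming IDs and duplicates (e.g. several chunks).
        try:
            inc_date = date(int(m.group(1)), int(m.group(2)), int(m.group(3)))
        except ValueError:
            continue  # Impossible dates such as month 13.
        if inc_date >= cutoff:
            dated[inc_id] = (inc_date, int(m.group(4)))
    # Sort first, then truncate, so the newest incidents are the ones kept.
    # The trailing sequence number breaks ties between incidents on the same day.
    newest_first = sorted(dated, key=lambda i: dated[i], reverse=True)
    return newest_first[:limit]
```

Passing today explicitly keeps the function deterministic in tests.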

Shared Utilities

All tools share common utilities from src/copilot/tools/_base.py:

Vector Store Components

def _get_vector_store() -> QdrantVectorStore:
    """Get or create the vector store."""
    global _vector_store
    if _vector_store is None:
        _vector_store = QdrantVectorStore(
            client=_get_qdrant_client(),
            collection_name=_get_active_collection_name(),
            embedding=_get_embeddings(),
        )
    return _vector_store
Components:
  • Embeddings: all-MiniLM-L6-v2 (HuggingFace)
  • Vector DB: Qdrant with optional API key authentication
  • Collection: resolved on first use by _get_active_collection_name() from the incident_dataset_versions table
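_get_vector_store lazily builds a single shared instance through a module-level global. If tools can run concurrently, for example as parallel tool calls in threads, two callers could race and build two clients. functools.lru_cache does not fully prevent this, because its documentation notes that the wrapped function may be called more than once under concurrency. The sketch below uses a lock with double-checked initialization instead. LazySingleton is a hypothetical helper, and the dict factory stands in for building a QdrantVectorStore.

```python
import threading
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class LazySingleton(Generic[T]):
    """Build a value on first use, at most once, even when called from several threads."""

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        self._value: Optional[T] = None
        self._lock = threading.Lock()

    def get(self) -> T:
        # Fast path: already built, so no locking is needed.
        if self._value is None:
            with self._lock:
                # Re-check inside the lock: another thread may have built it meanwhile.
                if self._value is None:
                    self._value = self._factory()
        return self._value

    def reset(self) -> None:
        """Drop the cached value, e.g. if the active collection could change."""
        with self._lock:
            self._value = None


builds = []
store = LazySingleton(lambda: builds.append("built") or {"client": "qdrant"})
```

The reset method matters only if the active collection can change while the process is running. Otherwise the first resolved collection stays cached, just as it does with the global.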

Metadata Fields

Tools can filter and search across these metadata fields:
METADATA_FIELD_INFO = [
    AttributeInfo(
        name="incident_id",
        description="The unique identifier for an incident, e.g., 'INC-2025-08-24-001'",
        type="string",
    ),
    AttributeInfo(
        name="incident_title",
        description="The high-level title of the incident",
        type="string",
    ),
    AttributeInfo(
        name="impacted_application",
        description="The name of the software or system that was impacted",
        type="string",
    ),
    AttributeInfo(
        name="root_cause",
        description="A summary of the root cause of the incident",
        type="string",
    ),
    AttributeInfo(
        name="mitigation",
        description="The steps taken to resolve or mitigate the incident",
        type="string",
    ),
    AttributeInfo(
        name="accountable_party",
        description="The team or entity responsible for the incident",
        type="string",
    ),
    AttributeInfo(
        name="source_system",
        description="The system that reported the incident",
        type="string",
    ),
    AttributeInfo(
        name="repeat_incident",
        description="Boolean indicating if this was a repeat incident",
        type="string",
    ),
]
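These fields live under the metadata key of each Qdrant point's payload, which is why the tools filter on nested keys such as metadata.incident_id. The sketch below shows how such a filter looks in Qdrant's JSON (REST) filter syntax. It is built with a small hypothetical helper, exact_match_filter. The Filter, FieldCondition and MatchValue objects used by the tools express the same structure in Python.

```python
from typing import Any, Dict, List, Tuple


def exact_match_filter(conditions: List[Tuple[str, Any]]) -> Dict[str, Any]:
    """Build a Qdrant JSON filter in which every (metadata field, value) pair must match exactly."""
    return {
        "must": [
            # Payload fields are nested under "metadata", hence the dotted key path.
            {"key": f"metadata.{name}", "match": {"value": value}}
            for name, value in conditions
        ]
    }


app_filter = exact_match_filter([("impacted_application", "PayU Core")])
# app_filter == {"must": [{"key": "metadata.impacted_application", "match": {"value": "PayU Core"}}]}
```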

Response Formatting

Incident data is formatted consistently across all tools:
def format_incident_context(doc: Document) -> str:
    """Format a single incident document into readable context."""
    # Extract content and metadata from the document
    metadata = doc.metadata
    page_content = doc.page_content
    incident_id = _get_metadata_value(metadata, "incident_id")
    incident_title = _get_metadata_value(metadata, "incident_title")
    root_cause = _get_metadata_value(metadata, "root_cause")
    mitigation = _get_metadata_value(metadata, "mitigation")
    impacted_application = _get_metadata_value(metadata, "impacted_application")
    
    # Build context block
    context_lines = ["---"]
    if incident_id:
        context_lines.append(f"Incident ID: {incident_id}")
    if incident_title:
        context_lines.append(f"Title: {incident_title}")
    if root_cause:
        context_lines.append(f"Root Cause: {root_cause}")
    if mitigation:
        context_lines.append(f"Mitigation: {mitigation}")
    # ... more fields
    
    context_lines.append("")
    context_lines.append("Details and Actions Taken and Steps and Fixes:")
    context_lines.append(page_content)
    context_lines.append("---")
    
    return "\n".join(context_lines)
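format_incidents_response, which every tool calls on its result documents, is not shown here. One plausible shape for it concatenates one context block per document and returns a fallback message when nothing was found. The sketch below is a hypothetical stand-in, not the project's implementation. The Doc dataclass, the helper names and the empty-result wording are all assumptions.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Doc:
    """Minimal stand-in for a LangChain Document."""
    page_content: str
    metadata: Dict[str, str] = field(default_factory=dict)


# Metadata fields rendered at the top of each block, with their display labels.
_FIELDS = [("incident_id", "Incident ID"), ("incident_title", "Title"),
           ("root_cause", "Root Cause"), ("mitigation", "Mitigation")]


def format_block(doc: Doc) -> str:
    lines = ["---"]
    for key, label in _FIELDS:
        value = doc.metadata.get(key)
        if value:  # Skip missing or empty fields.
            lines.append(f"{label}: {value}")
    lines += ["", "Details and Actions Taken and Steps and Fixes:", doc.page_content, "---"]
    return "\n".join(lines)


def format_response(docs: List[Doc]) -> str:
    if not docs:
        return "No matching incidents were found."  # Assumed wording.
    return "\n\n".join(format_block(d) for d in docs)
```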

Stream Writer

All tools use a safe stream writer to provide status updates:
def _get_safe_stream_writer() -> Callable:
    """Get stream writer if available, otherwise return a no-op function.
    
    This allows tools to work both within LangGraph context (with streaming)
    and outside of it.
    """
    try:
        writer = get_stream_writer()
        return writer
    except Exception:
        return lambda x: None
Example status updates:
  • "Searching for {incident_id}..."
  • "Found {count} relevant incidents..."
  • "Expanding search..."
  • "No similar incidents found"
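The no-op fallback is easy to exercise outside LangGraph. This framework-free sketch mirrors _get_safe_stream_writer. Unlike the real helper, it takes the writer getter as a parameter, a hypothetical seam for testing, so that a failing getter and a collecting writer can be swapped in. The outside_graph function only simulates a getter that fails when no graph context is active.

```python
from typing import Any, Callable, Dict, List

Writer = Callable[[Dict[str, Any]], None]


def safe_writer(get_writer: Callable[[], Writer]) -> Writer:
    """Return the real writer if available, else a no-op that drops updates."""
    try:
        return get_writer()
    except Exception:
        return lambda _update: None


def outside_graph() -> Writer:
    # Simulates a writer getter called with no active graph context.
    raise RuntimeError("no runnable context")


collected: List[Dict[str, Any]] = []
inside = safe_writer(lambda: collected.append)
inside({"status": "Searching for Similar Incidents..."})
safe_writer(outside_graph)({"status": "dropped"})  # Silently ignored.
```

Because every tool goes through this wrapper, the same tool code can run unchanged inside a streaming graph, in a unit test, or from a REPL.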
