Overview

Chronos-DFIR’s Case Management System (Etapa 2 roadmap) provides a structured workflow for multi-file investigations. Cases organize evidence files, analysis results, journal entries, IOCs, and narrative documentation in a single DuckDB database.
Current Status: Case management backend is implemented (engine/case_db.py + engine/case_router.py) but not yet activated in the main UI. This guide documents the architecture and API for integration testing and future UI development.

Case Management Architecture

Database Schema

All case data is stored in a single DuckDB file (chronos_cases.duckdb) with 7 tables:
-- Core case metadata
CREATE TABLE cases (
    case_id     UUID PRIMARY KEY,
    name        VARCHAR NOT NULL,
    created_at  TIMESTAMP,
    updated_at  TIMESTAMP,
    status      VARCHAR DEFAULT 'open',
    description VARCHAR,
    investigator VARCHAR
);

-- Investigation phases within a case
CREATE TABLE phases (
    phase_id     UUID PRIMARY KEY,
    case_id      UUID REFERENCES cases(case_id),
    phase_number INTEGER NOT NULL,
    title        VARCHAR NOT NULL,
    objective    VARCHAR,
    notes        VARCHAR,
    status       VARCHAR DEFAULT 'active'
);

-- Files uploaded to a case
CREATE TABLE case_files (
    file_id            UUID PRIMARY KEY,
    case_id            UUID REFERENCES cases(case_id),
    phase_id           UUID REFERENCES phases(phase_id),
    original_filename  VARCHAR NOT NULL,
    processed_filename VARCHAR NOT NULL,
    sha256             VARCHAR,
    file_size          BIGINT,
    file_category      VARCHAR,
    row_count          INTEGER,
    time_range_start   VARCHAR,
    time_range_end     VARCHAR,
    technology         VARCHAR,
    uploaded_at        TIMESTAMP
);

-- Analysis results (forensic reports, Sigma hits, YARA)
CREATE TABLE analysis_results (
    result_id       UUID PRIMARY KEY,
    file_id         UUID REFERENCES case_files(file_id),
    case_id         UUID REFERENCES cases(case_id),
    phase_id        UUID REFERENCES phases(phase_id),
    analysis_type   VARCHAR DEFAULT 'forensic_report',
    result_json     VARCHAR,  -- Serialized forensic report
    risk_score      INTEGER,
    sigma_hit_count INTEGER
);

-- Investigation journal (notes, findings, AI insights)
CREATE TABLE journal_entries (
    entry_id   UUID PRIMARY KEY,
    case_id    UUID REFERENCES cases(case_id),
    phase_id   UUID REFERENCES phases(phase_id),
    entry_type VARCHAR DEFAULT 'note',
    content    VARCHAR NOT NULL,
    author     VARCHAR,
    created_at TIMESTAMP
);

-- Indicators of Compromise
CREATE TABLE case_iocs (
    ioc_id          UUID PRIMARY KEY,
    case_id         UUID REFERENCES cases(case_id),
    phase_id        UUID REFERENCES phases(phase_id),
    file_id         UUID REFERENCES case_files(file_id),
    ioc_type        VARCHAR NOT NULL,  -- ip, domain, hash, email, etc.
    ioc_value       VARCHAR NOT NULL,
    first_seen      VARCHAR,
    last_seen       VARCHAR,
    context         VARCHAR,
    enrichment_json VARCHAR  -- OSINT enrichment data
);

-- Case narrative (executive summary)
CREATE TABLE case_narrative (
    narrative_id   UUID PRIMARY KEY,
    case_id        UUID REFERENCES cases(case_id),
    content_md     VARCHAR,  -- Markdown content
    generated_at   TIMESTAMP,
    last_edited_at TIMESTAMP,
    version        INTEGER DEFAULT 1
);

Design Principles

  • No external dependencies: DuckDB is an embedded database (no server process)
  • Portable: a single .duckdb file contains the entire investigation database
  • Backup-friendly: copy chronos_cases.duckdb to back up or share with the team
  • Serialized writes: a write lock (threading.Lock()) ensures concurrent write operations don't corrupt the database
  • Concurrent reads OK: multiple analysis processes can read case data simultaneously

Implementation (engine/case_db.py line 25):
import threading

_write_lock = threading.Lock()

def create_case(name: str, description: str = "") -> str:
    conn = get_conn()  # shared module-level DuckDB connection
    with _write_lock:
        result = conn.execute(
            "INSERT INTO cases (name, description) VALUES (?, ?) RETURNING case_id",
            [name, description]
        ).fetchone()
    return result[0]
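The same lock can guard maintenance operations. For example, a consistent backup can hold the lock so no write is mid-flight during the copy (backup_case_db is a hypothetical helper, not part of case_db.py):

```python
import shutil
import threading

_write_lock = threading.Lock()

def backup_case_db(src_path: str, dest_path: str) -> str:
    """Copy the case database while holding the write lock,
    so no INSERT/UPDATE is in flight during the copy."""
    with _write_lock:
        shutil.copy2(src_path, dest_path)
    return dest_path
```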
The UI is designed to work with or without an active case:
  • Without case open: the interface behaves identically to the current single-file mode
  • With case open: additional UI elements (sidebar, journal, phase selector) become visible
  • File uploads: standalone files can still be uploaded (stored as single-file "cases" with auto-generated names)

Case Workflow

Creating a Case

Step 1: Define Case Metadata

API Endpoint: POST /api/cases

Request:
{
  "name": "Incident-2024-03-08-Ransomware",
  "description": "QILIN ransomware investigation - Finance department",
  "investigator": "John Doe"
}
Response:
{
  "case_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "name": "Incident-2024-03-08-Ransomware",
  "status": "open"
}
Implementation (engine/case_router.py line 68):
@case_router.post("")
async def create_case(request: CreateCaseRequest):
    case_id = case_db.create_case(
        name=request.name,
        description=request.description,
        investigator=request.investigator
    )
    return {"case_id": case_id, "name": request.name, "status": "open"}
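The CreateCaseRequest model referenced by the handler is not shown; a minimal Pydantic sketch consistent with the JSON payload above (the field defaults are assumptions):

```python
from pydantic import BaseModel

class CreateCaseRequest(BaseModel):
    """Body for POST /api/cases (sketch; field defaults assumed)."""
    name: str
    description: str = ""
    investigator: str = ""
```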
Step 2: Create Investigation Phases

API Endpoint: POST /api/cases/{case_id}/phases

Example phases:
  1. Initial Triage — Quick timeline review, IOC extraction
  2. Malware Analysis — Reverse engineering, YARA scanning
  3. Lateral Movement Tracing — Network logs, authentication events
  4. Impact Assessment — File encryption scope, data exfiltration
  5. Remediation — IOC blocking, system restoration
Request:
{
  "title": "Initial Triage",
  "objective": "Identify initial access vector and timeline"
}
Response:
{
  "phase_id": "b2c3d4e5-f6a7-8901-bcde-f12345678901",
  "title": "Initial Triage",
  "case_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
}
Step 3: Upload Evidence Files

The current upload flow is modified to accept case_id and phase_id parameters.

Enhanced endpoint: POST /upload?case_id=xxx&phase_id=yyy

Processing:
  1. File ingested normally (parsed to CSV in upload/ directory)
  2. Chain of custody recorded:
    • SHA256 hash computed during streaming upload
    • File metadata saved to case_files table
  3. Automatic analysis:
    • Sigma/YARA rules evaluated
    • Forensic report generated
    • Results saved to analysis_results table
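The single-pass hashing in step 2 can be sketched as follows (hash_upload_stream is a hypothetical helper, not part of the current codebase):

```python
import hashlib
from typing import Iterable, Tuple

def hash_upload_stream(chunks: Iterable[bytes]) -> Tuple[str, int]:
    """Compute the SHA256 digest and total size in a single pass
    over the upload stream, without buffering the whole file."""
    digest = hashlib.sha256()
    size = 0
    for chunk in chunks:
        digest.update(chunk)
        size += len(chunk)
    return digest.hexdigest(), size
```

The returned digest and size would then be passed to register_file for the chain-of-custody record.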
File registration (engine/case_db.py line 298):
def register_file(
    case_id: str,
    original_filename: str,
    processed_filename: str,
    sha256: str = "",
    file_size: int = 0,
    file_category: str = "generic",
    row_count: int = 0,
    time_range_start: str = "",
    time_range_end: str = "",
    phase_id: Optional[str] = None
) -> str:
    # Returns file_id (UUID)

Multi-File Timeline

Goal: Combine events from multiple files into a unified timeline.
Step 1: Query Case Files

API Endpoint: GET /api/cases/{case_id}/files

Response:
{
  "files": [
    {
      "file_id": "f1a2b3c4-...",
      "original_filename": "Security.evtx",
      "processed_filename": "Security_20240308_142345.csv",
      "sha256": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
      "file_category": "Windows/EVTX",
      "row_count": 45812,
      "time_range_start": "2024-03-01 08:00:00",
      "time_range_end": "2024-03-08 18:45:00",
      "phase_title": "Initial Triage",
      "phase_number": 1
    },
    {
      "file_id": "f2b3c4d5-...",
      "original_filename": "Sysmon.evtx",
      "processed_filename": "Sysmon_20240308_142512.csv",
      "file_category": "Windows/EVTX",
      "row_count": 128943
    }
  ],
  "total": 2
}
Step 2: Load and Merge DataFrames

Backend implementation (Etapa 4 feature):
import polars as pl

def load_case_timeline(case_id: str) -> pl.DataFrame:
    files = case_db.get_case_files(case_id)
    dfs = []
    
    for file in files:
        df = pl.read_csv(file["processed_filename"])
        # Add provenance columns
        df = df.with_columns([
            pl.lit(file["file_id"]).alias("_source_file_id"),
            pl.lit(file["original_filename"]).alias("_source_filename"),
            pl.lit(file["file_category"]).alias("_source_category")
        ])
        dfs.append(df)
    
    # Concatenate all files
    df_merged = pl.concat(dfs, how="diagonal")  # Union with schema reconciliation
    
    # Sort by timestamp
    time_col = get_primary_time_column(df_merged.columns)
    if time_col:
        df_merged = df_merged.sort(time_col)
    
    return df_merged
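The merge routine above calls get_primary_time_column, which is not shown; a plausible sketch (the candidate column names are assumptions, not the project's actual list):

```python
from typing import List, Optional

# Common timestamp column names seen in processed forensic CSVs (assumed)
TIME_CANDIDATES = ["Time", "Timestamp", "EventTime", "@timestamp", "datetime"]

def get_primary_time_column(columns: List[str]) -> Optional[str]:
    """Pick the most likely timestamp column from a merged schema."""
    for cand in TIME_CANDIDATES:
        if cand in columns:
            return cand
    # Fall back to any column whose name suggests a time/date value
    for col in columns:
        if "time" in col.lower() or "date" in col.lower():
            return col
    return None
```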
Step 3: Cross-File Correlation

Correlation strategies:

1. Shared IP addresses:
# Find IPs appearing in multiple evidence files
ip_cols = ["SourceIP", "DestinationIP", "IpAddress", "ClientIP"]

for col in ip_cols:
    if col in df_merged.columns:
        cross_file_ips = (
            df_merged
            .group_by(col)
            .agg(pl.n_unique("_source_file_id").alias("file_count"))
            .filter(pl.col("file_count") > 1)
        )
        print(f"{col}: {cross_file_ips.height} values seen in more than one file")
2. User account tracking:
# Trace user activity across files
user_timeline = df_merged.filter(
    pl.col("User").eq("DOMAIN\\compromised_user")
).select(["Time", "EventID", "Computer", "_source_filename"])
3. Process tree reconstruction:
# Link parent/child processes across Sysmon files
processes = df_merged.filter(
    pl.col("EventID").is_in(["1", "4688"])  # Process creation
).select(["ProcessGuid", "ParentProcessGuid", "Image", "CommandLine"])

Journal & Notes

Adding Journal Entries

API Endpoint: POST /api/cases/{case_id}/journal

Entry Types:
  • note — General observation or reminder
  • finding — Important discovery (e.g., “Identified initial access via phishing email”)
  • hypothesis — Investigative theory to test
  • ai_insight — AI-generated analysis or suggestion (future feature)
Example Request:
{
  "content": "PowerShell encoded command executed by SYSTEM at 14:23:45. Likely scheduled task persistence. Next: check Event ID 4698.",
  "entry_type": "finding",
  "author": "analyst1",
  "phase_id": "b2c3d4e5-f6a7-8901-bcde-f12345678901"
}
Response:
{
  "entry_id": "j1k2l3m4-n5o6-7890-pqrs-tuvwxyz12345",
  "case_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
}

Retrieving Journal

API Endpoint: GET /api/cases/{case_id}/journal

Response:
{
  "entries": [
    {
      "entry_id": "j1k2l3m4-...",
      "phase_id": "b2c3d4e5-...",
      "entry_type": "note",
      "content": "Started analysis of Security.evtx - 45K events over 7 days",
      "author": "analyst1",
      "created_at": "2024-03-08 14:30:00"
    },
    {
      "entry_id": "j2k3l4m5-...",
      "entry_type": "finding",
      "content": "PowerShell encoded command executed by SYSTEM...",
      "author": "analyst1",
      "created_at": "2024-03-08 14:45:12"
    }
  ],
  "total": 2
}

Journal UI (Roadmap)

Planned features (Etapa 3):
1. Timeline-Integrated Notes

Click any row in the timeline grid → "Add Note" context menu

Result: Journal entry linked to a specific event (via timestamp + row ID)
2. Phase-Specific Journal View

Sidebar shows journal entries filtered to the current phase

Navigation: Click a phase → view only notes from that investigation stage
3. Markdown Support

Journal content supports:
  • Headers, lists, code blocks
  • Links to external resources
  • Embedded screenshots (Base64-encoded)
4. AI-Assisted Journaling

Future feature (Etapa 5 - MCP Server integration):
  • AI suggests journal entries based on detected patterns
  • Auto-summarize investigation progress
  • Generate narrative sections from journal entries

IOC Management

Upserting IOCs

API Endpoint: POST /api/cases/{case_id}/iocs (custom endpoint - not yet in case_router.py)

Upsert logic (engine/case_db.py line 455):
  • If IOC already exists (same ioc_type + ioc_value), update last_seen and context
  • Otherwise, insert new IOC
Example:
ioc_id = case_db.upsert_ioc(
    case_id="a1b2c3d4-e5f6-7890-abcd-ef1234567890",
    ioc_type="ip",
    ioc_value="192.168.1.50",
    first_seen="2024-03-08 14:23:00",
    last_seen="2024-03-08 16:45:00",
    context="C2 server - 47 connections from compromised host",
    phase_id="b2c3d4e5-f6a7-8901-bcde-f12345678901"
)
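The upsert semantics can be illustrated independently of DuckDB with an in-memory store keyed by (ioc_type, ioc_value) — an illustrative sketch, not the actual case_db.py implementation:

```python
from typing import Dict, Tuple

def upsert_ioc_memory(store: Dict[Tuple[str, str], dict],
                      ioc_type: str, ioc_value: str,
                      first_seen: str = "", last_seen: str = "",
                      context: str = "") -> dict:
    """Insert a new IOC, or refresh last_seen/context when the
    same (ioc_type, ioc_value) pair already exists."""
    key = (ioc_type, ioc_value)
    if key in store:
        entry = store[key]
        if last_seen:
            entry["last_seen"] = last_seen
        if context:
            entry["context"] = context
    else:
        entry = {
            "ioc_type": ioc_type, "ioc_value": ioc_value,
            "first_seen": first_seen, "last_seen": last_seen,
            "context": context,
        }
        store[key] = entry
    return entry
```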

Retrieving IOCs

API Endpoint: GET /api/cases/{case_id}/iocs

Response:
{
  "iocs": [
    {
      "ioc_id": "i1o2c3k4-...",
      "ioc_type": "ip",
      "ioc_value": "192.168.1.50",
      "first_seen": "2024-03-08 14:23:00",
      "last_seen": "2024-03-08 16:45:00",
      "context": "C2 server - 47 connections",
      "phase_id": "b2c3d4e5-..."
    },
    {
      "ioc_type": "hash",
      "ioc_value": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
      "context": "Ransomware payload SHA256",
      "enrichment_json": "{\"virustotal\": {\"positives\": 42, \"total\": 70}}"
    }
  ],
  "total": 2,
  "by_type": {
    "ip": 1,
    "hash": 1
  }
}
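The total/by_type rollup in this response can be computed from the IOC rows with a short helper (summarize_iocs is a hypothetical name):

```python
from collections import Counter
from typing import Dict, List

def summarize_iocs(iocs: List[dict]) -> Dict:
    """Build the total and by_type fields of the IOC response payload."""
    return {
        "total": len(iocs),
        "by_type": dict(Counter(ioc["ioc_type"] for ioc in iocs)),
    }
```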

IOC Types

Supported types:
  • ip — IPv4/IPv6 addresses
  • domain — DNS domains and subdomains
  • hash — MD5, SHA1, SHA256, SHA512
  • email — Email addresses
  • url — Full URLs
  • filename — Malicious filenames
  • registry — Registry key paths
  • mutex — Malware mutex names
  • user — Compromised user accounts
  • certificate — SSL certificate thumbprints
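A best-effort classifier for raw indicator values might look like this (classify_ioc is a hypothetical helper; the heuristics are assumptions, not the project's actual logic):

```python
import ipaddress
import re

# Hex-digest lengths for common hash algorithms
HASH_LENGTHS = {32: "md5", 40: "sha1", 64: "sha256", 128: "sha512"}

def classify_ioc(value: str):
    """Guess the IOC type of a raw string; return None if unrecognized."""
    try:
        ipaddress.ip_address(value)  # handles both IPv4 and IPv6
        return "ip"
    except ValueError:
        pass
    if re.fullmatch(r"[0-9a-fA-F]+", value) and len(value) in HASH_LENGTHS:
        return "hash"
    if "@" in value and "." in value.split("@")[-1]:
        return "email"
    if re.fullmatch(r"(?:[a-z0-9-]+\.)+[a-z]{2,}", value, re.IGNORECASE):
        return "domain"
    return None
```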

Cross-File IOC Correlation

Use case: IP address appears in multiple evidence files across different phases. Query:
# Get all files where IOC appears
ioc_value = "192.168.1.50"
files = case_db.get_case_files(case_id)

for file in files:
    df = pl.read_csv(file["processed_filename"])
    ip_cols = ["SourceIP", "DestinationIP", "IpAddress"]
    
    for col in ip_cols:
        if col in df.columns:
            matches = df.filter(pl.col(col).eq(ioc_value))
            if matches.height > 0:
                print(f"IOC found in {file['original_filename']}: {matches.height} events")
Future feature (Etapa 4): Automatic IOC extraction and cross-file tracking.

Case Narrative

Auto-Generated Narrative

Goal: Produce an executive summary of the investigation in Markdown format.

API Endpoint: PUT /api/cases/{case_id}/narrative

Request:
{
  "content_md": "# Incident Report: QILIN Ransomware\n\n## Executive Summary\n\nOn March 8, 2024, the Finance department experienced a ransomware attack...",
  "version": 1
}
Response:
{
  "narrative_id": "n1a2r3r4-...",
  "case_id": "a1b2c3d4-...",
  "status": "saved"
}
Suggested narrative template:

# Incident Report: [Case Name]

## Executive Summary
[2-3 paragraphs: What happened, impact, resolution]

## Timeline of Events
| Timestamp | Event | Source |
|-----------|-------|--------|
| 2024-03-08 14:23:45 | Initial access via phishing email | Email logs |
| 2024-03-08 14:25:12 | PowerShell encoded execution | Security.evtx |
| 2024-03-08 14:30:00 | Lateral movement to DC | Sysmon.evtx |

## Attack Vector
[Detailed analysis of initial access]

## Indicators of Compromise
- **IPs**: 192.168.1.50 (C2 server)
- **Hashes**: e3b0c442... (ransomware payload)
- **Domains**: alpha-c2.example.com

## MITRE ATT&CK Mapping
- **TA0001** Initial Access: T1566 (Phishing)
- **TA0002** Execution: T1059.001 (PowerShell)
- **TA0008** Lateral Movement: T1021.002 (SMB/Windows Admin Shares)

## Remediation Actions
1. Isolated affected hosts (10.0.5.0/24 subnet)
2. Blocked C2 IP at firewall
3. Reset credentials for compromised accounts

## Lessons Learned
[Recommendations to prevent recurrence]

AI-Generated Narrative (Roadmap)

Etapa 6 feature: Automatic narrative generation from:
  • Journal entries
  • Sigma/YARA detections
  • IOC lists
  • Timeline statistics
Planned implementation:
  1. Aggregate all case data (files, journal, IOCs, analysis results)
  2. Generate structured prompt for LLM (via MCP Server)
  3. LLM produces Markdown narrative
  4. Analyst reviews and edits
  5. Save to case_narrative table
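The aggregation in steps 1-2 could be sketched as follows (build_narrative_context is a hypothetical helper; the prompt layout is an assumption):

```python
from typing import List

def build_narrative_context(case: dict, journal: List[dict],
                            iocs: List[dict]) -> str:
    """Aggregate case data into a Markdown context block
    that would be embedded in the LLM prompt."""
    lines = [f"# Case: {case['name']}", "", "## Journal"]
    for entry in journal:
        lines.append(f"- [{entry['entry_type']}] {entry['content']}")
    lines += ["", "## IOCs"]
    for ioc in iocs:
        lines.append(f"- {ioc['ioc_type']}: {ioc['ioc_value']}")
    return "\n".join(lines)
```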

Case Export

Export Format: .chronos-case

Roadmap feature (Etapa 6): Bundle the entire investigation as a portable archive.

Contents:
case_a1b2c3d4.chronos-case/
├── case.json               # Case metadata
├── phases.json             # Phase definitions
├── journal.json            # Journal entries
├── iocs.json               # IOC list with enrichment
├── narrative.md            # Case narrative
├── files/
│   ├── Security.evtx.csv   # Processed evidence
│   ├── Sysmon.evtx.csv
│   └── hashes.txt          # SHA256 checksums
├── analysis/
│   ├── forensic_report_1.json
│   └── forensic_report_2.json
└── exports/
    ├── timeline_filtered.xlsx
    └── context_export.json
Use cases:
  • Share investigation with external team (law enforcement, IR vendor)
  • Archive completed cases for compliance
  • Load case into another Chronos-DFIR instance
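A minimal sketch of the bundling step, writing the layout above into a zip archive with stdlib tools (export_case_bundle is hypothetical; the real exporter may differ):

```python
import json
import zipfile

def export_case_bundle(case_meta: dict, out_path: str,
                       journal=None, iocs=None,
                       narrative_md: str = "") -> str:
    """Write the core JSON/Markdown members of a .chronos-case archive.
    Evidence CSVs and analysis reports would be added the same way."""
    with zipfile.ZipFile(out_path, "w", zipfile.ZIP_DEFLATED) as bundle:
        bundle.writestr("case.json", json.dumps(case_meta, indent=2))
        bundle.writestr("journal.json", json.dumps(journal or [], indent=2))
        bundle.writestr("iocs.json", json.dumps(iocs or [], indent=2))
        bundle.writestr("narrative.md", narrative_md)
    return out_path
```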

API Reference Summary

Case CRUD

| Endpoint | Method | Description |
|----------|--------|-------------|
| /api/cases | POST | Create new case |
| /api/cases | GET | List all cases |
| /api/cases/{case_id} | GET | Get case details + phases |
| /api/cases/{case_id} | PUT | Update case metadata |
| /api/cases/{case_id} | DELETE | Archive case (soft delete) |

Phases

| Endpoint | Method | Description |
|----------|--------|-------------|
| /api/cases/{case_id}/phases | POST | Create phase |
| /api/cases/{case_id}/phases | GET | List phases for case |
| /api/cases/{case_id}/phases/{phase_id} | PUT | Update phase |

Files

| Endpoint | Method | Description |
|----------|--------|-------------|
| /api/cases/{case_id}/files | GET | List files in case |
| /api/cases/{case_id}/files?phase_id={id} | GET | List files in specific phase |

Journal

| Endpoint | Method | Description |
|----------|--------|-------------|
| /api/cases/{case_id}/journal | POST | Add journal entry |
| /api/cases/{case_id}/journal | GET | Get journal entries |
| /api/cases/{case_id}/journal?phase_id={id} | GET | Get entries for phase |

IOCs

| Endpoint | Method | Description |
|----------|--------|-------------|
| /api/cases/{case_id}/iocs | GET | Get all IOCs for case |
| /api/cases/{case_id}/iocs?phase_id={id} | GET | Get IOCs for phase |

Narrative

| Endpoint | Method | Description |
|----------|--------|-------------|
| /api/cases/{case_id}/narrative | GET | Get case narrative |
| /api/cases/{case_id}/narrative | PUT | Save/update narrative |

Testing Case Management APIs

Using cURL

Create a case:
curl -X POST http://localhost:8000/api/cases \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Test-Case-2024-03-08",
    "description": "Testing case management",
    "investigator": "analyst1"
  }'
Create a phase:
CASE_ID="a1b2c3d4-e5f6-7890-abcd-ef1234567890"

curl -X POST http://localhost:8000/api/cases/${CASE_ID}/phases \
  -H "Content-Type: application/json" \
  -d '{
    "title": "Initial Triage",
    "objective": "Identify IOCs and timeline"
  }'
Add journal entry:
curl -X POST http://localhost:8000/api/cases/${CASE_ID}/journal \
  -H "Content-Type: application/json" \
  -d '{
    "content": "Started analysis - 45K events in Security.evtx",
    "entry_type": "note",
    "author": "analyst1"
  }'

Using Python

import requests

BASE_URL = "http://localhost:8000/api/cases"

# Create case
resp = requests.post(BASE_URL, json={
    "name": "Python-Test-Case",
    "description": "Automated test",
    "investigator": "script"
})
case_id = resp.json()["case_id"]
print(f"Created case: {case_id}")

# Create phase
resp = requests.post(f"{BASE_URL}/{case_id}/phases", json={
    "title": "Automated Analysis",
    "objective": "Run detection rules"
})
phase_id = resp.json()["phase_id"]

# Add journal entry
requests.post(f"{BASE_URL}/{case_id}/journal", json={
    "content": "Test entry from Python script",
    "entry_type": "note",
    "phase_id": phase_id
})

# Retrieve case details
case = requests.get(f"{BASE_URL}/{case_id}").json()
print(f"Case has {case['phase_count']} phases and {case['file_count']} files")

Roadmap Integration Timeline

From README.md (line 306):
| Etapa | Status | Description |
|-------|--------|-------------|
| Etapa 2 | PENDING | Case DB + Router (CRUD complete, needs activation) |
| Etapa 3 | PENDING | Frontend Sidebar + Journal UI |
| Etapa 4 | PENDING | Multi-File + Cross-Correlation |
| Etapa 5 | PENDING | MCP Server + AI Chat |
| Etapa 6 | PENDING | Auto-Narrative + .chronos-case export |
Next steps for activation:
  1. Install DuckDB: pip install duckdb
  2. Mount case router in app.py: app.include_router(case_router)
  3. Create integration tests (pytest + httpx)
  4. Verify CRUD operations against real database

Next Steps

Evidence Ingestion

Learn how to ingest multi-format forensic artifacts for case investigations

Detection Rules

Understand how Sigma and YARA detections integrate with case analysis

Filtering & Searching

Navigate large multi-file timelines with advanced filtering techniques
