
A log processing pipeline that uses Fenic's text extraction and semantic enrichment capabilities to transform unstructured logs into actionable incident response data.

Overview

This pipeline demonstrates log enrichment through multi-stage processing:
  • Template-based parsing without regex
  • Service metadata enrichment via joins
  • LLM-powered error categorization and remediation
  • Incident severity assessment with business context

What You’ll Build

1. Parse unstructured logs: extract structured fields from syslog-format messages using template extraction.
2. Enrich with metadata: join with service ownership and criticality data for context.
3. Apply semantic analysis: use LLM operations for error categorization, severity assessment, and remediation.

Prerequisites

pip install fenic
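The session config below uses an OpenAI model, so an API key must be available in the environment before running the pipeline (standard OpenAI SDK convention; adjust for your provider):

```shell
export OPENAI_API_KEY="sk-..."
```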

Implementation

Stage 1: Template-Based Parsing

Extract structured fields from syslog-format messages without using regex:
import fenic as fc
from pydantic import BaseModel, Field

# Configure session
config = fc.SessionConfig(
    app_name="log_enrichment",
    semantic=fc.SemanticConfig(
        language_models={
            "mini": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=200_000
            )
        }
    )
)

session = fc.Session.get_or_create(config)

# Define log template
log_template = "${timestamp:none} [${level:none}] ${service:none}: ${message:none}"

# Parse logs using template extraction
# (logs_df is assumed to hold raw logs in a "raw_message" column)
parsed_df = logs_df.select(
    fc.text.extract("raw_message", log_template).alias("parsed")
).select(
    fc.col("parsed").get_item("timestamp").alias("timestamp"),
    fc.col("parsed").get_item("level").alias("level"),
    fc.col("parsed").get_item("service").alias("service"),
    fc.col("parsed").get_item("message").alias("message")
).filter(
    fc.col("timestamp").is_not_null()
)
Template extraction uses ${field:type} syntax where type can be none (string), int, float, etc.

Stage 2: Metadata Enrichment

Join with service ownership and criticality data:
# Rename service_name to service for join
metadata_df_renamed = metadata_df.select(
    fc.col("service_name").alias("service"),
    "team_owner",
    "criticality",
    "on_call_channel"
)

# Join parsed logs with metadata
enriched_df = parsed_df.join(
    metadata_df_renamed,
    on="service",
    how="left"
).select(
    "timestamp",
    "level",
    "service",
    "message",
    "team_owner",
    "criticality",
    "on_call_channel"
)
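A left join keeps every log row even when a service has no metadata entry; unmatched fields come back null. The same semantics in plain Python, with explicit defaults for unknown services (all values invented for illustration):

```python
# Example rows standing in for parsed_df and metadata_df (made-up values)
logs = [
    {"service": "payment-api", "message": "Connection refused"},
    {"service": "legacy-batch", "message": "Job timed out"},  # no metadata row
]
metadata = {
    "payment-api": {"team_owner": "payments", "criticality": "high",
                    "on_call_channel": "#payments-oncall"},
}
DEFAULTS = {"team_owner": "unknown", "criticality": "unknown",
            "on_call_channel": "#general-oncall"}

# Left-join semantics: every log row survives; unmatched services
# fall back to explicit defaults instead of nulls.
enriched = [{**log, **metadata.get(log["service"], DEFAULTS)} for log in logs]
```

Filling defaults at join time keeps downstream semantic operations from having to reason about nulls.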

Stage 3: Semantic Enrichment

Apply LLM operations for error analysis and remediation:
# Define Pydantic model for error analysis
class ErrorAnalysis(BaseModel):
    """Pydantic model for semantic error extraction."""
    error_category: str = Field(
        description="Main category of the error (e.g., database, network, authentication, resource)"
    )
    affected_component: str = Field(
        description="Specific component or resource affected"
    )
    potential_cause: str = Field(
        description="Most likely root cause of the issue"
    )

# Apply semantic enrichment
final_df = enriched_df.select(
    "timestamp",
    "level",
    "service",
    "message",
    "team_owner",
    "criticality",
    "on_call_channel",
    # Extract error analysis using Pydantic model
    fc.semantic.extract("message", ErrorAnalysis).alias("analysis"),
    # Classify incident severity based on message and criticality
    fc.semantic.classify(
        fc.text.concat(
            fc.col("message"),
            fc.lit(" (criticality: "),
            fc.col("criticality"),
            fc.lit(")")
        ),
        ["low", "medium", "high", "critical"]
    ).alias("incident_severity"),
    # Generate remediation steps
    fc.semantic.map(
        (
            "Generate 2-3 specific remediation steps that the on-call team should take to resolve this issue: "
            "{{message}} | Service: {{service}} | Team: {{team_owner}}"
        ),
        message=fc.col("message"),
        service=fc.col("service"),
        team_owner=fc.col("team_owner")
    ).alias("remediation_steps")
)
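Appending the criticality to the message is what lets the classifier weigh business context rather than the error text alone. The string the model actually sees can be sketched with a hypothetical helper mirroring the fc.text.concat expression above:

```python
def build_classify_input(message: str, criticality: str) -> str:
    """Hypothetical helper: mirrors the concat expression
    message + " (criticality: " + criticality + ")"."""
    return f"{message} (criticality: {criticality})"

# The classifier must answer with one of these labels
labels = ["low", "medium", "high", "critical"]

prompt_text = build_classify_input("Connection pool exhausted", "high")
```

The same WARN-level message can land in different severity buckets depending on the criticality suffix, which is the point of the enrichment join.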

Output Format

The pipeline produces enriched logs with actionable intelligence:
# Extract analysis fields for readability
final_readable = final_df.select(
    "timestamp",
    "level",
    "service",
    "message",
    "team_owner",
    "criticality",
    "on_call_channel",
    fc.col("analysis").get_item("error_category").alias("error_category"),
    fc.col("analysis").get_item("affected_component").alias("affected_component"),
    fc.col("analysis").get_item("potential_cause").alias("potential_cause"),
    "incident_severity",
    "remediation_steps"
)

final_readable.show()

Sample Output

timestamp            level  service       error_category  incident_severity  remediation_steps
2024-01-15 14:32:01  ERROR  payment-api   database        critical           1. Check Database Connectivity…
2024-01-15 14:32:15  WARN   user-service  resource        critical           1. Review Rate Limiting Config…

Analytics Examples

Generate insights from enriched logs:
# Error category distribution
final_readable.group_by("error_category").agg(
    fc.count("*").alias("count")
).show()

# Severity by service criticality
final_readable.group_by("criticality", "incident_severity").agg(
    fc.count("*").alias("count")
).show()

# High-priority incidents requiring immediate attention
critical_incidents = final_readable.filter(
    (fc.col("incident_severity") == "critical") |
    (fc.col("incident_severity") == "high")
).select(
    "service",
    "team_owner",
    "incident_severity",
    "on_call_channel",
    "remediation_steps"
)

critical_incidents.show()
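The group-by counts above can be sanity-checked in plain Python with collections.Counter; a sketch over invented rows:

```python
from collections import Counter

# Stand-ins for final_readable rows (invented values)
rows = [
    {"error_category": "database", "criticality": "high", "incident_severity": "critical"},
    {"error_category": "database", "criticality": "high", "incident_severity": "critical"},
    {"error_category": "network",  "criticality": "low",  "incident_severity": "medium"},
]

# Mirrors group_by("error_category") with a count aggregation
by_category = Counter(r["error_category"] for r in rows)

# Mirrors the two-key group_by("criticality", "incident_severity")
by_pair = Counter((r["criticality"], r["incident_severity"]) for r in rows)
```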

Configuration Options

Custom Log Templates

Parse different log formats:
# Syslog format
log_template = "${timestamp:none} [${level:none}] ${service:none}: ${message:none}"

# Custom application format
log_template = "${service:none} | ${timestamp:none} | ${level:none} - ${message:none}"

Running the Example

python enrichment.py

Troubleshooting

  • Fields parse as null: check that the template format matches the log structure exactly, including spaces and delimiters.
  • Logs disappear after enrichment: use how="left" in the join to preserve all logs, and add default values for services missing from the metadata table.
  • Remediation steps are too generic: include more context in the semantic.map prompt (service criticality, team, historical context).

Next Steps

  • Integrate with real log streams (Kafka, Elasticsearch)
  • Set up automated alerting for critical incidents
  • Build historical trend analysis
  • Create auto-generated incident reports
