A log processing system using Fenic’s text extraction and semantic enrichment capabilities to transform unstructured logs into actionable incident response data.
Overview
This pipeline demonstrates log enrichment through multi-stage processing:
Template-based parsing without regex
Service metadata enrichment via joins
LLM-powered error categorization and remediation
Incident severity assessment with business context
What You’ll Build
Parse unstructured logs
Extract structured fields from syslog-format messages using template extraction.
Enrich with metadata
Join with service ownership and criticality data for context.
Apply semantic analysis
Use LLM operations for error categorization, severity assessment, and remediation.
Prerequisites
Implementation
Stage 1: Template-Based Parsing
Extract structured fields from syslog-format messages without using regex:
import fenic as fc
from pydantic import BaseModel, Field

# Configure session
config = fc.SessionConfig(
    app_name="log_enrichment",
    semantic=fc.SemanticConfig(
        language_models={
            "mini": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=200_000
            )
        }
    )
)
session = fc.Session.get_or_create(config)

# Define log template
log_template = "${timestamp:none} [${level:none}] ${service:none}: ${message:none}"

# Parse logs using template extraction
parsed_df = logs_df.select(
    fc.text.extract("raw_message", log_template).alias("parsed")
).select(
    fc.col("parsed").get_item("timestamp").alias("timestamp"),
    fc.col("parsed").get_item("level").alias("level"),
    fc.col("parsed").get_item("service").alias("service"),
    fc.col("parsed").get_item("message").alias("message")
).filter(
    fc.col("timestamp").is_not_null()
)
Template extraction uses ${field:type} syntax where type can be none (string), int, float, etc.
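To build intuition for what the template does under the hood, here is a plain-Python sketch that converts a ${field:type} template into a named-group pattern and applies it to one log line. This is purely illustrative (the `template_to_regex` helper is hypothetical, not Fenic's implementation); with Fenic you never write the pattern yourself.

```python
import re

def template_to_regex(template: str) -> re.Pattern:
    """Turn a "${field:type}" template into a named-group regex.

    Illustrative sketch only -- not Fenic's actual implementation.
    """
    pattern = ""
    pos = 0
    for m in re.finditer(r"\$\{(\w+):(\w+)\}", template):
        # Escape the literal text between placeholders
        pattern += re.escape(template[pos:m.start()])
        name, typ = m.group(1), m.group(2)
        # Typed fields narrow the match; "none" matches any text
        body = r"\d+" if typ == "int" else r"\d+\.\d+" if typ == "float" else r".+?"
        pattern += f"(?P<{name}>{body})"
        pos = m.end()
    pattern += re.escape(template[pos:])
    return re.compile(pattern + r"$")

template = "${timestamp:none} [${level:none}] ${service:none}: ${message:none}"
line = "2024-01-15 14:32:01 [ERROR] payment-api: connection pool exhausted"
match = template_to_regex(template).match(line)
fields = match.groupdict() if match else {}
```

Each placeholder becomes a named capture, so the literal text between placeholders (brackets, colons, spaces) is what anchors the extraction.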
Stage 2: Metadata Enrichment
Join with service ownership and criticality data:
# Rename service_name to service for the join key
metadata_df_renamed = metadata_df.select(
    fc.col("service_name").alias("service"),
    "team_owner",
    "criticality",
    "on_call_channel"
)

# Join parsed logs with metadata
enriched_df = parsed_df.join(
    metadata_df_renamed,
    on="service",
    how="left"
).select(
    "timestamp",
    "level",
    "service",
    "message",
    "team_owner",
    "criticality",
    "on_call_channel"
)
Stage 3: Semantic Enrichment
Apply LLM operations for error analysis and remediation:
# Define Pydantic model for error analysis
class ErrorAnalysis(BaseModel):
    """Pydantic model for semantic error extraction."""
    error_category: str = Field(
        description="Main category of the error (e.g., database, network, authentication, resource)"
    )
    affected_component: str = Field(
        description="Specific component or resource affected"
    )
    potential_cause: str = Field(
        description="Most likely root cause of the issue"
    )

# Apply semantic enrichment
final_df = enriched_df.select(
    "timestamp",
    "level",
    "service",
    "message",
    "team_owner",
    "criticality",
    "on_call_channel",
    # Extract error analysis using Pydantic model
    fc.semantic.extract("message", ErrorAnalysis).alias("analysis"),
    # Classify incident severity based on message and criticality
    fc.semantic.classify(
        fc.text.concat(
            fc.col("message"),
            fc.lit(" (criticality: "),
            fc.col("criticality"),
            fc.lit(")")
        ),
        ["low", "medium", "high", "critical"]
    ).alias("incident_severity"),
    # Generate remediation steps
    fc.semantic.map(
        (
            "Generate 2-3 specific remediation steps that the on-call team "
            "should take to resolve this issue: "
            "{{ message }} | Service: {{ service }} | Team: {{ team_owner }}"
        ),
        message=fc.col("message"),
        service=fc.col("service"),
        team_owner=fc.col("team_owner")
    ).alias("remediation_steps")
)
The pipeline produces enriched logs with actionable intelligence:
# Extract analysis fields for readability
final_readable = final_df.select(
    "timestamp",
    "level",
    "service",
    "message",
    "team_owner",
    "criticality",
    "on_call_channel",
    fc.col("analysis").get_item("error_category").alias("error_category"),
    fc.col("analysis").get_item("affected_component").alias("affected_component"),
    fc.col("analysis").get_item("potential_cause").alias("potential_cause"),
    "incident_severity",
    "remediation_steps"
)
final_readable.show()
Sample Output
timestamp            level  service       error_category  incident_severity  remediation_steps
2024-01-15 14:32:01  ERROR  payment-api   database        critical           1. Check Database Connectivity…
2024-01-15 14:32:15  WARN   user-service  resource        critical           1. Review Rate Limiting Config…
Analytics Examples
Generate insights from enriched logs:
# Error category distribution
final_readable.group_by("error_category").agg(
    fc.count("*").alias("count")
).show()

# Severity by service criticality
final_readable.group_by("criticality", "incident_severity").agg(
    fc.count("*").alias("count")
).show()
# High-priority incidents requiring immediate attention
critical_incidents = final_readable.filter(
    (fc.col("incident_severity") == "critical") |
    (fc.col("incident_severity") == "high")
).select(
    "service",
    "team_owner",
    "incident_severity",
    "on_call_channel",
    "remediation_steps"
)
critical_incidents.show()
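Downstream, each row of the high-priority frame can be turned into an alert for its on-call channel. A plain-Python sketch of such a hook (the `route_alert` function and message format are hypothetical; the field names match the columns selected above):

```python
def route_alert(incident: dict) -> str:
    """Format an alert message for the incident's on-call channel.

    Hypothetical downstream hook; field names mirror the
    critical_incidents columns, the message format is illustrative.
    """
    return (
        f"[{incident['incident_severity'].upper()}] {incident['service']} "
        f"-> {incident['on_call_channel']} (owner: {incident['team_owner']})\n"
        f"Remediation: {incident['remediation_steps']}"
    )

incident = {
    "service": "payment-api",
    "team_owner": "payments-team",
    "incident_severity": "critical",
    "on_call_channel": "#oncall-payments",
    "remediation_steps": "1. Check database connectivity",
}
alert = route_alert(incident)
```

In practice this string would be posted to the channel via whatever chat or paging integration the team uses.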
Configuration Options
Custom Log Templates
Parse different log formats:
# Syslog format
log_template = "${timestamp:none} [${level:none}] ${service:none}: ${message:none}"

# Custom application format
log_template = "${service:none} | ${timestamp:none} | ${level:none} - ${message:none}"
Running the Example
Troubleshooting
Template extraction returns empty fields
Verify that the template matches the log format exactly, including spaces, brackets, and delimiters; a single mismatched literal character prevents extraction.
Missing service metadata after join
Use how="left" in the join to preserve all logs; add default values for missing metadata.
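The left-join-plus-defaults pattern can be illustrated outside of Fenic. A plain-Python sketch where a dict lookup stands in for the DataFrame join (the default values shown, such as "#oncall-general", are assumptions, not part of the pipeline above):

```python
# Defaults applied when a service has no metadata row
# (values are illustrative assumptions).
DEFAULT_METADATA = {
    "team_owner": "unknown",
    "criticality": "medium",
    "on_call_channel": "#oncall-general",
}

# Dict lookup stands in for the metadata_df join
metadata = {
    "payment-api": {
        "team_owner": "payments-team",
        "criticality": "high",
        "on_call_channel": "#oncall-payments",
    },
}

def enrich(log: dict) -> dict:
    # Left-join semantics: every log survives; unmatched services get defaults
    return {**log, **metadata.get(log["service"], DEFAULT_METADATA)}

known = enrich({"service": "payment-api", "message": "timeout"})
unknown = enrich({"service": "new-service", "message": "boot"})
```

The key property, mirrored from how="left", is that logs from unknown services are enriched with defaults rather than dropped.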
Generic remediation steps
Include more context in the semantic.map prompt (service criticality, team, historical context).
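For example, a richer prompt template might fold in criticality, ownership, and a history field. The wording below is illustrative, and the history placeholder assumes you have such a column to bind; each placeholder must be passed as a keyword argument when calling semantic.map.

```python
# A richer semantic.map prompt template adding criticality, ownership,
# and historical context (wording and the "history" field are illustrative).
remediation_prompt = (
    "Generate 2-3 specific remediation steps the on-call team should take. "
    "Error: {{ message }} | Service: {{ service }} "
    "(criticality: {{ criticality }}) | Team: {{ team_owner }} | "
    "Recent related incidents: {{ history }}"
)

# Every placeholder must be bound to a column when calling semantic.map
placeholders = ["message", "service", "criticality", "team_owner", "history"]
```

More grounding context generally yields remediation steps specific to the service rather than generic advice.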
Next Steps
Integrate with real log streams (Kafka, Elasticsearch)
Set up automated alerting for critical incidents
Build historical trend analysis
Create auto-generated incident reports