Skip to main content

Open in Colab

A security-focused NER pipeline using Fenic’s semantic extraction capabilities to identify and analyze threats, vulnerabilities, and indicators of compromise from unstructured security reports.

Overview

This pipeline demonstrates automated security entity extraction and risk assessment:
  • Zero-shot entity extraction (CVEs, IPs, domains, hashes)
  • Enhanced extraction with threat intelligence context
  • Document chunking for comprehensive analysis
  • Risk prioritization and actionable intelligence

Pipeline Stages

1. Basic NER — extract standard security entities using zero-shot extraction.

2. Enhanced NER — add threat-specific context with domain-specific schemas.

3. Chunking — handle long documents effectively with text chunking.

4. Analytics — aggregate and analyze extracted entities.

5. Risk Assessment — generate actionable intelligence with severity ratings.

Prerequisites

pip install fenic

Implementation

Session Configuration

import fenic as fc
from pydantic import BaseModel, Field
from typing import List
import re

# Configure the Fenic session: one rate-limited OpenAI model, aliased "mini",
# backs all semantic operations in this pipeline.
model_pool = {
    "mini": fc.OpenAILanguageModel(
        model_name="gpt-4o-mini",
        rpm=500,        # requests per minute budget
        tpm=200_000,    # tokens per minute budget
    )
}

config = fc.SessionConfig(
    app_name="security_vulnerability_ner",
    semantic=fc.SemanticConfig(language_models=model_pool),
)

session = fc.Session.get_or_create(config)

Stage 1: Basic NER with Zero-Shot Extraction

# Define basic NER schema for security entities.
# Each Field description is the zero-shot extraction instruction the LLM sees
# (no labeled examples needed), so treat descriptions as prompt text.
class BasicNERSchema(BaseModel):
    # Vulnerability identifiers, e.g. CVE-2024-12345
    cve_ids: List[str] = Field(
        description="CVE identifiers in format CVE-YYYY-NNNNN"
    )
    # Product names, optionally with versions
    software_packages: List[str] = Field(
        description="Software names and versions mentioned"
    )
    # Network indicators of compromise
    ip_addresses: List[str] = Field(
        description="IP addresses (IPv4 or IPv6)"
    )
    domains: List[str] = Field(
        description="Domain names and URLs"
    )
    # File-based indicators of compromise
    file_hashes: List[str] = Field(
        description="File hashes (MD5, SHA1, SHA256)"
    )

# Apply basic extraction.
# semantic.extract runs the model over each row's "content" and yields a
# struct column shaped like BasicNERSchema; cache() marks the result for
# reuse so downstream steps don't re-trigger the extraction.
basic_extraction_df = reports_df.select(
    "report_id",
    "source",
    "title",
    fc.semantic.extract("content", BasicNERSchema).alias("basic_entities")
).cache()

# Display sample results: project two fields of the extracted struct into
# flat, readable columns.
basic_readable = basic_extraction_df.select(
    "report_id",
    basic_extraction_df.basic_entities.cve_ids.alias("cve_ids"),
    basic_extraction_df.basic_entities.software_packages.alias("software_packages")
)

basic_readable.show(2)
Zero-shot extraction works immediately without any examples - just define what you want to extract in the field descriptions.

Stage 2: Enhanced Domain-Specific Extraction

# Define enhanced schema with security-specific entities.
# Superset of BasicNERSchema: adds threat-intelligence fields (attack vectors,
# actors, CVSS, MITRE ATT&CK, affected systems). Field descriptions are the
# extraction instructions sent to the model — keep them precise.
class EnhancedNERSchema(BaseModel):
    cve_ids: List[str] = Field(
        description="CVE identifiers in format CVE-YYYY-NNNNN"
    )
    software_packages: List[str] = Field(
        description="Software names with specific version numbers"
    )
    ip_addresses: List[str] = Field(
        description="IP addresses (IPv4 or IPv6)"
    )
    domains: List[str] = Field(
        description="Domain names, subdomains, and URLs"
    )
    file_hashes: List[str] = Field(
        description="File hashes with hash type prefix (MD5:, SHA1:, SHA256:)"
    )
    # Threat-intelligence context beyond the basic schema:
    attack_vectors: List[str] = Field(
        description="Attack methods like buffer overflow, SQL injection, phishing"
    )
    threat_actors: List[str] = Field(
        description="Threat actor names, APT groups, ransomware families"
    )
    cvss_scores: List[str] = Field(
        description="CVSS scores and severity ratings"
    )
    mitre_techniques: List[str] = Field(
        description="MITRE ATT&CK technique IDs (TXXXX format)"
    )
    affected_systems: List[str] = Field(
        description="Operating systems, platforms, or infrastructure affected"
    )

# Normalize raw report text so the extractor sees consistent entity formats.
@fc.udf(return_type=fc.StringType)
def preprocess_udf(content):
    """Return *content* with CVE IDs, version ranges, and whitespace normalized."""
    rewrites = (
        # Standardize CVE format: "CVE - 2024 - 1234" -> "CVE-2024-1234"
        (r'CVE\s*-\s*(\d{4})\s*-\s*(\d+)', r'CVE-\1-\2'),
        # Normalize version ranges: "1.2.3 through 1.2.9" -> "1.2.3 to 1.2.9"
        (r'(\d+\.\d+\.\d+)\s+through\s+(\d+\.\d+\.\d+)', r'\1 to \2'),
    )
    for pattern, replacement in rewrites:
        content = re.sub(pattern, replacement, content)
    # Collapse every run of whitespace to a single space
    return ' '.join(content.split())

# Apply preprocessing and enhanced extraction.
# Two-step select: the first adds the normalized text column, the second runs
# extraction on it while keeping the original "content" for later stages
# (chunking and risk assessment read "content" downstream).
enhanced_df = reports_df.select(
    "report_id",
    "source",
    "title",
    "content",
    preprocess_udf("content").alias("processed_content")
).select(
    "report_id",
    "source",
    "title",
    "content",
    fc.semantic.extract("processed_content", EnhancedNERSchema).alias("entities")
).cache()

print("Enhanced extraction with security-specific entities:")
# Project the new threat-intelligence fields into flat columns for display.
enhanced_readable = enhanced_df.select(
    "report_id",
    enhanced_df.entities.threat_actors.alias("threat_actors"),
    enhanced_df.entities.attack_vectors.alias("attack_vectors"),
    enhanced_df.entities.cvss_scores.alias("cvss_scores")
)

enhanced_readable.show(2)

Stage 3: Process Long Documents with Chunking

# Add content length for chunking decisions.
reports_with_length = enhanced_df.select(
    "*",
    fc.text.length(fc.col("content")).alias("content_length")
)

# Identify documents needing chunking.
# NOTE(review): the 80-character cutoff (and 50-word chunks below) are
# demo-sized so the sample data actually splits; production reports would
# use much larger values — confirm against real document lengths.
long_reports = reports_with_length.filter(fc.col("content_length") > 80)
short_reports = reports_with_length.filter(fc.col("content_length") <= 80)

print(f"Documents requiring chunking: {long_reports.count()}")
print(f"Documents processed whole: {short_reports.count()}")

# Apply chunking to long documents: split on word boundaries with 15%
# overlap so entities straddling a chunk edge are still seen whole, then
# explode so each chunk becomes its own row.
chunked_df = long_reports.select(
    "report_id",
    "content",
    fc.text.recursive_word_chunk(
        fc.col("content"),
        chunk_size=50,
        chunk_overlap_percentage=15
    ).alias("chunks")
).explode("chunks").select(
    "report_id",
    fc.col("chunks").alias("chunk")
)

# Extract entities from each chunk independently.
chunk_entities_df = chunked_df.select(
    "report_id",
    "chunk",
    fc.semantic.extract("chunk", EnhancedNERSchema).alias("chunk_entities")
).cache()

# Aggregate entities across chunks: collect per-chunk structs back into one
# list per report (overlapping chunks may yield duplicate entities here).
aggregated_entities = chunk_entities_df.group_by("report_id").agg(
    fc.collect_list(fc.col("chunk_entities")).alias("all_chunk_entities")
)

print(f"Total chunks processed: {chunk_entities_df.count()}")
Chunking is essential for processing long security reports that exceed LLM context windows.

Stage 4: Analytics and Aggregation

# Create a unified view for validation.
all_entities_df = enhanced_df.select(
    "report_id",
    "source",
    "title",
    "entities"
)

def _flatten_entity(df, field_name, out_name):
    """Explode one list-valued entity field into a single non-null column.

    getattr(df.entities, field_name) is exactly equivalent to the
    df.entities.<field> attribute access, so behavior is unchanged.
    """
    return df.select(
        getattr(df.entities, field_name).alias(out_name)
    ).explode(out_name).filter(fc.col(out_name).is_not_null())

def _count_mentions(df, key, count_alias):
    """Count rows per distinct value of *key*, most frequent first."""
    return df.group_by(key).agg(
        fc.count("*").alias(count_alias)
    ).order_by(fc.col(count_alias).desc())

# Flatten entities for analysis (the same explode/filter pattern was
# previously duplicated three times verbatim).
flattened_cves = _flatten_entity(all_entities_df, "cve_ids", "cve_id")
flattened_software = _flatten_entity(all_entities_df, "software_packages", "software")
flattened_threats = _flatten_entity(all_entities_df, "threat_actors", "threat_actor")

# Most common CVEs
print("Top CVEs mentioned:")
cve_counts = _count_mentions(flattened_cves, "cve_id", "mentions")
cve_counts.show(5)

# Most affected software
print("Most affected software:")
software_counts = _count_mentions(flattened_software, "software", "mentions")
software_counts.show(5)

# Active threat actors
print("Active threat actors:")
threat_counts = _count_mentions(flattened_threats, "threat_actor", "reports")
threat_counts.show(5)

Stage 5: Risk Assessment

# Define Pydantic model for risk assessment.
# All fields are required (Field(...)) but may legitimately be empty strings
# per the docstring contract below; the docstring itself is schema text the
# model may see, so it is left unchanged.
class ExtractedRiskInfo(BaseModel):
    """
    Directly extracted risk information from the report text.
    If a value is not present in the report, use an empty string.
    """
    severity_rating: str = Field(
        ...,
        description="Explicit severity rating or risk level as stated in the report "
                    "(e.g., 'critical', 'high', 'medium', 'low')"
    )
    cvss_score: str = Field(
        ...,
        description="CVSS score as stated in the report"
    )
    mitigation_steps: str = Field(
        ...,
        description="Quoted mitigation or remediation steps as stated in the report"
    )
    affected_systems: str = Field(
        ...,
        description="Exact systems, platforms, or users mentioned as affected in the report"
    )

# Assess risk for each report by extracting explicitly-stated risk info
# from the original (unprocessed) content.
risk_assessment_df = enhanced_df.select(
    "report_id",
    "title",
    fc.semantic.extract("content", ExtractedRiskInfo).alias("risk_assessment")
)

# Show high-risk items.
# NOTE(review): the equality filter below is case-sensitive; if the model
# emits "Critical"/"High" rather than lowercase, those rows are silently
# dropped — confirm the model's output casing or normalize first.
high_risk_df = risk_assessment_df.select(
    "report_id",
    "title",
    risk_assessment_df.risk_assessment.severity_rating.alias("risk_level"),
    risk_assessment_df.risk_assessment.mitigation_steps.alias("immediate_action"),
    risk_assessment_df.risk_assessment.affected_systems.alias("affected_scope")
).filter(
    (fc.col("risk_level") == "critical") | (fc.col("risk_level") == "high")
)

print("High-Risk Vulnerabilities Requiring Immediate Action:")
high_risk_df.show()

Summary Statistics

# Summary statistics: total vs. unique counts show how often the same
# CVE/actor recurs across reports. Note: each count() triggers a pipeline
# evaluation, which is fine here because the upstream frames are cached.
total_cves = flattened_cves.count()
unique_cves = flattened_cves.select("cve_id").drop_duplicates().count()
total_threats = flattened_threats.count()
unique_threats = flattened_threats.select("threat_actor").drop_duplicates().count()

print(f"Total CVEs extracted: {total_cves} ({unique_cves} unique)")
print(f"Total threat actors identified: {total_threats} ({unique_threats} unique)")
print(f"Reports processed: {reports_df.count()}")

Running the Example

python ner.py

Troubleshooting

If entities are missed in long documents: increase chunk size or adjust the overlap percentage for better context. Try chunk_size=100 and chunk_overlap_percentage=20.
If extraction quality is poor: add more specific descriptions in the Pydantic field definitions. For example: "Threat actor names including APT groups (APT28, APT29), ransomware families (LockBit, BlackCat)".
If risk ratings don't fit your environment: include more context about your organization in the assessment prompt, such as criticality thresholds and asset classifications.

Next Steps for Production

  • Integrate with vulnerability feeds (NVD, vendor advisories)
  • Set up real-time processing pipeline
  • Export to SIEM/SOAR platforms
  • Create automated incident response workflows
  • Build threat intelligence knowledge base
Combine NER with semantic classification to automatically triage security reports by severity and route them to appropriate teams.

Build docs developers (and LLMs) love