Skip to main content

Overview

The Risk Engine module (modules/risk_engine.py) performs comprehensive risk assessment by analyzing port exposure, vulnerabilities, CVE data, and exploitability. It calculates risk scores using CVSS methodology and generates prioritized remediation recommendations.

RiskEngine Class

Defined at risk_engine.py:10, this class implements multi-factor risk scoring.

Initialization

risk_engine.py:11-26
class RiskEngine:
    """Multi-factor risk scorer; holds the lookup tables used for scoring."""

    def __init__(self):
        """Create the severity point table and the factor multiplier table."""
        # Points associated with each severity label.
        self.risk_scores = dict(
            CRITICAL=10,
            HIGH=7,
            MEDIUM=4,
            LOW=2,
            UNKNOWN=1,
        )

        # Multipliers keyed by risk-factor name.
        self.severity_weights = dict(
            exploitable=2.0,
            public_exploit=1.5,
            network_accessible=1.3,
            authenticated=0.7,
        )
Risk Score Mapping:
  • CRITICAL: 10 points (immediate action required)
  • HIGH: 7 points (fix within 30 days)
  • MEDIUM: 4 points (fix within 90 days)
  • LOW: 2 points (fix when convenient)
  • UNKNOWN: 1 point (informational)

CVSS Risk Calculation

calculate_cvss_risk()

Converts CVSS scores to standardized risk levels.
risk_engine.py:28-39
def calculate_cvss_risk(self, cvss_score):
    """Map a numeric score onto a named risk level.

    Returns "CRITICAL" for scores >= 9.0, "HIGH" for >= 7.0, "MEDIUM" for
    >= 4.0, "LOW" for any remaining score above 0.0, and "UNKNOWN" otherwise.
    """
    # Lower bounds in descending order; the first one reached wins.
    bands = (
        (9.0, "CRITICAL"),
        (7.0, "HIGH"),
        (4.0, "MEDIUM"),
    )
    for floor, label in bands:
        if cvss_score >= floor:
            return label
    return "LOW" if cvss_score > 0.0 else "UNKNOWN"

Port-Level Risk Assessment

calculate_port_risk()

Calculates risk score for individual ports based on multiple factors.
risk_engine.py:41-94
def calculate_port_risk(self, port_data, vulnerabilities, cves):
    """Calculate the risk score for a single port.

    Args:
        port_data: Mapping describing the port; the 'port' and 'service'
            keys are read. Either may be missing or None.
        vulnerabilities: Iterable of vulnerability mappings. Entries whose
            'port' equals this port are scored from their 'severity',
            'name' and 'exploitable' keys.
        cves: Iterable of CVE mappings. Entries whose 'port' equals this
            port are scored from their 'cvss_score', 'exploitable' and
            'cve_id' keys.

    Returns:
        dict with 'port' (as given), 'service' (lower-cased), 'risk_score'
        (capped at 10.0 and rounded to 2 decimals), 'risk_level' (from
        calculate_cvss_risk) and 'risk_factors' (list of reason strings).
    """
    risk_score = 0
    risk_factors = []

    port_num = port_data.get('port')
    # `or ''` also covers a 'service' key that is present but None.
    service = (port_data.get('service') or '').lower()

    # Base risk for port exposure. The port may be missing or arrive as a
    # string, so coerce it defensively instead of comparing it to 1024 raw.
    try:
        numeric_port = int(port_num)
    except (TypeError, ValueError, OverflowError):
        numeric_port = None

    if numeric_port is not None and numeric_port < 1024:  # Well-known ports
        risk_score += 1
        risk_factors.append("Well-known port exposed")

    # Check for sensitive services (substring match on the service name)
    sensitive_services = ['ftp', 'telnet', 'smtp', 'mysql', 'postgresql',
                          'microsoft-ds', 'netbios-ssn', 'rdp']

    if any(svc in service for svc in sensitive_services):
        risk_score += 2
        risk_factors.append(f"Sensitive service exposed: {service}")

    # Findings are attributed by exact 'port' equality. Without a port number
    # nothing can be attributed, and findings that merely lack a 'port' key
    # must not be matched through None == None.
    if port_num is None:
        port_vulns = []
        port_cves = []
    else:
        port_vulns = [v for v in vulnerabilities if v.get('port') == port_num]
        port_cves = [c for c in cves if c.get('port') == port_num]

    for vuln in port_vulns:
        severity = vuln.get('severity', 'UNKNOWN')
        risk_score += self.risk_scores.get(severity, 1)
        risk_factors.append(f"{severity} vulnerability: {vuln.get('name')}")

        if vuln.get('exploitable', False):
            # Scales the running total (everything accumulated so far), so
            # the result depends on the order of the findings.
            risk_score *= 1.5  # 50% increase for exploitable vulns
            risk_factors.append("Exploitable vulnerability detected")

    for cve in port_cves:
        # A CVE record may carry an explicit None score; count it as 0.
        cvss = cve.get('cvss_score') or 0
        risk_score += cvss / 2  # Weight CVE score

        if cve.get('exploitable', False):
            risk_score *= 1.3  # 30% increase for CVE with exploit
            risk_factors.append(f"CVE with public exploit: {cve.get('cve_id')}")

    # Normalize to 0-10 scale
    normalized_score = min(risk_score, 10.0)

    return {
        'port': port_num,
        'service': service,
        'risk_score': round(normalized_score, 2),
        'risk_level': self.calculate_cvss_risk(normalized_score),
        'risk_factors': risk_factors
    }
Risk Calculation Factors:
Well-known ports (< 1024) add +1 base risk due to higher visibility and targeting.
Services like FTP, Telnet, RDP, SMB add +2 risk due to common exploitation and credential attacks.
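Each vulnerability adds its severity score (CRITICAL=10, HIGH=7, MEDIUM=4, LOW=2, UNKNOWN or unrecognized=1).
Each exploitable vulnerability multiplies the port's running risk score (everything accumulated so far, not just that vulnerability's points) by 1.5 (50% increase).
CVEs add half their CVSS score to the risk. Each CVE with a public exploit multiplies the running score by 1.3 (30% increase). The final port score is capped at 10.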

System-Wide Risk Assessment

calculate_overall_risk()

Aggregates all risk factors to determine overall system risk.
risk_engine.py:96-150
def calculate_overall_risk(self, scan_data, vulnerabilities, cves, web_vulns, sql_vulns):
    """Calculate the overall system risk score.

    Args:
        scan_data: Mapping whose 'ports' entry lists the per-port mappings
            passed to calculate_port_risk.
        vulnerabilities: Vulnerability mappings, forwarded to
            calculate_port_risk; only their count is reported here.
        cves: CVE mappings, forwarded to calculate_port_risk.
        web_vulns: Web vulnerability mappings; each adds the points for its
            'severity' (MEDIUM when absent). None is treated as empty.
        sql_vulns: SQL injection findings; each adds 10 points and counts
            as critical. None is treated as empty.

    Returns:
        dict with 'total_risk_score', 'average_risk_score',
        'overall_risk_level', 'total_vulnerabilities',
        'web_vulnerabilities', 'sql_vulnerabilities',
        'port_risk_breakdown' and 'recommendations'.
    """
    # `if sql_vulns:` below already tolerates None; make the loops and the
    # len() calls tolerate it as well.
    web_vulns = web_vulns or []
    sql_vulns = sql_vulns or []
    ports = scan_data.get('ports') or []

    total_risk_score = 0
    risk_breakdown = {
        'port_risks': [],
        'critical_count': 0,
        'high_count': 0,
        'medium_count': 0,
        'low_count': 0
    }

    # Calculate risk for each port
    for port in ports:
        port_risk = self.calculate_port_risk(port, vulnerabilities, cves)
        risk_breakdown['port_risks'].append(port_risk)
        total_risk_score += port_risk['risk_score']

    # Add web vulnerability risks. Only web and SQL findings are tallied in
    # the *_count buckets; `vulnerabilities` contribute via the port scores.
    for web_vuln in web_vulns:
        # Normalize so a missing/None or lower-case severity is scored and
        # counted under the same label.
        severity = str(web_vuln.get('severity') or 'MEDIUM').upper()
        total_risk_score += self.risk_scores.get(severity, 4)
        count_key = f"{severity.lower()}_count"
        # Severities without a bucket (e.g. UNKNOWN) still add points but
        # must not raise KeyError.
        if count_key in risk_breakdown:
            risk_breakdown[count_key] += 1

    # SQL injection = automatic CRITICAL
    if sql_vulns:
        total_risk_score += len(sql_vulns) * 10
        risk_breakdown['critical_count'] += len(sql_vulns)

    # Calculate overall risk level
    num_ports = len(ports)
    avg_risk_score = total_risk_score / max(num_ports, 1)

    overall_risk = self.calculate_cvss_risk(avg_risk_score)
    # SQL injection must never report below HIGH, even when many low-risk
    # ports dilute the per-port average.
    if sql_vulns and overall_risk not in ('HIGH', 'CRITICAL'):
        overall_risk = 'HIGH'

    return {
        'total_risk_score': round(total_risk_score, 2),
        'average_risk_score': round(avg_risk_score, 2),
        'overall_risk_level': overall_risk,
        'total_vulnerabilities': len(vulnerabilities),
        'web_vulnerabilities': len(web_vulns),
        'sql_vulnerabilities': len(sql_vulns),
        'port_risk_breakdown': risk_breakdown,
        'recommendations': self.generate_recommendations(risk_breakdown)
    }

Risk Severity Weights

Different vulnerability types have different severity multipliers:
| Factor | Weight | Impact |
| --- | --- | --- |
| Exploitable vulnerability | 2.0x | Doubles the base risk |
| Public exploit available | 1.5x | 50% increase |
| Network accessible | 1.3x | 30% increase |
| Requires authentication | 0.7x | 30% decrease |

Prioritized Recommendations

generate_recommendations()

Generates actionable remediation recommendations based on risk assessment.
risk_engine.py:165-195
def generate_recommendations(self, risk_breakdown):
    """Build the ordered list of remediation recommendations.

    Reads 'critical_count' and 'port_risks' from ``risk_breakdown`` and
    returns a list of dicts with 'priority', 'action' and 'details'. At most
    one CRITICAL entry comes first, then one HIGH entry per port whose risk
    factors mention a sensitive service, then at most one HIGH summary entry
    for ports with exploitable vulnerabilities.
    """
    def mentions(port_entry, marker):
        # True when any recorded risk factor contains the marker text.
        return any(marker in reason for reason in port_entry['risk_factors'])

    advice = []

    # Critical findings come first.
    critical_total = risk_breakdown['critical_count']
    if critical_total > 0:
        advice.append({
            'priority': 'CRITICAL',
            'action': 'Patch critical vulnerabilities immediately',
            'details': f"{critical_total} critical issues found"
        })

    # One entry per port exposing a sensitive service.
    advice.extend(
        {
            'priority': 'HIGH',
            'action': f"Secure or disable {entry['service']} on port {entry['port']}",
            'details': 'Consider firewall restrictions or VPN-only access'
        }
        for entry in risk_breakdown['port_risks']
        if mentions(entry, 'Sensitive service')
    )

    # A single summary entry covering every port with an exploitable finding.
    exploitable_total = sum(
        1 for entry in risk_breakdown['port_risks'] if mentions(entry, 'Exploitable')
    )
    if exploitable_total:
        advice.append({
            'priority': 'HIGH',
            'action': 'Address exploitable vulnerabilities',
            'details': f"{exploitable_total} ports have exploitable issues"
        })

    return advice
Recommendation Categories:
  • Patch critical vulnerabilities (CVSS >= 9.0)
  • Fix SQL injection vulnerabilities
  • Address remote code execution flaws
Timeline: Immediate (within 24-48 hours)

Usage Example

from modules.risk_engine import RiskEngine

# Create the engine; the constructor takes no arguments.
risk_engine = RiskEngine()

# Run the system-wide assessment. The five inputs are not defined in this
# snippet and must be supplied by the caller, in this positional order.
risk_results = risk_engine.calculate_overall_risk(
    scan_results,
    vulnerabilities,
    cves,
    web_vulns,
    sql_vulns
)

# Print the headline figures from the returned dict.
print(f"Overall Risk: {risk_results['overall_risk_level']}")
print(f"Risk Score: {risk_results['average_risk_score']}/10")
print(f"Total Vulnerabilities: {risk_results['total_vulnerabilities']}")

# Each recommendation is a dict with 'priority', 'action' and 'details'.
for rec in risk_results['recommendations']:
    print(f"[{rec['priority']}] {rec['action']}")
    print(f"  → {rec['details']}")

Output Format

{
  "total_risk_score": 45.8,
  "average_risk_score": 7.6,
  "overall_risk_level": "HIGH",
  "total_vulnerabilities": 12,
  "web_vulnerabilities": 5,
  "sql_vulnerabilities": 1,
  "port_risk_breakdown": {
    "port_risks": [
      {
        "port": 22,
        "service": "ssh",
        "risk_score": 5.3,
        "risk_level": "MEDIUM",
        "risk_factors": [
          "Well-known port exposed",
          "CVE with public exploit: CVE-2018-15473"
        ]
      }
    ],
    "critical_count": 1,
    "high_count": 3,
    "medium_count": 5,
    "low_count": 3
  },
  "recommendations": [
    {
      "priority": "CRITICAL",
      "action": "Patch critical vulnerabilities immediately",
      "details": "1 critical issues found"
    },
    {
      "priority": "HIGH",
      "action": "Secure or disable rdp on port 3389",
      "details": "Consider firewall restrictions or VPN-only access"
    }
  ]
}

Risk Scoring Formula

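The final risk score is calculated using:
Port Risk = min(10, Port Base Risk + Σ Vulnerability Scores + Σ (CVSS Score / 2)), where each exploitable vulnerability multiplies the running port score by 1.5 and each CVE with a public exploit multiplies it by 1.3

Total Risk Score = Σ Port Risk + Σ Web Vulnerability Scores + 10 × Number of SQL Injection Findings

Average Risk = Total Risk Score / max(Number of Open Ports, 1)

Overall Risk Level = CVSS_Risk_Level(Average Risk Score)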
SQL injection vulnerabilities automatically elevate the overall risk to at least HIGH, regardless of other factors.

CVE Lookup

How CVE data feeds into risk calculation

Vulnerability Scanner

Web vulnerability detection

PDF Reports

Risk assessment in generated reports

Workflow

Phase 5: Risk assessment in the workflow

Build docs developers (and LLMs) love