Skip to main content

Overview

The Database module (modules/database.py) provides SQLite-based data persistence for all scan results. It manages five normalized tables storing scans, ports, vulnerabilities, web findings, and exploit attempts.

Database Class

Defined at database.py:12, this class handles all database operations.

Initialization

database.py:13-20
class Database:
    def __init__(self, db_path="database/autopentestx.db"):
        """Initialize database connection"""
        self.db_path = db_path
        self.ensure_directory()
        self.conn = None
        self.cursor = None
        self.connect()
        self.create_tables()
db_path
string
default:"database/autopentestx.db"
Path to SQLite database file

Database Schema

AutoPentestX uses a normalized schema with five tables:

scans Table

Stores scan metadata and overall results.
database.py:42-55
CREATE TABLE scans (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    target TEXT NOT NULL,
    scan_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    os_detection TEXT,
    scan_duration REAL,
    total_ports INTEGER,
    open_ports INTEGER,
    vulnerabilities_found INTEGER,
    risk_score TEXT,
    status TEXT DEFAULT 'completed'
)
Fields:
  • id: Unique scan identifier (auto-increment)
  • target: IP address or domain scanned
  • scan_date: Timestamp when scan started
  • os_detection: Detected operating system
  • scan_duration: Total scan time in seconds
  • total_ports: Ports scanned count
  • open_ports: Open ports discovered
  • vulnerabilities_found: Total vulnerabilities
  • risk_score: Overall risk level (CRITICAL/HIGH/MEDIUM/LOW)
  • status: Scan status (completed/failed/interrupted)

ports Table

Stores discovered open ports and services.
database.py:57-69
CREATE TABLE ports (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    scan_id INTEGER,
    port_number INTEGER,
    protocol TEXT,
    state TEXT,
    service_name TEXT,
    service_version TEXT,
    FOREIGN KEY (scan_id) REFERENCES scans(id)
)

vulnerabilities Table

Stores general vulnerabilities and CVEs.
database.py:71-86
CREATE TABLE vulnerabilities (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    scan_id INTEGER,
    port_number INTEGER,
    service_name TEXT,
    vuln_name TEXT,
    vuln_description TEXT,
    cve_id TEXT,
    cvss_score REAL,
    risk_level TEXT,
    exploitable BOOLEAN,
    FOREIGN KEY (scan_id) REFERENCES scans(id)
)

web_vulnerabilities Table

Stores web-specific vulnerabilities from Nikto/SQLMap.
database.py:101-111
CREATE TABLE web_vulnerabilities (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    scan_id INTEGER,
    url TEXT,
    vulnerability_type TEXT,
    severity TEXT,
    description TEXT,
    osvdb_id TEXT,
    FOREIGN KEY (scan_id) REFERENCES scans(id)
)

exploits Table

Stores exploitation attempt results.
database.py:88-100
CREATE TABLE exploits (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    scan_id INTEGER,
    vuln_id INTEGER,
    exploit_name TEXT,
    exploit_status TEXT,
    exploit_result TEXT,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (scan_id) REFERENCES scans(id),
    FOREIGN KEY (vuln_id) REFERENCES vulnerabilities(id)
)

CRUD Operations

Insert Operations

insert_scan()

Creates a new scan entry and returns the scan ID.
database.py:120-135
def insert_scan(self, target):
    """Insert new scan record"""
    try:
        self.cursor.execute('''
            INSERT INTO scans (target, scan_date, status)
            VALUES (?, CURRENT_TIMESTAMP, 'running')
        ''', (target,))
        self.conn.commit()
        scan_id = self.cursor.lastrowid
        print(f"[✓] Scan initialized: ID {scan_id}")
        return scan_id
    except sqlite3.Error as e:
        print(f"[✗] Failed to insert scan: {e}")
        return None

insert_port()

database.py:155-170
def insert_port(self, scan_id, port_data):
    """Insert port scan result"""
    self.cursor.execute('''
        INSERT INTO ports (scan_id, port_number, protocol, state, service_name, service_version)
        VALUES (?, ?, ?, ?, ?, ?)
    ''', (
        scan_id,
        port_data.get('port'),
        port_data.get('protocol', 'tcp'),
        port_data.get('state', 'open'),
        port_data.get('service', 'unknown'),
        port_data.get('version', '')
    ))
    self.conn.commit()

insert_vulnerability()

database.py:172-195
def insert_vulnerability(self, scan_id, vuln_data):
    """Insert vulnerability record"""
    self.cursor.execute('''
        INSERT INTO vulnerabilities 
        (scan_id, port_number, service_name, vuln_name, vuln_description, 
         cve_id, cvss_score, risk_level, exploitable)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    ''', (
        scan_id,
        vuln_data.get('port'),
        vuln_data.get('service'),
        vuln_data.get('name'),
        vuln_data.get('description', ''),
        vuln_data.get('cve_id'),
        vuln_data.get('cvss_score', 0.0),
        vuln_data.get('risk_level', 'UNKNOWN'),
        vuln_data.get('exploitable', False)
    ))
    self.conn.commit()

Update Operations

update_scan()

Updates scan record with results.
database.py:137-153
def update_scan(self, scan_id, **kwargs):
    """Update scan record with results"""
    valid_fields = ['os_detection', 'scan_duration', 'total_ports', 
                   'open_ports', 'vulnerabilities_found', 'risk_score', 'status']
    
    for field, value in kwargs.items():
        if field in valid_fields:
            self.cursor.execute(f'''
                UPDATE scans SET {field} = ? WHERE id = ?
            ''', (value, scan_id))
    
    self.conn.commit()

Query Operations

get_scan_by_id()

database.py:230-245
def get_scan_by_id(self, scan_id):
    """Retrieve scan by ID"""
    self.cursor.execute('SELECT * FROM scans WHERE id = ?', (scan_id,))
    row = self.cursor.fetchone()
    
    if row:
        columns = [desc[0] for desc in self.cursor.description]
        return dict(zip(columns, row))
    return None

get_ports_for_scan()

database.py:247-260
def get_ports_for_scan(self, scan_id):
    """Get all ports for a scan"""
    self.cursor.execute('SELECT * FROM ports WHERE scan_id = ?', (scan_id,))
    rows = self.cursor.fetchall()
    
    columns = [desc[0] for desc in self.cursor.description]
    return [dict(zip(columns, row)) for row in rows]

get_vulnerabilities_for_scan()

database.py:262-275
def get_vulnerabilities_for_scan(self, scan_id):
    """Get all vulnerabilities for a scan"""
    self.cursor.execute('SELECT * FROM vulnerabilities WHERE scan_id = ?', (scan_id,))
    rows = self.cursor.fetchall()
    
    columns = [desc[0] for desc in self.cursor.description]
    return [dict(zip(columns, row)) for row in rows]

Usage Example

from modules.database import Database

# Initialize database
db = Database()

# Create new scan
scan_id = db.insert_scan("192.168.1.100")

# Insert scan results
for port in open_ports:
    db.insert_port(scan_id, port)

for vuln in vulnerabilities:
    db.insert_vulnerability(scan_id, vuln)

# Update scan metadata
db.update_scan(
    scan_id,
    os_detection="Linux 4.15",
    scan_duration=123.45,
    open_ports=5,
    vulnerabilities_found=3,
    risk_score="HIGH",
    status="completed"
)

# Query scan results
scan_data = db.get_scan_by_id(scan_id)
ports = db.get_ports_for_scan(scan_id)
vulns = db.get_vulnerabilities_for_scan(scan_id)

# Close connection
db.close()

Error Handling

The Database module handles these common errors:
Returns error if database file cannot be created or accessed.
except sqlite3.Error as e:
    print(f"[✗] Database connection error: {e}")
    raise
Returns None instead of crashing if insert fails.
except sqlite3.Error as e:
    print(f"[✗] Failed to insert: {e}")
    return None
SQLite enforces foreign key constraints. Ensure scan_id exists before inserting related records.

Database Maintenance

Backup

cp database/autopentestx.db database/autopentestx_backup_$(date +%Y%m%d).db

Vacuum

sqlite3 database/autopentestx.db "VACUUM;"

Clear Old Scans

DELETE FROM scans WHERE scan_date < datetime('now', '-90 days');

Database Queries

Common SQL queries for data analysis

Configuration

Database settings and options

PDF Reports

How reports retrieve data from database

Workflow

Phase 1: Database initialization

Build docs developers (and LLMs) love