Overview
The Database module (modules/database.py) provides SQLite-based data persistence for all scan results. It manages five normalized tables storing scans, ports, vulnerabilities, web findings, and exploit attempts.
Database Class
Defined at database.py:12, this class handles all database operations.
Initialization
class Database :
def __init__ ( self , db_path = "database/autopentestx.db" ):
"""Initialize database connection"""
self .db_path = db_path
self .ensure_directory()
self .conn = None
self .cursor = None
self .connect()
self .create_tables()
db_path
string
default: "database/autopentestx.db"
Path to SQLite database file
Database Schema
AutoPentestX uses a normalized schema with five tables:
scans Table
Stores scan metadata and overall results.
CREATE TABLE scans (
id INTEGER PRIMARY KEY AUTOINCREMENT,
target TEXT NOT NULL ,
scan_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
os_detection TEXT ,
scan_duration REAL ,
total_ports INTEGER ,
open_ports INTEGER ,
vulnerabilities_found INTEGER ,
risk_score TEXT ,
status TEXT DEFAULT 'completed'
)
Fields:
id: Unique scan identifier (auto-increment)
target: IP address or domain scanned
scan_date: Timestamp when scan started
os_detection: Detected operating system
scan_duration: Total scan time in seconds
total_ports: Ports scanned count
open_ports: Open ports discovered
vulnerabilities_found: Total vulnerabilities
risk_score: Overall risk level (CRITICAL/HIGH/MEDIUM/LOW)
status: Scan status (completed/failed/interrupted)
ports Table
Stores discovered open ports and services.
CREATE TABLE ports (
id INTEGER PRIMARY KEY AUTOINCREMENT,
scan_id INTEGER ,
port_number INTEGER ,
protocol TEXT ,
state TEXT ,
service_name TEXT ,
service_version TEXT ,
FOREIGN KEY (scan_id) REFERENCES scans(id)
)
vulnerabilities Table
Stores general vulnerabilities and CVEs.
CREATE TABLE vulnerabilities (
id INTEGER PRIMARY KEY AUTOINCREMENT,
scan_id INTEGER ,
port_number INTEGER ,
service_name TEXT ,
vuln_name TEXT ,
vuln_description TEXT ,
cve_id TEXT ,
cvss_score REAL ,
risk_level TEXT ,
exploitable BOOLEAN ,
FOREIGN KEY (scan_id) REFERENCES scans(id)
)
web_vulnerabilities Table
Stores web-specific vulnerabilities from Nikto/SQLMap.
CREATE TABLE web_vulnerabilities (
id INTEGER PRIMARY KEY AUTOINCREMENT,
scan_id INTEGER ,
url TEXT ,
vulnerability_type TEXT ,
severity TEXT ,
description TEXT ,
osvdb_id TEXT ,
FOREIGN KEY (scan_id) REFERENCES scans(id)
)
exploits Table
Stores exploitation attempt results.
CREATE TABLE exploits (
id INTEGER PRIMARY KEY AUTOINCREMENT,
scan_id INTEGER ,
vuln_id INTEGER ,
exploit_name TEXT ,
exploit_status TEXT ,
exploit_result TEXT ,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (scan_id) REFERENCES scans(id),
FOREIGN KEY (vuln_id) REFERENCES vulnerabilities(id)
)
CRUD Operations
Insert Operations
insert_scan()
Creates a new scan entry and returns the scan ID.
def insert_scan ( self , target ):
"""Insert new scan record"""
try :
self .cursor.execute( '''
INSERT INTO scans (target, scan_date, status)
VALUES (?, CURRENT_TIMESTAMP, 'running')
''' , (target,))
self .conn.commit()
scan_id = self .cursor.lastrowid
print ( f "[✓] Scan initialized: ID { scan_id } " )
return scan_id
except sqlite3.Error as e:
print ( f "[✗] Failed to insert scan: { e } " )
return None
insert_port()
def insert_port ( self , scan_id , port_data ):
"""Insert port scan result"""
self .cursor.execute( '''
INSERT INTO ports (scan_id, port_number, protocol, state, service_name, service_version)
VALUES (?, ?, ?, ?, ?, ?)
''' , (
scan_id,
port_data.get( 'port' ),
port_data.get( 'protocol' , 'tcp' ),
port_data.get( 'state' , 'open' ),
port_data.get( 'service' , 'unknown' ),
port_data.get( 'version' , '' )
))
self .conn.commit()
insert_vulnerability()
def insert_vulnerability ( self , scan_id , vuln_data ):
"""Insert vulnerability record"""
self .cursor.execute( '''
INSERT INTO vulnerabilities
(scan_id, port_number, service_name, vuln_name, vuln_description,
cve_id, cvss_score, risk_level, exploitable)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''' , (
scan_id,
vuln_data.get( 'port' ),
vuln_data.get( 'service' ),
vuln_data.get( 'name' ),
vuln_data.get( 'description' , '' ),
vuln_data.get( 'cve_id' ),
vuln_data.get( 'cvss_score' , 0.0 ),
vuln_data.get( 'risk_level' , 'UNKNOWN' ),
vuln_data.get( 'exploitable' , False )
))
self .conn.commit()
Update Operations
update_scan()
Updates scan record with results.
def update_scan ( self , scan_id , ** kwargs ):
"""Update scan record with results"""
valid_fields = [ 'os_detection' , 'scan_duration' , 'total_ports' ,
'open_ports' , 'vulnerabilities_found' , 'risk_score' , 'status' ]
for field, value in kwargs.items():
if field in valid_fields:
self .cursor.execute( f '''
UPDATE scans SET { field } = ? WHERE id = ?
''' , (value, scan_id))
self .conn.commit()
Query Operations
get_scan_by_id()
def get_scan_by_id ( self , scan_id ):
"""Retrieve scan by ID"""
self .cursor.execute( 'SELECT * FROM scans WHERE id = ?' , (scan_id,))
row = self .cursor.fetchone()
if row:
columns = [desc[ 0 ] for desc in self .cursor.description]
return dict ( zip (columns, row))
return None
get_ports_for_scan()
def get_ports_for_scan ( self , scan_id ):
"""Get all ports for a scan"""
self .cursor.execute( 'SELECT * FROM ports WHERE scan_id = ?' , (scan_id,))
rows = self .cursor.fetchall()
columns = [desc[ 0 ] for desc in self .cursor.description]
return [ dict ( zip (columns, row)) for row in rows]
get_vulnerabilities_for_scan()
def get_vulnerabilities_for_scan ( self , scan_id ):
"""Get all vulnerabilities for a scan"""
self .cursor.execute( 'SELECT * FROM vulnerabilities WHERE scan_id = ?' , (scan_id,))
rows = self .cursor.fetchall()
columns = [desc[ 0 ] for desc in self .cursor.description]
return [ dict ( zip (columns, row)) for row in rows]
Usage Example
from modules.database import Database
# Initialize database
db = Database()
# Create new scan
scan_id = db.insert_scan( "192.168.1.100" )
# Insert scan results
for port in open_ports:
db.insert_port(scan_id, port)
for vuln in vulnerabilities:
db.insert_vulnerability(scan_id, vuln)
# Update scan metadata
db.update_scan(
scan_id,
os_detection = "Linux 4.15" ,
scan_duration = 123.45 ,
open_ports = 5 ,
vulnerabilities_found = 3 ,
risk_score = "HIGH" ,
status = "completed"
)
# Query scan results
scan_data = db.get_scan_by_id(scan_id)
ports = db.get_ports_for_scan(scan_id)
vulns = db.get_vulnerabilities_for_scan(scan_id)
# Close connection
db.close()
Error Handling
The Database module handles these common errors:
Returns error if database file cannot be created or accessed. except sqlite3.Error as e:
print ( f "[✗] Database connection error: { e } " )
raise
Returns None instead of crashing if insert fails. except sqlite3.Error as e:
print ( f "[✗] Failed to insert: { e } " )
return None
SQLite enforces foreign key constraints. Ensure scan_id exists before inserting related records.
Database Maintenance
Backup
cp database/autopentestx.db database/autopentestx_backup_ $( date +%Y%m%d ) .db
Vacuum
sqlite3 database/autopentestx.db "VACUUM;"
Clear Old Scans
DELETE FROM scans WHERE scan_date < datetime ( 'now' , '-90 days' );
Database Queries Common SQL queries for data analysis
Configuration Database settings and options
PDF Reports How reports retrieve data from database
Workflow Phase 1: Database initialization