Overview
The UC Intel Final platform implements a comprehensive ML workflow that guides users through all stages of a malware classification experiment, from dataset configuration to model evaluation and interpretability.
Data Preparation: Dataset selection, splitting, and augmentation
Model Building: Architecture selection and configuration
Training: Hyperparameter tuning and model training
Monitoring: Real-time training progress tracking
Evaluation: Performance metrics and analysis
Interpretation: Model explainability and visualization
Workflow Stages
The workflow is iterative - you can return to earlier stages to refine your experiment based on results and interpretability analysis.
Stage 1: Dataset Configuration
Dataset Selection
The platform automatically scans the repo/malware/ directory for malware family folders:
repo/malware/
├── Adialer.C/ # Family 1
│ ├── 00001.png
│ ├── 00002.png
│ └── ...
├── Agent.FYI/ # Family 2
├── Allaple.A/ # Family 3
└── ...
Implementation : app/utils/dataset_utils.py and app/training/dataset.py:37-60
def scan_dataset(
    dataset_path: Path,
    selected_families: list[str] | None = None,
) -> tuple[list[Path], list[int], list[str]]:
    """Scan dataset directory and return image paths, labels, class names."""
    image_paths = []
    labels = []
    class_names = []
    family_dirs = sorted([d for d in dataset_path.iterdir() if d.is_dir()])
    # Filter if selected_families specified
    if selected_families:
        family_dirs = [d for d in family_dirs if d.name in selected_families]
    for class_idx, family_dir in enumerate(family_dirs):
        class_names.append(family_dir.name)
        for img_file in family_dir.iterdir():
            if img_file.suffix.lower() in ['.png', '.jpg', '.jpeg', '.bmp']:
                image_paths.append(img_file)
                labels.append(class_idx)
    return image_paths, labels, class_names
Train/Validation/Test Split
Data is split using stratified sampling to maintain class distribution:
Default Split Ratios :
Train: 70%
Validation: 15%
Test: 15%
Options :
Stratified sampling (maintains class balance)
Random seed for reproducibility (default: 72)
Custom split ratios via sliders
Location: app/training/dataset.py:63-96

def create_splits(
    image_paths: list[Path],
    labels: list[int],
    train_ratio: float = 0.7,
    val_ratio: float = 0.15,
    test_ratio: float = 0.15,
    stratified: bool = True,
    random_seed: int = 72,
) -> dict:
    # First split: train vs (val+test)
    train_paths, temp_paths, train_labels, temp_labels = train_test_split(
        image_paths, labels,
        test_size=(val_ratio + test_ratio),
        random_state=random_seed,
        stratify=labels if stratified else None,
    )
    # Second split: val vs test
    val_test_ratio = test_ratio / (val_ratio + test_ratio)
    val_paths, test_paths, val_labels, test_labels = train_test_split(
        temp_paths, temp_labels,
        test_size=val_test_ratio,
        random_state=random_seed,
        stratify=temp_labels if stratified else None,
    )
    return {
        "train": {"paths": train_paths, "labels": train_labels},
        "val": {"paths": val_paths, "labels": val_labels},
        "test": {"paths": test_paths, "labels": test_labels},
    }
Split Validation Checks :
Total percentages sum to 100%
Minimum 50% for training set
Each split has at least 1 sample per class
Stratification maintains class distribution
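The checks above can be expressed as a small standalone validator. The function below is an illustrative sketch (not the platform's implementation), assuming splits are given as percentage ratios plus per-split label lists:

```python
from collections import Counter

def validate_splits(train_pct: float, val_pct: float, test_pct: float,
                    split_labels: dict[str, list[int]]) -> list[str]:
    """Return a list of validation errors (an empty list means the split is valid)."""
    errors = []
    # Total percentages must sum to 100%
    if abs(train_pct + val_pct + test_pct - 100.0) > 1e-6:
        errors.append("ratios must sum to 100%")
    # The training set should keep at least 50% of the data
    if train_pct < 50.0:
        errors.append("training split below 50%")
    # Every split needs at least one sample per class
    all_classes: set[int] = set()
    for labels in split_labels.values():
        all_classes.update(labels)
    for name, labels in split_labels.items():
        missing = all_classes - set(Counter(labels))
        if missing:
            errors.append(f"split '{name}' missing classes: {sorted(missing)}")
    return errors

# A 70/15/15 split with both classes present in every split passes:
ok = validate_splits(70, 15, 15, {"train": [0, 0, 1], "val": [0, 1], "test": [0, 1]})
print(ok)  # → []
```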
Data Augmentation
Augmentation strategies to increase dataset diversity and reduce overfitting:
Augmentation Presets :
Light
Horizontal flip (50%)
Rotation (±10°)
Brightness/contrast (±10%)
Medium
Horizontal flip (50%)
Rotation (±15°)
Brightness/contrast (±20%)
Scale (90-110%)
Heavy
Horizontal & vertical flip
Rotation (±30°)
Brightness/contrast (±30%)
Scale (80-120%)
Gaussian blur
Custom
Fully configurable
Mix any transformations
Adjust probabilities
Implementation : app/training/transforms.py
def create_train_transforms(dataset_config: dict):
    """Create training transforms with augmentation."""
    transforms_list = [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
    ]
    # Apply augmentation based on preset
    aug_preset = dataset_config.get("augmentation", {}).get("preset", "None")
    if aug_preset == "Light":
        transforms_list.insert(1, transforms.RandomHorizontalFlip(0.5))
        transforms_list.insert(1, transforms.RandomRotation(10))
        transforms_list.insert(1, transforms.ColorJitter(brightness=0.1, contrast=0.1))
    elif aug_preset == "Medium":
        transforms_list.insert(1, transforms.RandomHorizontalFlip(0.5))
        transforms_list.insert(1, transforms.RandomRotation(15))
        transforms_list.insert(1, transforms.ColorJitter(brightness=0.2, contrast=0.2))
        transforms_list.insert(1, transforms.RandomResizedCrop(224, scale=(0.9, 1.1)))
    # ... Heavy and Custom presets
    # Normalize to ImageNet stats
    transforms_list.append(transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ))
    return transforms.Compose(transforms_list)
Class Imbalance Handling
The platform provides multiple strategies for handling imbalanced datasets:
Weighted Sampling
Class Weights
Focal Loss
Weighted Random Sampler creates balanced batches by oversampling minority classes:

def create_weighted_sampler(
    labels: list[int],
    num_classes: int,
) -> WeightedRandomSampler:
    # Compute inverse-frequency class weights
    class_weights = compute_class_weights(labels, num_classes)
    # Assign a weight to each sample
    sample_weights = [class_weights[label].item() for label in labels]
    return WeightedRandomSampler(
        weights=sample_weights,
        num_samples=len(labels),
        replacement=True,
    )
Location: app/training/dataset.py:113-126

Auto Class Weights in the loss function:

def compute_class_weights(
    labels: list[int],
    num_classes: int,
) -> torch.Tensor:
    counter = Counter(labels)
    total = len(labels)
    weights = []
    for i in range(num_classes):
        count = counter.get(i, 1)
        weight = total / (num_classes * count)
        weights.append(weight)
    return torch.tensor(weights, dtype=torch.float32)

Applied to CrossEntropyLoss:

criterion = nn.CrossEntropyLoss(weight=class_weights)
Location: app/training/dataset.py:99-110

Focal Loss focuses learning on hard examples:

class FocalLoss(nn.Module):
    def __init__(self, alpha=None, gamma=2.0):
        super().__init__()
        self.alpha = alpha  # Class weights
        self.gamma = gamma  # Focusing parameter

    def forward(self, inputs, targets):
        ce_loss = F.cross_entropy(
            inputs, targets,
            weight=self.alpha,
            reduction='none',
        )
        pt = torch.exp(-ce_loss)
        focal_loss = ((1 - pt) ** self.gamma) * ce_loss
        return focal_loss.mean()

Location: app/training/optimizers.py
Stage 2: Model Architecture
Model Selection
The platform supports three model families:
Custom CNN Build CNNs from scratch with configurable layer stacks
Transfer Learning Fine-tune pre-trained models (VGG, ResNet, EfficientNet)
Vision Transformer Transformer architecture with patch embeddings
Custom CNN Builder
Location : app/models/pytorch/cnn_builder.py:72-237
Build CNNs from a layer stack configuration:
class CustomCNN(nn.Module):
    ACTIVATION_MAP = {
        "relu": nn.ReLU(inplace=True),
        "leaky_relu": nn.LeakyReLU(0.1, inplace=True),
        "gelu": nn.GELU(),
        "swish": nn.SiLU(inplace=True),
    }

    def __init__(
        self,
        layers: list[dict[str, Any]],
        num_classes: int,
        input_channels: int = 3,
        input_size: int = 224,
    ):
        super().__init__()
        self.feature_layers = nn.ModuleList()
        self.classifier_layers = nn.ModuleList()
        current_channels = input_channels
        current_spatial = input_size
        for layer_config in layers:
            layer_type = layer_config["type"]
            params = layer_config.get("params", {})
            if layer_type == "Conv2D":
                layer, current_channels = self._build_conv2d(current_channels, params)
                self.feature_layers.append(layer)
            elif layer_type == "MaxPooling2D":
                pool_size = params.get("pool_size", 2)
                layer = nn.MaxPool2d(kernel_size=pool_size, stride=pool_size)
                current_spatial = current_spatial // pool_size
                self.feature_layers.append(layer)
            elif layer_type == "BatchNorm":
                layer = nn.BatchNorm2d(current_channels)
                self.feature_layers.append(layer)
            # ... More layer types
Supported Layer Types :
Conv2D (with configurable filters, kernel, activation)
MaxPooling2D / AveragePooling2D
BatchNorm
Dropout / Dropout2D
Flatten / GlobalAvgPool
Dense (fully connected)
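A layer stack is plain data in the "type"/"params" shape consumed by CustomCNN above. The configuration below is an illustrative sketch: "MaxPooling2D"/"pool_size" match the builder code shown, while the Conv2D, Dropout, and Dense parameter names are assumptions for illustration.

```python
# Hypothetical layer-stack configuration for the CustomCNN builder.
layer_stack = [
    {"type": "Conv2D", "params": {"filters": 32, "kernel_size": 3, "activation": "relu"}},
    {"type": "BatchNorm"},
    {"type": "MaxPooling2D", "params": {"pool_size": 2}},
    {"type": "Conv2D", "params": {"filters": 64, "kernel_size": 3, "activation": "relu"}},
    {"type": "MaxPooling2D", "params": {"pool_size": 2}},
    {"type": "Flatten"},
    {"type": "Dropout", "params": {"rate": 0.5}},
    {"type": "Dense", "params": {"units": 128, "activation": "relu"}},
]

# Each pooling layer halves the spatial size, mirroring the builder's bookkeeping:
spatial = 224
for cfg in layer_stack:
    if cfg["type"] == "MaxPooling2D":
        spatial //= cfg.get("params", {}).get("pool_size", 2)
print(spatial)  # → 56
```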
Transfer Learning
Location : app/models/pytorch/transfer.py:81-252
Supported Base Models :
VGG16, VGG19
ResNet50, ResNet101
InceptionV3
EfficientNetB0
Fine-tuning Strategies :
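The save_model_config example later in this document references a "Partial Fine-tuning" strategy with an unfreeze_layers count. A minimal, illustrative sketch (not the platform's implementation) of how such strategies map onto trainable layer blocks:

```python
def unfreeze_plan(num_blocks: int, strategy: str, unfreeze_layers: int = 0) -> list[bool]:
    """Return, per backbone block, whether its parameters should be trainable.

    Strategy names are assumed from this document's configuration examples:
      - "Feature Extraction":  freeze the whole backbone
      - "Partial Fine-tuning": unfreeze only the last `unfreeze_layers` blocks
      - "Full Fine-tuning":    train everything
    """
    if strategy == "Feature Extraction":
        return [False] * num_blocks
    if strategy == "Partial Fine-tuning":
        frozen = max(num_blocks - unfreeze_layers, 0)
        return [False] * frozen + [True] * (num_blocks - frozen)
    if strategy == "Full Fine-tuning":
        return [True] * num_blocks
    raise ValueError(f"unknown strategy: {strategy}")

print(unfreeze_plan(5, "Partial Fine-tuning", unfreeze_layers=3))
# → [False, False, True, True, True]
```

In a real backbone, the boolean per block would be applied by setting requires_grad on that block's parameters.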
Vision Transformer

Location: app/models/pytorch/transformer.py:223-362
Architecture Components :
class VisionTransformer(nn.Module):
    def __init__(
        self,
        image_size: int = 224,
        patch_size: int = 16,    # 16x16 patches
        num_classes: int = 1000,
        embed_dim: int = 768,    # Embedding dimension
        depth: int = 12,         # Number of transformer blocks
        num_heads: int = 12,     # Attention heads
        mlp_ratio: float = 4.0,  # MLP hidden dim ratio
        dropout: float = 0.1,
    ):
        super().__init__()
        # 1. Patch embedding (image to sequence of patch tokens)
        self.patch_embed = PatchEmbedding(...)
        # 2. CLS token and position embeddings
        self.cls_token = nn.Parameter(torch.zeros(1, 1, embed_dim))
        self.pos_embed = nn.Parameter(
            torch.zeros(1, num_patches + 1, embed_dim)
        )
        # 3. Transformer blocks
        self.blocks = nn.ModuleList([
            TransformerBlock(
                embed_dim=embed_dim,
                num_heads=num_heads,
                mlp_ratio=mlp_ratio,
                dropout=dropout,
            ) for _ in range(depth)
        ])
        # 4. Classification head
        self.norm = nn.LayerNorm(embed_dim)
        self.head = nn.Linear(embed_dim, num_classes)
Forward Pass :
Split image into patches (14x14 = 196 patches for 224x224 image with 16x16 patches)
Linearly embed each patch
Add learnable position embeddings
Prepend CLS token
Pass through transformer blocks
Extract CLS token output
Classify
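Step 1's patch count follows directly from the defaults above; a quick check of the arithmetic:

```python
image_size, patch_size = 224, 16

patches_per_side = image_size // patch_size  # 224 / 16 = 14
num_patches = patches_per_side ** 2          # 14 * 14 = 196
seq_len = num_patches + 1                    # +1 for the prepended CLS token

print(patches_per_side, num_patches, seq_len)  # → 14 196 197
```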
Stage 3: Training Configuration
Hyperparameters
Optimizer Settings
Optimizer : Adam, SGD, RMSprop, AdamW
Learning Rate : 0.0001 - 0.1
Weight Decay : 0 - 0.01
Momentum : 0 - 0.99 (SGD only)
Training Settings
Epochs : 10 - 500
Batch Size : 16, 32, 64, 128
Early Stopping : Optional (patience: 5-50)
Checkpointing : Every N epochs
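Early stopping with a patience window can be sketched independently of the training engine. This is an illustrative version, not the platform's exact logic:

```python
class EarlyStopping:
    """Stop when validation loss fails to improve for `patience` consecutive epochs."""

    def __init__(self, patience: int = 10, min_delta: float = 0.0):
        self.patience = patience
        self.min_delta = min_delta
        self.best_loss = float("inf")
        self.bad_epochs = 0

    def step(self, val_loss: float) -> bool:
        """Record one epoch's validation loss; return True if training should stop."""
        if val_loss < self.best_loss - self.min_delta:
            self.best_loss = val_loss
            self.bad_epochs = 0
        else:
            self.bad_epochs += 1
        return self.bad_epochs >= self.patience

stopper = EarlyStopping(patience=3)
losses = [1.0, 0.8, 0.79, 0.81, 0.80, 0.82]
stops = [stopper.step(loss) for loss in losses]
print(stops)  # → [False, False, False, False, False, True]
```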
Learning Rate Scheduling
Available Schedulers :
Step LR
Exponential LR
Cosine Annealing
Reduce on Plateau
Step LR reduces LR by gamma every step_size epochs:

scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=30,  # Decay every 30 epochs
    gamma=0.1,     # Multiply LR by 0.1
)

Exponential decay:

scheduler = torch.optim.lr_scheduler.ExponentialLR(
    optimizer,
    gamma=0.95,  # LR *= 0.95 each epoch
)

Cosine decay with warm restarts:

scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(
    optimizer,
    T_0=10,    # Restart every 10 epochs
    T_mult=2,  # Double the restart period each time
)

Adaptive reduction when validation loss plateaus:

scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.1,  # Reduce by 10x
    patience=5,  # Wait 5 epochs
    min_lr=1e-6,
)

Implementation: app/training/optimizers.py
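The effect of StepLR can be checked with plain arithmetic; the base LR of 0.01 below is an assumption for illustration (step_size=30 and gamma=0.1 match the example above):

```python
def step_lr(base_lr: float, epoch: int, step_size: int = 30, gamma: float = 0.1) -> float:
    """Closed-form learning rate under StepLR decay."""
    return base_lr * gamma ** (epoch // step_size)

# LR stays at 0.01 through epoch 29, then drops by 10x every 30 epochs:
print([round(step_lr(0.01, e), 6) for e in (0, 29, 30, 60, 90)])
# → [0.01, 0.01, 0.001, 0.0001, 1e-05]
```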
Stage 4: Training Execution
Training Engine
Location : app/training/engine.py:13-306
The TrainingEngine manages the complete training loop:
class TrainingEngine:
    def __init__(
        self,
        model: nn.Module,
        train_loader: DataLoader,
        val_loader: DataLoader,
        optimizer: torch.optim.Optimizer,
        criterion: nn.Module,
        device: torch.device,
        scheduler: torch.optim.lr_scheduler.LRScheduler | None = None,
        early_stopping_patience: int = 0,
        checkpoint_callback: Callable | None = None,
        batch_callback: Callable | None = None,
    ):
        self.model = model
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.optimizer = optimizer
        self.criterion = criterion
        self.device = device
        self.scheduler = scheduler
        self.early_stopping_patience = early_stopping_patience
        self.checkpoint_callback = checkpoint_callback
        self.batch_callback = batch_callback
        # Training state
        self.current_epoch = 0
        self.best_val_loss = float('inf')
        self.should_stop = False
        self.is_paused = False
        # Metrics history
        self.history = {
            "train_loss": [],
            "train_acc": [],
            "train_precision": [],
            "train_recall": [],
            "train_f1": [],
            "val_loss": [],
            "val_acc": [],
            "val_precision": [],
            "val_recall": [],
            "val_f1": [],
            "lr": [],
        }
Training Loop
Single Epoch Training (app/training/engine.py:63-130):
def train_epoch(self) -> dict:
    self.model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    all_preds = []
    all_targets = []
    for batch_idx, (inputs, targets) in enumerate(self.train_loader):
        inputs, targets = inputs.to(self.device), targets.to(self.device)
        # Forward pass
        self.optimizer.zero_grad()
        outputs = self.model(inputs)
        loss = self.criterion(outputs, targets)
        # Backward pass
        loss.backward()
        self.optimizer.step()
        # Statistics
        running_loss += loss.item() * inputs.size(0)
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()
        all_preds.extend(predicted.cpu().numpy())
        all_targets.extend(targets.cpu().numpy())
        # Batch callback (every 10 batches)
        if self.batch_callback and (batch_idx + 1) % 10 == 0:
            self.batch_callback(batch_idx + 1, len(self.train_loader), {...})
    # Compute metrics
    avg_loss = running_loss / total
    accuracy = correct / total
    precision = precision_score(all_targets, all_preds, average='macro')
    recall = recall_score(all_targets, all_preds, average='macro')
    f1 = f1_score(all_targets, all_preds, average='macro')
    return {
        "train_loss": avg_loss,
        "train_acc": accuracy,
        "train_precision": precision,
        "train_recall": recall,
        "train_f1": f1,
    }
Validation
Validation Loop (app/training/engine.py:132-177):
@torch.no_grad()
def validate(self) -> dict:
    self.model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    all_preds = []
    all_targets = []
    for inputs, targets in self.val_loader:
        inputs, targets = inputs.to(self.device), targets.to(self.device)
        outputs = self.model(inputs)
        loss = self.criterion(outputs, targets)
        running_loss += loss.item() * inputs.size(0)
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()
        all_preds.extend(predicted.cpu().numpy())
        all_targets.extend(targets.cpu().numpy())
    # Compute metrics
    avg_loss = running_loss / total
    accuracy = correct / total
    precision = precision_score(all_targets, all_preds, average='macro')
    recall = recall_score(all_targets, all_preds, average='macro')
    f1 = f1_score(all_targets, all_preds, average='macro')
    return {
        "val_loss": avg_loss,
        "val_acc": accuracy,
        "val_precision": precision,
        "val_recall": recall,
        "val_f1": f1,
    }
Checkpointing
Automatic model checkpointing with best model tracking:
def checkpoint_callback(epoch: int, metrics: dict, is_best: bool):
    if training_config.get("checkpointing", True):
        checkpoint_manager.save_checkpoint(
            session_id=experiment_id,
            model=model,
            optimizer=optimizer,
            epoch=epoch,
            loss=metrics.get("val_loss", 0),
            metrics=metrics,
            model_config=model_config,
            scheduler=scheduler,
            is_best=is_best,
        )
Checkpoint Structure :
checkpoint = {
    'epoch': epoch,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'scheduler_state_dict': scheduler.state_dict() if scheduler else None,
    'loss': loss,
    'metrics': metrics,
    'model_config': model_config,
}
Stage 5: Monitoring
Real-Time Metrics
Live training monitoring with auto-refresh:
@st.fragment(run_every="1s")
def live_training_monitor():
    if not is_training_active():
        st.info("No active training session")
        return
    # Get latest metrics from file
    results = get_results()
    # Display live metrics with deltas
    col1, col2, col3 = st.columns(3)
    col1.metric(
        "Epoch",
        results.get("epoch", 0),
        delta=f"{results.get('epoch', 0) - results.get('prev_epoch', 0)}",
    )
    col2.metric(
        "Loss",
        f"{results.get('loss', 0):.4f}",
        delta=f"{results.get('loss', 0) - results.get('prev_loss', 0):.4f}",
        delta_color="inverse",
    )
    col3.metric(
        "Accuracy",
        f"{results.get('accuracy', 0):.2%}",
        delta=f"{(results.get('accuracy', 0) - results.get('prev_accuracy', 0)):.2%}",
    )
Training Curves
Interactive training history visualization:
import plotly.graph_objects as go

fig = go.Figure()
# Training loss
fig.add_trace(go.Scatter(
    y=history['train_loss'],
    name='Train Loss',
    mode='lines',
    line=dict(color='#98c127'),
))
# Validation loss
fig.add_trace(go.Scatter(
    y=history['val_loss'],
    name='Val Loss',
    mode='lines',
    line=dict(color='#ff8ca1'),
))
fig.update_layout(
    title="Training History",
    xaxis_title="Epoch",
    yaxis_title="Loss",
    hovermode='x unified',
)
st.plotly_chart(fig, use_container_width=True)
Stage 6: Evaluation & Results
Computed Metrics (per epoch):
Accuracy : Overall classification accuracy
Precision : True positives / (True positives + False positives)
Recall : True positives / (True positives + False negatives)
F1 Score : Harmonic mean of precision and recall
Loss : Cross-entropy or focal loss
Macro Averaging : Metrics computed per class and averaged (equal weight per class)
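Macro averaging can be verified by hand on a tiny example. The sketch below recomputes macro precision, recall, and F1 from per-class counts without scikit-learn:

```python
from collections import defaultdict

def macro_metrics(y_true: list[int], y_pred: list[int]) -> dict[str, float]:
    """Per-class precision/recall/F1, averaged with equal weight per class."""
    classes = sorted(set(y_true) | set(y_pred))
    tp, fp, fn = defaultdict(int), defaultdict(int), defaultdict(int)
    for t, p in zip(y_true, y_pred):
        if t == p:
            tp[t] += 1
        else:
            fp[p] += 1  # predicted p, but it was really t
            fn[t] += 1  # missed an instance of t
    precisions, recalls, f1s = [], [], []
    for c in classes:
        prec = tp[c] / (tp[c] + fp[c]) if tp[c] + fp[c] else 0.0
        rec = tp[c] / (tp[c] + fn[c]) if tp[c] + fn[c] else 0.0
        f1 = 2 * prec * rec / (prec + rec) if prec + rec else 0.0
        precisions.append(prec)
        recalls.append(rec)
        f1s.append(f1)
    n = len(classes)
    return {"precision": sum(precisions) / n,
            "recall": sum(recalls) / n,
            "f1": sum(f1s) / n}

m = macro_metrics([0, 0, 1, 1], [0, 1, 1, 1])
print({k: round(v, 4) for k, v in m.items()})
# → {'precision': 0.8333, 'recall': 0.75, 'f1': 0.7333}
```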
Confusion Matrix
Visualize classification performance per class:
from sklearn.metrics import confusion_matrix
import plotly.express as px

cm = confusion_matrix(y_true, y_pred)
fig = px.imshow(
    cm,
    text_auto=True,
    labels=dict(x="Predicted", y="Actual"),
    x=class_names,
    y=class_names,
    color_continuous_scale="Blues",
)
fig.update_layout(title="Confusion Matrix")
st.plotly_chart(fig, use_container_width=True)
ROC Curves
One-vs-rest ROC curves for multi-class classification:
from sklearn.metrics import roc_curve, auc
import plotly.graph_objects as go

fig = go.Figure()
for i, class_name in enumerate(class_names):
    fpr, tpr, _ = roc_curve(
        (y_true == i).astype(int),
        y_probs[:, i],
    )
    roc_auc = auc(fpr, tpr)
    fig.add_trace(go.Scatter(
        x=fpr, y=tpr,
        name=f'{class_name} (AUC={roc_auc:.3f})',
        mode='lines',
    ))
# Random-classifier baseline
fig.add_trace(go.Scatter(
    x=[0, 1], y=[0, 1],
    name='Random',
    mode='lines',
    line=dict(dash='dash', color='gray'),
))
fig.update_layout(
    title="ROC Curves (One-vs-Rest)",
    xaxis_title="False Positive Rate",
    yaxis_title="True Positive Rate",
)
Stage 7: Interpretability
Grad-CAM Visualization
Visualize which regions of the image the model focuses on:
class GradCAM:
    def __init__(self, model, target_layer):
        self.model = model
        self.target_layer = target_layer
        self.gradients = None
        self.activations = None
        # Register hooks to capture activations and gradients
        target_layer.register_forward_hook(self.save_activation)
        target_layer.register_backward_hook(self.save_gradient)

    def save_activation(self, module, input, output):
        self.activations = output

    def save_gradient(self, module, grad_input, grad_output):
        self.gradients = grad_output[0]

    def generate_cam(self, input_image, target_class):
        # Forward pass
        output = self.model(input_image)
        # Backward pass for target class
        self.model.zero_grad()
        output[0, target_class].backward()
        # Compute weighted activation map
        weights = torch.mean(self.gradients, dim=[2, 3])
        cam = torch.zeros(self.activations.shape[2:], dtype=torch.float32)
        for i, w in enumerate(weights[0]):
            cam += w * self.activations[0, i]
        # ReLU and normalize
        cam = torch.clamp(cam, min=0)
        cam = (cam - cam.min()) / (cam.max() - cam.min())
        return cam
t-SNE Embeddings
Visualize high-dimensional feature space in 2D:
from sklearn.manifold import TSNE
import numpy as np
import pandas as pd
import plotly.express as px
import torch

# Extract features from model
features = []
labels = []
with torch.no_grad():
    for images, targets in dataloader:
        feats = model.get_feature_extractor()(images.to(device))
        features.append(feats.cpu().numpy())
        labels.append(targets.numpy())
features = np.vstack(features)
labels = np.concatenate(labels)
# Compute t-SNE
tsne = TSNE(n_components=2, perplexity=30, random_state=42)
embeddings = tsne.fit_transform(features)
# Plot
df = pd.DataFrame({
    'x': embeddings[:, 0],
    'y': embeddings[:, 1],
    'family': [class_names[l] for l in labels],
})
fig = px.scatter(
    df, x='x', y='y', color='family',
    title='t-SNE Feature Space Visualization',
)
st.plotly_chart(fig, use_container_width=True)
Workflow State Management
State Persistence
All workflow state is persisted to disk for resumability:
# Save workflow state
from state.workflow import save_dataset_config, save_model_config

save_dataset_config({
    "dataset_path": "repo/malware",
    "selected_families": ["Adialer.C", "Agent.FYI"],
    "split": {"train": 70, "val": 15, "test": 15},
    "augmentation": {"preset": "Medium"},
})

save_model_config({
    "model_type": "Transfer Learning",
    "num_classes": 9,
    "transfer_config": {
        "base_model": "ResNet50",
        "strategy": "Partial Fine-tuning",
        "unfreeze_layers": 3,
    },
})
File Structure : app/state/persistence.py
.streamlit_sessions/
└── {session_id}/
├── session.json # Session metadata
├── dataset.json # Dataset configuration
├── models.json # Model configurations
├── training.json # Training configurations
└── experiments.json # Experiment results & history
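A session directory like the one above is just a set of JSON files, so the round trip can be sketched with the standard library. The helpers below are illustrative only, not the actual API of app/state/persistence.py:

```python
import json
import tempfile
from pathlib import Path

def save_state(session_dir: Path, name: str, payload: dict) -> Path:
    """Persist one piece of workflow state as {session_dir}/{name}.json."""
    session_dir.mkdir(parents=True, exist_ok=True)
    path = session_dir / f"{name}.json"
    path.write_text(json.dumps(payload, indent=2))
    return path

def load_state(session_dir: Path, name: str) -> dict:
    """Read one piece of workflow state back from disk."""
    return json.loads((session_dir / f"{name}.json").read_text())

with tempfile.TemporaryDirectory() as tmp:
    session = Path(tmp) / "session-001"   # hypothetical session id
    save_state(session, "dataset", {"split": {"train": 70, "val": 15, "test": 15}})
    restored = load_state(session, "dataset")
    print(restored["split"]["train"])  # → 70
```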
Best Practices
Experiment Design
Start with a baseline model
Use stratified splits
Set reproducible random seeds
Document hyperparameters
Training
Monitor validation metrics
Use early stopping
Save best model checkpoints
Track learning curves
Data
Balance classes or use weighted loss
Apply appropriate augmentation
Validate data quality
Check for data leakage
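Checking for data leakage between splits can be as simple as comparing content hashes across splits; an illustrative sketch (not part of the platform):

```python
import hashlib

def file_digest(data: bytes) -> str:
    """Content hash used to detect identical samples across splits."""
    return hashlib.sha256(data).hexdigest()

def find_leaks(splits: dict[str, list[bytes]]) -> set[str]:
    """Return digests of samples that appear in more than one split."""
    seen: dict[str, str] = {}
    leaks: set[str] = set()
    for split_name, samples in splits.items():
        for data in samples:
            digest = file_digest(data)
            if digest in seen and seen[digest] != split_name:
                leaks.add(digest)
            seen.setdefault(digest, split_name)
    return leaks

# "sample-a" appears in both train and test → one leaked sample detected
splits = {"train": [b"sample-a", b"sample-b"], "test": [b"sample-a"]}
print(len(find_leaks(splits)))  # → 1
```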
Evaluation
Use multiple metrics
Analyze confusion matrix
Check per-class performance
Visualize mistakes
References
Training engine: app/training/engine.py
Dataset pipeline: app/training/dataset.py
Model builders: app/models/pytorch/
State management: app/state/workflow.py