Overview

The UC Intel Final platform implements a comprehensive ML workflow that guides users through all stages of a malware classification experiment, from dataset configuration to model evaluation and interpretability.

Data Preparation

Dataset selection, splitting, and augmentation

Model Building

Architecture selection and configuration

Training

Hyperparameter tuning and model training

Monitoring

Real-time training progress tracking

Evaluation

Performance metrics and analysis

Interpretation

Model explainability and visualization

Workflow Stages

The workflow is iterative: you can return to earlier stages to refine your experiment based on evaluation results and interpretability analysis.

Stage 1: Dataset Configuration

Dataset Selection

The platform automatically scans the repo/malware/ directory for malware family folders:
repo/malware/
├── Adialer.C/       # Family 1
│   ├── 00001.png
│   ├── 00002.png
│   └── ...
├── Agent.FYI/       # Family 2
├── Allaple.A/       # Family 3
└── ...
Implementation: app/utils/dataset_utils.py and app/training/dataset.py:37-60
from pathlib import Path

def scan_dataset(dataset_path: Path, 
                selected_families: list[str] | None = None
                ) -> tuple[list[Path], list[int], list[str]]:
    """Scan dataset directory and return image paths, labels, class names."""
    image_paths = []
    labels = []
    class_names = []
    
    family_dirs = sorted([d for d in dataset_path.iterdir() if d.is_dir()])
    
    # Filter if selected_families specified
    if selected_families:
        family_dirs = [d for d in family_dirs if d.name in selected_families]
    
    for class_idx, family_dir in enumerate(family_dirs):
        class_names.append(family_dir.name)
        for img_file in family_dir.iterdir():
            if img_file.suffix.lower() in ['.png', '.jpg', '.jpeg', '.bmp']:
                image_paths.append(img_file)
                labels.append(class_idx)
    
    return image_paths, labels, class_names

Train/Validation/Test Split

Data is split using stratified sampling to maintain class distribution:
Default Split Ratios:
  • Train: 70%
  • Validation: 15%
  • Test: 15%
Options:
  • Stratified sampling (maintains class balance)
  • Random seed for reproducibility (default: 72)
  • Custom split ratios via sliders
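The stratified split can be sketched with scikit-learn's train_test_split (a sketch under the default ratios above, not the platform's exact implementation; two chained stratified splits carve out the test set first, then validation):

```python
from sklearn.model_selection import train_test_split

def stratified_split(paths, labels, val_frac=0.15, test_frac=0.15, seed=72):
    """Two chained stratified splits: carve off test, then validation."""
    # 1. Hold out the test set, preserving class proportions
    tr_paths, te_paths, tr_labels, te_labels = train_test_split(
        paths, labels, test_size=test_frac, stratify=labels, random_state=seed
    )
    # 2. Split validation out of the remainder (rescale the fraction)
    rel_val = val_frac / (1.0 - test_frac)
    tr_paths, va_paths, tr_labels, va_labels = train_test_split(
        tr_paths, tr_labels, test_size=rel_val,
        stratify=tr_labels, random_state=seed
    )
    return (tr_paths, tr_labels), (va_paths, va_labels), (te_paths, te_labels)
```

Fixing the same seed makes the split reproducible across runs.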

Data Augmentation

Augmentation strategies increase dataset diversity and reduce overfitting.

Augmentation Presets:

Light

  • Horizontal flip (50%)
  • Rotation (±10°)
  • Brightness/contrast (±10%)

Medium

  • Horizontal flip (50%)
  • Rotation (±15°)
  • Brightness/contrast (±20%)
  • Scale (90-110%)

Heavy

  • Horizontal & vertical flip
  • Rotation (±30°)
  • Brightness/contrast (±30%)
  • Scale (80-120%)
  • Gaussian blur

Custom

  • Fully configurable
  • Mix any transformations
  • Adjust probabilities
Implementation: app/training/transforms.py
from torchvision import transforms

def create_train_transforms(dataset_config: dict):
    """Create training transforms with augmentation."""
    transforms_list = [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
    ]
    
    # Apply augmentation based on preset; each insert(1, ...) places the
    # transform after Resize but before ToTensor (i.e. in PIL-image space)
    aug_preset = dataset_config.get("augmentation", {}).get("preset", "None")
    
    if aug_preset == "Light":
        transforms_list.insert(1, transforms.RandomHorizontalFlip(0.5))
        transforms_list.insert(1, transforms.RandomRotation(10))
        transforms_list.insert(1, transforms.ColorJitter(
            brightness=0.1, contrast=0.1
        ))
    elif aug_preset == "Medium":
        transforms_list.insert(1, transforms.RandomHorizontalFlip(0.5))
        transforms_list.insert(1, transforms.RandomRotation(15))
        transforms_list.insert(1, transforms.ColorJitter(
            brightness=0.2, contrast=0.2
        ))
        transforms_list.insert(1, transforms.RandomResizedCrop(
            224, scale=(0.9, 1.1)
        ))
    # ... Heavy and Custom presets
    
    # Normalize to ImageNet stats
    transforms_list.append(transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    ))
    
    return transforms.Compose(transforms_list)

Class Imbalance Handling

The platform provides multiple strategies for handling imbalanced datasets:
Weighted Random Sampler creates balanced batches by oversampling minority classes:
from torch.utils.data import WeightedRandomSampler

def create_weighted_sampler(
    labels: list[int], 
    num_classes: int
) -> WeightedRandomSampler:
    # Compute inverse frequency weights
    class_weights = compute_class_weights(labels, num_classes)
    
    # Assign weight to each sample
    sample_weights = [class_weights[label].item() for label in labels]
    
    return WeightedRandomSampler(
        weights=sample_weights,
        num_samples=len(labels),
        replacement=True
    )
Location: app/training/dataset.py:113-126
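The compute_class_weights helper referenced above is not shown; assuming it follows the common inverse-frequency scheme the comment describes, a minimal sketch could look like:

```python
import torch

def compute_class_weights(labels: list[int], num_classes: int) -> torch.Tensor:
    """Inverse-frequency weights: rarer classes get proportionally larger weights."""
    counts = torch.bincount(torch.tensor(labels), minlength=num_classes).float()
    weights = 1.0 / counts.clamp(min=1)  # clamp guards against empty classes
    return weights / weights.sum()       # normalize; scale does not affect sampling
```

With these weights, the sampler draws minority-class samples more often, so each batch is approximately class-balanced.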

Stage 2: Model Architecture

Model Selection

The platform supports three model families:

Custom CNN

Build CNNs from scratch with configurable layer stacks

Transfer Learning

Fine-tune pre-trained models (VGG, ResNet, EfficientNet)

Vision Transformer

Transformer architecture with patch embeddings

Custom CNN Builder

Location: app/models/pytorch/cnn_builder.py:72-237

Build CNNs from a layer stack configuration:
from typing import Any

import torch.nn as nn

class CustomCNN(nn.Module):
    ACTIVATION_MAP = {
        "relu": nn.ReLU(inplace=True),
        "leaky_relu": nn.LeakyReLU(0.1, inplace=True),
        "gelu": nn.GELU(),
        "swish": nn.SiLU(inplace=True),
    }
    
    def __init__(
        self,
        layers: list[dict[str, Any]],
        num_classes: int,
        input_channels: int = 3,
        input_size: int = 224,
    ):
        super().__init__()
        
        self.feature_layers = nn.ModuleList()
        self.classifier_layers = nn.ModuleList()
        
        current_channels = input_channels
        current_spatial = input_size
        
        for layer_config in layers:
            layer_type = layer_config["type"]
            params = layer_config.get("params", {})
            
            if layer_type == "Conv2D":
                layer, current_channels = self._build_conv2d(
                    current_channels, params
                )
                self.feature_layers.append(layer)
            
            elif layer_type == "MaxPooling2D":
                pool_size = params.get("pool_size", 2)
                layer = nn.MaxPool2d(kernel_size=pool_size, stride=pool_size)
                current_spatial = current_spatial // pool_size
                self.feature_layers.append(layer)
            
            elif layer_type == "BatchNorm":
                layer = nn.BatchNorm2d(current_channels)
                self.feature_layers.append(layer)
            
            # ... More layer types
Supported Layer Types:
  • Conv2D (with configurable filters, kernel, activation)
  • MaxPooling2D / AveragePooling2D
  • BatchNorm
  • Dropout / Dropout2D
  • Flatten / GlobalAvgPool
  • Dense (fully connected)

Transfer Learning

Location: app/models/pytorch/transfer.py:81-252

Supported Base Models:
  • VGG16, VGG19
  • ResNet50, ResNet101
  • InceptionV3
  • EfficientNetB0
Fine-tuning Strategies:
Feature extraction freezes all base-model layers and trains only the classifier head:
# Freeze all base model parameters
for param in self.base_model.parameters():
    param.requires_grad = False
Use when: You have a small dataset and want to avoid overfitting

Vision Transformer

Location: app/models/pytorch/transformer.py:223-362

Architecture Components:
import torch
import torch.nn as nn

class VisionTransformer(nn.Module):
    def __init__(
        self,
        image_size: int = 224,
        patch_size: int = 16,         # 16x16 patches
        num_classes: int = 1000,
        embed_dim: int = 768,         # Embedding dimension
        depth: int = 12,              # Number of transformer blocks
        num_heads: int = 12,          # Attention heads
        mlp_ratio: float = 4.0,       # MLP hidden dim ratio
        dropout: float = 0.1,
    ):
        super().__init__()
        
        # 1. Patch embedding (image to a sequence of patch tokens)
        self.patch_embed = PatchEmbedding(...)
        num_patches = (image_size // patch_size) ** 2  # e.g. 14 * 14 = 196
        
        # 2. CLS token and position embeddings
        self.cls_token = nn.Parameter(torch.zeros(1, 1, embed_dim))
        self.pos_embed = nn.Parameter(
            torch.zeros(1, num_patches + 1, embed_dim)
        )
        
        # 3. Transformer blocks
        self.blocks = nn.ModuleList([
            TransformerBlock(
                embed_dim=embed_dim,
                num_heads=num_heads,
                mlp_ratio=mlp_ratio,
                dropout=dropout
            ) for _ in range(depth)
        ])
        
        # 4. Classification head
        self.norm = nn.LayerNorm(embed_dim)
        self.head = nn.Linear(embed_dim, num_classes)
Forward Pass:
  1. Split the image into 16x16 patches (14x14 = 196 patches for a 224x224 image)
  2. Linearly embed each patch
  3. Add learnable position embeddings
  4. Prepend CLS token
  5. Pass through transformer blocks
  6. Extract CLS token output
  7. Classify
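The seven steps above can be condensed into a runnable miniature (a self-contained sketch, not the platform's VisionTransformer; nn.TransformerEncoderLayer stands in for the custom TransformerBlock, and a strided convolution implements patch embedding):

```python
import torch
import torch.nn as nn

class MiniViT(nn.Module):
    """Minimal ViT: patchify, embed, CLS + positions, transformer blocks, head."""
    def __init__(self, image_size=32, patch_size=8, embed_dim=64,
                 depth=2, num_heads=4, num_classes=5):
        super().__init__()
        num_patches = (image_size // patch_size) ** 2
        # 1-2. Patch embedding as a strided conv: one token per patch
        self.patch_embed = nn.Conv2d(3, embed_dim, kernel_size=patch_size, stride=patch_size)
        # 3-4. Learnable position embeddings and CLS token
        self.cls_token = nn.Parameter(torch.zeros(1, 1, embed_dim))
        self.pos_embed = nn.Parameter(torch.zeros(1, num_patches + 1, embed_dim))
        # 5. Transformer blocks (standard encoder layers as a stand-in)
        self.blocks = nn.ModuleList([
            nn.TransformerEncoderLayer(embed_dim, num_heads,
                                       dim_feedforward=4 * embed_dim, batch_first=True)
            for _ in range(depth)
        ])
        # 6-7. Norm and classification head over the CLS token
        self.norm = nn.LayerNorm(embed_dim)
        self.head = nn.Linear(embed_dim, num_classes)

    def forward(self, x):                                    # x: (B, 3, H, W)
        B = x.shape[0]
        x = self.patch_embed(x).flatten(2).transpose(1, 2)   # (B, num_patches, embed_dim)
        cls = self.cls_token.expand(B, -1, -1)
        x = torch.cat([cls, x], dim=1) + self.pos_embed      # prepend CLS, add positions
        for block in self.blocks:
            x = block(x)
        return self.head(self.norm(x)[:, 0])                 # classify from the CLS token
```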

Stage 3: Training Configuration

Hyperparameters

Optimizer Settings

  • Optimizer: Adam, SGD, RMSprop, AdamW
  • Learning Rate: 0.0001 - 0.1
  • Weight Decay: 0 - 0.01
  • Momentum: 0 - 0.99 (SGD only)

Training Settings

  • Epochs: 10 - 500
  • Batch Size: 16, 32, 64, 128
  • Early Stopping: Optional (patience: 5-50)
  • Checkpointing: Every N epochs
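The early-stopping option can be sketched as a small counter over validation loss (a simplified sketch of the behavior; the platform's actual logic lives in app/training/engine.py):

```python
class EarlyStopping:
    """Stop training when validation loss has not improved for `patience` epochs."""
    def __init__(self, patience: int = 10, min_delta: float = 0.0):
        self.patience = patience
        self.min_delta = min_delta
        self.best_loss = float("inf")
        self.counter = 0

    def step(self, val_loss: float) -> bool:
        """Call once per epoch; returns True when training should stop."""
        if val_loss < self.best_loss - self.min_delta:
            self.best_loss = val_loss  # improvement: reset the counter
            self.counter = 0
        else:
            self.counter += 1          # no improvement this epoch
        return self.counter >= self.patience
```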

Learning Rate Scheduling

Available Schedulers:
StepLR reduces the LR by gamma every step_size epochs:
scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=30,      # Decay every 30 epochs
    gamma=0.1          # Multiply LR by 0.1
)
Implementation: app/training/optimizers.py
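Other schedulers follow the same pattern; for example, cosine annealing decays the LR smoothly toward a floor (a sketch with a stand-in model; which schedulers are actually exposed depends on app/training/optimizers.py):

```python
import torch

model = torch.nn.Linear(10, 2)  # stand-in model
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# CosineAnnealingLR: LR follows a half cosine from the initial LR down to eta_min
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=100,     # length of the decay, in scheduler steps (here, epochs)
    eta_min=1e-6   # floor learning rate
)

for epoch in range(3):
    # ... train_epoch(), validate() ...
    scheduler.step()  # advance the schedule once per epoch
```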

Stage 4: Training Execution

Training Engine

Location: app/training/engine.py:13-306

The TrainingEngine manages the complete training loop:
from collections.abc import Callable

import torch
import torch.nn as nn
from torch.utils.data import DataLoader

class TrainingEngine:
    def __init__(
        self,
        model: nn.Module,
        train_loader: DataLoader,
        val_loader: DataLoader,
        optimizer: torch.optim.Optimizer,
        criterion: nn.Module,
        device: torch.device,
        scheduler: torch.optim.lr_scheduler.LRScheduler | None = None,
        early_stopping_patience: int = 0,
        checkpoint_callback: Callable | None = None,
        batch_callback: Callable | None = None,
    ):
        self.model = model
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.optimizer = optimizer
        self.criterion = criterion
        self.device = device
        self.scheduler = scheduler
        self.early_stopping_patience = early_stopping_patience
        self.checkpoint_callback = checkpoint_callback
        self.batch_callback = batch_callback
        
        # Training state
        self.current_epoch = 0
        self.best_val_loss = float('inf')
        self.should_stop = False
        self.is_paused = False
        
        # Metrics history
        self.history = {
            "train_loss": [],
            "train_acc": [],
            "train_precision": [],
            "train_recall": [],
            "train_f1": [],
            "val_loss": [],
            "val_acc": [],
            "val_precision": [],
            "val_recall": [],
            "val_f1": [],
            "lr": [],
        }

Training Loop

Single Epoch Training (app/training/engine.py:63-130):
def train_epoch(self) -> dict:
    self.model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    
    all_preds = []
    all_targets = []
    
    for batch_idx, (inputs, targets) in enumerate(self.train_loader):
        inputs, targets = inputs.to(self.device), targets.to(self.device)
        
        # Forward pass
        self.optimizer.zero_grad()
        outputs = self.model(inputs)
        loss = self.criterion(outputs, targets)
        
        # Backward pass
        loss.backward()
        self.optimizer.step()
        
        # Statistics
        running_loss += loss.item() * inputs.size(0)
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()
        
        all_preds.extend(predicted.cpu().numpy())
        all_targets.extend(targets.cpu().numpy())
        
        # Batch callback (every 10 batches)
        if self.batch_callback and (batch_idx + 1) % 10 == 0:
            self.batch_callback(batch_idx + 1, len(self.train_loader), {...})
    
    # Compute metrics
    avg_loss = running_loss / total
    accuracy = correct / total
    precision = precision_score(all_targets, all_preds, average='macro')
    recall = recall_score(all_targets, all_preds, average='macro')
    f1 = f1_score(all_targets, all_preds, average='macro')
    
    return {
        "train_loss": avg_loss,
        "train_acc": accuracy,
        "train_precision": precision,
        "train_recall": recall,
        "train_f1": f1
    }

Validation

Validation Loop (app/training/engine.py:132-177):
@torch.no_grad()
def validate(self) -> dict:
    self.model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    
    all_preds = []
    all_targets = []
    
    for inputs, targets in self.val_loader:
        inputs, targets = inputs.to(self.device), targets.to(self.device)
        
        outputs = self.model(inputs)
        loss = self.criterion(outputs, targets)
        
        running_loss += loss.item() * inputs.size(0)
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()
        
        all_preds.extend(predicted.cpu().numpy())
        all_targets.extend(targets.cpu().numpy())
    
    # Compute metrics
    avg_loss = running_loss / total
    accuracy = correct / total
    precision = precision_score(all_targets, all_preds, average='macro')
    recall = recall_score(all_targets, all_preds, average='macro')
    f1 = f1_score(all_targets, all_preds, average='macro')
    
    return {
        "val_loss": avg_loss,
        "val_acc": accuracy,
        "val_precision": precision,
        "val_recall": recall,
        "val_f1": f1
    }

Checkpointing

Automatic model checkpointing with best model tracking:
def checkpoint_callback(epoch: int, metrics: dict, is_best: bool):
    if training_config.get("checkpointing", True):
        checkpoint_manager.save_checkpoint(
            session_id=experiment_id,
            model=model,
            optimizer=optimizer,
            epoch=epoch,
            loss=metrics.get("val_loss", 0),
            metrics=metrics,
            model_config=model_config,
            scheduler=scheduler,
            is_best=is_best
        )
Checkpoint Structure:
checkpoint = {
    'epoch': epoch,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'scheduler_state_dict': scheduler.state_dict() if scheduler else None,
    'loss': loss,
    'metrics': metrics,
    'model_config': model_config,
}
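Resuming training reverses the structure above (a sketch assuming the checkpoint dict shown; resume_from_checkpoint is a hypothetical helper name, not the platform's API):

```python
import torch

def resume_from_checkpoint(path, model, optimizer, scheduler=None, device="cpu"):
    """Restore the states saved in the checkpoint dict and return the next epoch to run."""
    checkpoint = torch.load(path, map_location=device)
    model.load_state_dict(checkpoint["model_state_dict"])
    optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
    if scheduler is not None and checkpoint.get("scheduler_state_dict"):
        scheduler.load_state_dict(checkpoint["scheduler_state_dict"])
    return checkpoint["epoch"] + 1  # resume at the following epoch
```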

Stage 5: Monitoring

Real-Time Metrics

Live training monitoring with auto-refresh:
@st.fragment(run_every="1s")
def live_training_monitor():
    if not is_training_active():
        st.info("No active training session")
        return
    
    # Get latest metrics from file
    results = get_results()
    
    # Display live metrics with delta
    col1, col2, col3 = st.columns(3)
    col1.metric(
        "Epoch", 
        results.get("epoch", 0),
        delta=f"{results.get('epoch', 0) - results.get('prev_epoch', 0)}"
    )
    col2.metric(
        "Loss", 
        f"{results.get('loss', 0):.4f}",
        delta=f"{results.get('loss', 0) - results.get('prev_loss', 0):.4f}",
        delta_color="inverse"
    )
    col3.metric(
        "Accuracy", 
        f"{results.get('accuracy', 0):.2%}",
        delta=f"{(results.get('accuracy', 0) - results.get('prev_accuracy', 0)):.2%}"
    )

Training Curves

Interactive training history visualization:
import plotly.graph_objects as go
import streamlit as st

fig = go.Figure()

# Training loss
fig.add_trace(go.Scatter(
    y=history['train_loss'],
    name='Train Loss',
    mode='lines',
    line=dict(color='#98c127')
))

# Validation loss
fig.add_trace(go.Scatter(
    y=history['val_loss'],
    name='Val Loss',
    mode='lines',
    line=dict(color='#ff8ca1')
))

fig.update_layout(
    title="Training History",
    xaxis_title="Epoch",
    yaxis_title="Loss",
    hovermode='x unified'
)

st.plotly_chart(fig, use_container_width=True)

Stage 6: Evaluation & Results

Performance Metrics

Computed Metrics (per epoch):
  • Accuracy: Overall classification accuracy
  • Precision: True positives / (True positives + False positives)
  • Recall: True positives / (True positives + False negatives)
  • F1 Score: Harmonic mean of precision and recall
  • Loss: Cross-entropy or focal loss
Macro Averaging: Metrics computed per class and averaged (equal weight per class)
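A tiny worked example (illustrative data, using scikit-learn as the platform does) shows how macro averaging gives a rare class equal weight:

```python
from sklearn.metrics import f1_score

# 7 samples, 3 classes; class 2 is rare but counts equally under macro averaging
y_true = [0, 0, 0, 1, 1, 1, 2]
y_pred = [0, 0, 1, 1, 1, 1, 0]

# Per-class F1: class 0 -> 2/3, class 1 -> 6/7, class 2 -> 0 (never predicted)
macro_f1 = f1_score(y_true, y_pred, average="macro")  # (2/3 + 6/7 + 0) / 3 ≈ 0.508
```

The never-predicted rare class drags the macro score down, which is exactly why it is the right average for imbalanced malware families.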

Confusion Matrix

Visualize classification performance per class:
from sklearn.metrics import confusion_matrix
import plotly.express as px
import streamlit as st

cm = confusion_matrix(y_true, y_pred)

fig = px.imshow(
    cm,
    text_auto=True,
    labels=dict(x="Predicted", y="Actual"),
    x=class_names,
    y=class_names,
    color_continuous_scale="Blues"
)

fig.update_layout(title="Confusion Matrix")
st.plotly_chart(fig, use_container_width=True)

ROC Curves

One-vs-rest ROC curves for multi-class classification:
from sklearn.metrics import roc_curve, auc
import plotly.graph_objects as go

fig = go.Figure()

for i, class_name in enumerate(class_names):
    fpr, tpr, _ = roc_curve(
        (y_true == i).astype(int), 
        y_probs[:, i]
    )
    roc_auc = auc(fpr, tpr)
    
    fig.add_trace(go.Scatter(
        x=fpr, y=tpr,
        name=f'{class_name} (AUC={roc_auc:.3f})',
        mode='lines'
    ))

# Random classifier baseline
fig.add_trace(go.Scatter(
    x=[0, 1], y=[0, 1],
    name='Random',
    mode='lines',
    line=dict(dash='dash', color='gray')
))

fig.update_layout(
    title="ROC Curves (One-vs-Rest)",
    xaxis_title="False Positive Rate",
    yaxis_title="True Positive Rate"
)

Stage 7: Interpretability

Grad-CAM Visualization

Visualize which regions of the image the model focuses on:
import torch

class GradCAM:
    def __init__(self, model, target_layer):
        self.model = model
        self.target_layer = target_layer
        self.gradients = None
        self.activations = None
        
        # Register hooks (register_backward_hook is deprecated in favor of the full variant)
        target_layer.register_forward_hook(self.save_activation)
        target_layer.register_full_backward_hook(self.save_gradient)
    
    def save_activation(self, module, input, output):
        self.activations = output
    
    def save_gradient(self, module, grad_input, grad_output):
        self.gradients = grad_output[0]
    
    def generate_cam(self, input_image, target_class):
        # Forward pass
        output = self.model(input_image)
        
        # Backward pass for target class
        self.model.zero_grad()
        output[0, target_class].backward()
        
        # Compute weighted activation map
        weights = torch.mean(self.gradients, dim=[2, 3])
        cam = torch.zeros(self.activations.shape[2:], dtype=torch.float32)
        
        for i, w in enumerate(weights[0]):
            cam += w * self.activations[0, i]
        
        # ReLU and normalize to [0, 1]
        cam = torch.clamp(cam, min=0)
        cam = (cam - cam.min()) / (cam.max() - cam.min() + 1e-8)  # epsilon avoids div-by-zero
        
        return cam

t-SNE Embeddings

Visualize high-dimensional feature space in 2D:
from sklearn.manifold import TSNE
import numpy as np
import pandas as pd
import plotly.express as px
import streamlit as st

# Extract features from model
features = []
labels = []

with torch.no_grad():
    for images, targets in dataloader:
        feats = model.get_feature_extractor()(images.to(device))
        features.append(feats.cpu().numpy())
        labels.append(targets.numpy())

features = np.vstack(features)
labels = np.concatenate(labels)

# Compute t-SNE
tsne = TSNE(n_components=2, perplexity=30, random_state=42)
embeddings = tsne.fit_transform(features)

# Plot
df = pd.DataFrame({
    'x': embeddings[:, 0],
    'y': embeddings[:, 1],
    'family': [class_names[l] for l in labels]
})

fig = px.scatter(
    df, x='x', y='y', color='family',
    title='t-SNE Feature Space Visualization'
)
st.plotly_chart(fig, use_container_width=True)

Workflow State Management

State Persistence

All workflow state is persisted to disk for resumability:
# Save workflow state
from state.workflow import save_dataset_config, save_model_config

save_dataset_config({
    "dataset_path": "repo/malware",
    "selected_families": ["Adialer.C", "Agent.FYI"],
    "split": {"train": 70, "val": 15, "test": 15},
    "augmentation": {"preset": "Medium"}
})

save_model_config({
    "model_type": "Transfer Learning",
    "num_classes": 9,
    "transfer_config": {
        "base_model": "ResNet50",
        "strategy": "Partial Fine-tuning",
        "unfreeze_layers": 3
    }
})
File Structure: app/state/persistence.py
.streamlit_sessions/
└── {session_id}/
    ├── session.json          # Session metadata
    ├── dataset.json          # Dataset configuration
    ├── models.json           # Model configurations
    ├── training.json         # Training configurations
    └── experiments.json      # Experiment results & history

Best Practices

Experiment Design

  • Start with a baseline model
  • Use stratified splits
  • Set reproducible random seeds
  • Document hyperparameters

Training

  • Monitor validation metrics
  • Use early stopping
  • Save best model checkpoints
  • Track learning curves

Data

  • Balance classes or use weighted loss
  • Apply appropriate augmentation
  • Validate data quality
  • Check for data leakage

Evaluation

  • Use multiple metrics
  • Analyze confusion matrix
  • Check per-class performance
  • Visualize mistakes

References

  • Training engine: app/training/engine.py
  • Dataset pipeline: app/training/dataset.py
  • Model builders: app/models/pytorch/
  • State management: app/state/workflow.py