
Distributed GPU Training Guide

Distributed training splits the training workload across multiple GPUs or nodes, dramatically reducing training time for large models and datasets.
Azure Machine Learning supports distributed training with PyTorch DDP, DeepSpeed, TensorFlow, and Horovod for both data parallelism and model parallelism.

What is Distributed Training?

In distributed training, you split the workload across multiple workers, typically one process per GPU across one or more nodes. These workers train in parallel to speed up model training.

  • Data parallelism: same model on each node, different data subsets
  • Model parallelism: model split across nodes, same data

Use data parallelism for most workloads; it's easier to implement and sufficient for 90%+ of use cases.

Data Parallelism

The data is divided into partitions equal to the number of available nodes. Each node gets a copy of the model and trains on its data subset. Requirements:
  • Each node must have enough memory for the full model
  • Nodes synchronize gradients at batch completion
  • Best for models that fit in single GPU memory
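The gradient synchronization step can be sketched in plain Python (hypothetical gradient values; real frameworks perform this with an all-reduce across GPUs):

```python
# Sketch: what gradient synchronization in data parallelism computes
# (pure Python, hypothetical numbers -- real training uses all-reduce).
def average_gradients(per_worker_grads):
    """Average one parameter's gradient across workers, as all-reduce does."""
    return sum(per_worker_grads) / len(per_worker_grads)

# Four workers, each holding a local gradient for the same parameter
local_grads = [0.10, 0.30, 0.20, 0.40]
synced = average_gradients(local_grads)
# Every worker applies this identical averaged update, keeping replicas in sync
print(synced)
```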

Model Parallelism

The model is segmented into parts that run concurrently on different nodes with the same data. Use when:
  • Model is too large for single GPU
  • Need to train models >10GB
  • Have very deep networks
Model parallelism is more complex to implement than data parallelism and may not scale as efficiently.
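As a rough illustration of the partitioning idea (plain Python, not an Azure ML or PyTorch API), layers of a deep network can be assigned to devices in contiguous chunks:

```python
# Sketch: assigning layers to devices for model parallelism (illustrative only).
def partition_layers(layers, num_devices):
    """Split a list of layers into up to num_devices contiguous chunks."""
    chunk = -(-len(layers) // num_devices)  # ceiling division
    return [layers[i:i + chunk] for i in range(0, len(layers), chunk)]

layers = [f"layer{i}" for i in range(10)]
placement = partition_layers(layers, 4)
for device, part in enumerate(placement):
    # Each chunk would be moved to its own device, e.g. model parts on cuda:0..3
    print(f"cuda:{device} -> {part}")
```

Activations then flow from one device's chunk to the next during the forward pass, which is the source of the extra complexity and communication overhead.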

PyTorch Distributed Training

Azure ML supports PyTorch’s native torch.distributed for distributed training.

Process Group Initialization

Create process group for worker communication:
import os
import torch.distributed as dist

# Initialize process group
dist.init_process_group(
    backend='nccl',  # Use NCCL for GPU training
    init_method='env://',  # Use environment variables
)

# Get process information
world_size = dist.get_world_size()  # Total number of processes
rank = dist.get_rank()  # Global rank of current process
local_rank = int(os.environ['LOCAL_RANK'])  # Local rank on node

print(f"Process {rank}/{world_size} on GPU {local_rank}")

Environment Variables

Azure ML automatically sets these variables:
Variable       Description
MASTER_ADDR    IP address of the rank 0 process
MASTER_PORT    Free port on the rank 0 machine
WORLD_SIZE     Total number of processes
RANK           Global rank (0 to world_size - 1)
LOCAL_RANK     Local rank of the process within its node
NODE_RANK      Rank of the node, for multi-node jobs
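In a training script, these variables can be read with local-run fallbacks (the default values here are our own choice, so the same script also runs on a single machine):

```python
import os

# Read the distributed-training variables Azure ML sets, with single-process
# defaults so the same script also works for a local run.
world_size = int(os.environ.get('WORLD_SIZE', 1))
rank = int(os.environ.get('RANK', 0))
local_rank = int(os.environ.get('LOCAL_RANK', 0))
node_rank = int(os.environ.get('NODE_RANK', 0))

is_main_process = (rank == 0)  # Gate logging and checkpointing on this
print(f"rank {rank}/{world_size}, local_rank {local_rank}, node {node_rank}")
```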

Distributed Training Script

import os
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler

def main():
    # Initialize distributed training
    dist.init_process_group(backend='nccl', init_method='env://')
    
    # Get rank and local rank
    rank = dist.get_rank()
    local_rank = int(os.environ['LOCAL_RANK'])
    world_size = dist.get_world_size()
    
    # Set device
    torch.cuda.set_device(local_rank)
    device = torch.device(f'cuda:{local_rank}')
    
    # Create model and move to device
    model = YourModel().to(device)
    
    # Wrap model with DDP
    model = DDP(model, device_ids=[local_rank])
    
    # Create distributed sampler
    train_dataset = YourDataset()
    train_sampler = DistributedSampler(
        train_dataset,
        num_replicas=world_size,
        rank=rank,
        shuffle=True
    )
    
    # Create dataloader with sampler
    train_loader = DataLoader(
        train_dataset,
        batch_size=32,
        sampler=train_sampler,
        num_workers=4,
        pin_memory=True
    )
    
    # Training loop
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()
    
    for epoch in range(10):
        # Set epoch for sampler (important for shuffling)
        train_sampler.set_epoch(epoch)
        
        model.train()
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)
            
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            
            # Log only on rank 0
            if rank == 0 and batch_idx % 10 == 0:
                print(f"Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item():.4f}")
        
        # Save checkpoint on rank 0 only
        if rank == 0:
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.module.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
            }, f'checkpoint_epoch_{epoch}.pt')
    
    # Cleanup
    dist.destroy_process_group()

if __name__ == '__main__':
    main()

Submit PyTorch Distributed Job

from azure.ai.ml import command
from azure.ai.ml.entities import ResourceConfiguration

# Define distributed job
job = command(
    code="./src",
    command="python train_distributed.py",
    environment="azureml://registries/azureml/environments/pytorch-2.0-cuda11.7/versions/1",
    compute="gpu-cluster",
    instance_count=4,  # 4 nodes
    distribution={
        "type": "pytorch",
        "process_count_per_instance": 4  # 4 GPUs per node = 16 total GPUs
    },
    resources=ResourceConfiguration(
        instance_type="Standard_NC24s_v3"  # 4x V100 GPUs per node
    ),
    display_name="pytorch-distributed-training",
    experiment_name="distributed-training"
)

returned_job = ml_client.jobs.create_or_update(job)
print(f"Job URL: {returned_job.studio_url}")
process_count_per_instance should equal the number of GPUs per node. Azure ML handles setting all environment variables automatically.
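The job settings determine the effective global batch size; a quick sanity check (the 4-node, 4-GPU shape and per-GPU batch size of 32 are assumptions for illustration):

```python
# Effective global batch size = nodes x processes per node x per-GPU batch size.
def global_batch_size(instance_count, process_count_per_instance,
                      per_gpu_batch_size):
    return instance_count * process_count_per_instance * per_gpu_batch_size

# Hypothetical shape: 4 nodes, 4 GPUs each, per-GPU batch of 32
print(global_batch_size(4, 4, 32))  # 512
```

If the global batch grows large, the learning rate usually needs adjusting (e.g. linear scaling), so it is worth computing explicitly.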

DeepSpeed

DeepSpeed enables training massive models with near-linear scalability.

DeepSpeed Configuration

{
  "train_batch_size": 64,
  "gradient_accumulation_steps": 1,
  "optimizer": {
    "type": "AdamW",
    "params": {
      "lr": 0.001,
      "betas": [0.9, 0.999],
      "eps": 1e-8
    }
  },
  "fp16": {
    "enabled": true,
    "loss_scale": 0,
    "initial_scale_power": 16
  },
  "zero_optimization": {
    "stage": 2,
    "offload_optimizer": {
      "device": "cpu",
      "pin_memory": true
    },
    "allgather_partitions": true,
    "allgather_bucket_size": 2e8,
    "reduce_scatter": true,
    "reduce_bucket_size": 2e8,
    "overlap_comm": true,
    "contiguous_gradients": true
  }
}
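DeepSpeed requires that train_batch_size equal the micro-batch size per GPU times gradient_accumulation_steps times the world size. A small consistency check (the world size of 16 is an assumption):

```python
# DeepSpeed batch-size invariant:
#   train_batch_size == train_micro_batch_size_per_gpu
#                       * gradient_accumulation_steps * world_size
train_batch_size = 64            # from the config above
gradient_accumulation_steps = 1  # from the config above
world_size = 16                  # assumed: 4 nodes x 4 GPUs

micro_batch_per_gpu = train_batch_size // (
    gradient_accumulation_steps * world_size)
assert (micro_batch_per_gpu * gradient_accumulation_steps * world_size
        == train_batch_size), "config values are inconsistent"
print(micro_batch_per_gpu)  # 4
```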

DeepSpeed Training Script

import deepspeed
import torch
import torch.nn as nn
from transformers import AutoModelForCausalLM

def main():
    # Parse DeepSpeed config
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('--local_rank', type=int, default=0)
    parser.add_argument('--epochs', type=int, default=3)
    parser = deepspeed.add_config_arguments(parser)
    args = parser.parse_args()
    
    # Create model
    model = AutoModelForCausalLM.from_pretrained("gpt2-large")
    
    # Tokenized training dataset (replace with your own)
    train_dataset = YourDataset()
    
    # Initialize DeepSpeed (builds the optimizer and dataloader from the config)
    model_engine, optimizer, train_loader, _ = deepspeed.initialize(
        args=args,
        model=model,
        model_parameters=model.parameters(),
        training_data=train_dataset
    )
    
    # Training loop
    for epoch in range(args.epochs):
        for step, batch in enumerate(train_loader):
            inputs, labels = batch
            inputs = inputs.to(model_engine.local_rank)
            labels = labels.to(model_engine.local_rank)
            
            outputs = model_engine(inputs, labels=labels)
            loss = outputs.loss
            
            model_engine.backward(loss)
            model_engine.step()
            
            if step % 10 == 0:
                print(f"Epoch {epoch}, Step {step}, Loss: {loss.item():.4f}")
        
        # Save checkpoint
        model_engine.save_checkpoint('./checkpoints', epoch)

if __name__ == '__main__':
    main()

Submit DeepSpeed Job

from azure.ai.ml import command

job = command(
    code="./src",
    command="deepspeed train_deepspeed.py --deepspeed_config ds_config.json",
    environment="azureml://registries/azureml/environments/deepspeed-0.9.5/versions/1",
    compute="gpu-cluster",
    instance_count=4,
    distribution={
        "type": "pytorch",
        "process_count_per_instance": 8  # One process per GPU (8x A100)
    },
    resources=ResourceConfiguration(
        instance_type="Standard_ND96asr_v4"  # 8x A100 GPUs
    )
)

ml_client.jobs.create_or_update(job)

TensorFlow Distributed Training

Use TensorFlow’s tf.distribute.Strategy for distributed training:
import tensorflow as tf
import json
import os

def main():
    # Create distribution strategy
    strategy = tf.distribute.MultiWorkerMirroredStrategy()
    
    print(f"Number of devices: {strategy.num_replicas_in_sync}")
    
    # Create and compile model within strategy scope
    with strategy.scope():
        model = tf.keras.Sequential([
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dense(10, activation='softmax')
        ])
        
        model.compile(
            optimizer='adam',
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )
    
    # Load data
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    x_train = x_train.reshape(-1, 784).astype('float32') / 255
    x_test = x_test.reshape(-1, 784).astype('float32') / 255
    
    # Create distributed dataset
    train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
    train_dataset = train_dataset.shuffle(1000).batch(64)
    
    # Train model
    model.fit(
        train_dataset,
        epochs=10,
        steps_per_epoch=100
    )
    
    # Save model (only on the chief worker)
    tf_config = json.loads(os.environ.get('TF_CONFIG', '{}'))
    task = tf_config.get('task', {})
    is_chief = (task.get('type') == 'chief' or
                (task.get('type') == 'worker' and task.get('index') == 0))
    if is_chief or not tf_config:
        model.save('outputs/model')

if __name__ == '__main__':
    main()

Submit TensorFlow Job

job = command(
    code="./src",
    command="python train_tensorflow.py",
    environment="azureml://registries/azureml/environments/tensorflow-2.13-cuda11/versions/1",
    compute="gpu-cluster",
    instance_count=4,
    distribution={
        "type": "tensorflow",
        "worker_count": 4  # One worker per node; MultiWorkerMirroredStrategy needs no parameter servers
    }
)

ml_client.jobs.create_or_update(job)

InfiniBand for High-Performance Training

InfiniBand provides ultra-low latency networking for distributed training.

Supported VM Series

VM Series          InfiniBand Bandwidth   GPUs
ND96asr_v4         200 Gb/s               8x A100
ND96amsr_A100_v4   200 Gb/s               8x A100 80GB
NDm_A100_v4        200 Gb/s               8x A100
HBv3               200 Gb/s               -
HC                 100 Gb/s               -

Enable InfiniBand

from azure.ai.ml.entities import AmlCompute

# Create InfiniBand-enabled cluster
cluster = AmlCompute(
    name="ib-gpu-cluster",
    size="Standard_ND96asr_v4",
    min_instances=0,
    max_instances=4,
    enable_node_public_ip=False,  # Required for IB
    tier="Dedicated"
)

ml_client.compute.begin_create_or_update(cluster)
InfiniBand can reduce communication overhead by 10-30x compared to Ethernet for all-reduce operations.
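One way to see why bandwidth matters: in a ring all-reduce, each GPU transfers roughly 2*(N-1)/N times the gradient size per synchronization. A back-of-the-envelope estimate (the model size and bandwidth figures below are illustrative, and latency effects are ignored):

```python
# Rough per-step all-reduce time: bytes transferred / bandwidth.
# In a ring all-reduce each GPU sends/receives ~2*(N-1)/N of the gradient size.
def allreduce_seconds(grad_bytes, num_gpus, bandwidth_bytes_per_s):
    transferred = 2 * (num_gpus - 1) / num_gpus * grad_bytes
    return transferred / bandwidth_bytes_per_s

grad_bytes = 1.3e9  # e.g. ~325M fp32 parameters (illustrative)
gpus = 16

ethernet = allreduce_seconds(grad_bytes, gpus, 40e9 / 8)     # 40 Gb/s link
infiniband = allreduce_seconds(grad_bytes, gpus, 200e9 / 8)  # 200 Gb/s link
print(f"Ethernet ~{ethernet:.3f}s vs InfiniBand ~{infiniband:.3f}s per sync")
```

Bandwidth alone gives a 5x gap here; RDMA's much lower latency widens it further in practice.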

Best Practices

For GPU training, always use NCCL:
dist.init_process_group(backend='nccl')
NCCL is optimized for GPU communication and supports InfiniBand.
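A tiny helper capturing this convention (a sketch, not an Azure ML API): NCCL when CUDA is available, Gloo as the CPU fallback.

```python
def pick_backend(cuda_available):
    """NCCL for GPU communication; Gloo as the CPU-only fallback."""
    return 'nccl' if cuda_available else 'gloo'

# Typical usage (torch shown for context):
#   import torch
#   dist.init_process_group(backend=pick_backend(torch.cuda.is_available()))
print(pick_backend(True), pick_backend(False))  # nccl gloo
```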
For large models, accumulate gradients over multiple batches:
accumulation_steps = 4

optimizer.zero_grad()
for i, (data, target) in enumerate(train_loader):
    output = model(data)
    loss = criterion(output, target) / accumulation_steps  # Scale the loss
    loss.backward()  # Gradients accumulate across mini-batches
    
    if (i + 1) % accumulation_steps == 0:
        optimizer.step()
        optimizer.zero_grad()
Use mixed precision (FP16) to reduce memory use and increase throughput:
from torch.cuda.amp import autocast, GradScaler

scaler = GradScaler()

for data, target in train_loader:
    with autocast():
        output = model(data)
        loss = criterion(output, target)
    
    scaler.scale(loss).backward()
    scaler.step(optimizer)
    scaler.update()
    optimizer.zero_grad()
Optimize data loading for distributed training:
train_loader = DataLoader(
    dataset,
    batch_size=32,
    sampler=DistributedSampler(dataset),
    num_workers=4,  # Multiple workers per GPU
    pin_memory=True,  # Faster GPU transfer
    prefetch_factor=2  # Prefetch batches
)

Monitoring Distributed Training

Track distributed training metrics:
import mlflow

if dist.get_rank() == 0:  # Log only from rank 0
    mlflow.log_metric("train_loss", loss.item(), step=global_step)
    mlflow.log_metric("learning_rate", optimizer.param_groups[0]['lr'], step=global_step)
    mlflow.log_metric("gpu_utilization", torch.cuda.utilization(), step=global_step)

Troubleshooting

NCCL timeouts: increase the process group timeout for large models:
import datetime

dist.init_process_group(
    backend='nccl',
    init_method='env://',
    timeout=datetime.timedelta(minutes=30)
)
For out-of-memory errors, try these solutions:
  1. Reduce batch size
  2. Enable gradient checkpointing
  3. Use gradient accumulation
  4. Try DeepSpeed ZeRO
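A rough memory estimate helps choose among these options. For fp32 training with Adam, weights, gradients, and two optimizer moments cost about 16 bytes per parameter, before activations (the model size below is illustrative):

```python
# Rough per-GPU training-memory estimate (fp32 weights + Adam optimizer):
#   4 B weights + 4 B gradients + 8 B optimizer moments = 16 B per parameter
def training_bytes_per_param(weight_bytes=4, grad_bytes=4, optim_bytes=8):
    return weight_bytes + grad_bytes + optim_bytes

params = 1.5e9  # e.g. a 1.5B-parameter model (illustrative)
gib = params * training_bytes_per_param() / 2**30
print(f"~{gib:.1f} GiB before activations")
```

If this estimate alone approaches GPU memory, gradient checkpointing or ZeRO-style optimizer-state partitioning is usually needed.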
For slow or poorly scaling training, check:
  • Data loading bottlenecks
  • Network bandwidth utilization
  • GPU utilization percentage
  • Whether larger batch sizes are possible

Next Steps

  • Deploy Models: deploy trained models to production
  • MLOps: automate training pipelines
  • Model Optimization: optimize models for inference
  • Azure AI Examples: complete distributed training examples
