Multiple GPU training in PyTorch and Gradient Accumulation as an alternative to it
In this article we will first look at the differences between the Data Parallelism (DP) and Distributed Data Parallelism (DDP) algorithms, then explain what Gradient Accumulation (GA) is, and finally show how DDP and GA are implemented in PyTorch and how they lead to the same result.
Introduction
When training a deep neural network (DNN), one important hyperparameter is the batch size. Normally, the batch size should not be too big, because the network then tends to overfit, but also not too small, because that results in slow convergence. When working with high-resolution images or other types of data that occupy a lot of memory, and given that most training of big DNN models is done on GPUs today, fitting even a small batch size can be problematic depending on the memory of the available GPU. Because, as we said, small batch sizes result in slow convergence, there are three main methods we can use to increase the effective batch size:
- Using multiple small GPUs running the model in parallel on mini-batches – the DP or DDP algorithms
- Using a larger GPU (expensive)
- Accumulating the gradient over multiple steps
Let's now look at the first and third options in more detail – if you are lucky enough to have a GPU large enough to fit all the data you need, you can read the DDP part, see how it's implemented in PyTorch in the Full code section, and skip the rest.
Say we want an effective batch size of 30 but we can only fit 10 data points (mini-batch size) on each GPU. We have two choices: Data Parallelism or Distributed Data Parallelism:
Data Parallelism (DP)
First, we define the master GPU. Then, we perform the following steps (a minimal usage sketch follows the list):
- Move a mini-batch of 10 data points and a replica of the model from the master GPU to each of the other 2 GPUs
- Do a forward pass on each GPU and pass the outputs to the master GPU
- Compute the total loss on the master GPU, then send the loss back to each GPU to compute the gradients of the parameters
- Send the gradients (each already averaged over its mini-batch of 10) back to the master GPU and average them to get the gradient for the entire batch of 30
- Update the parameters on the master GPU, then send the updated parameters to the other 2 GPUs for the next iteration
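In PyTorch, these steps are handled by the nn.DataParallel wrapper. Here is a minimal, purely illustrative sketch – the model and tensor sizes are made up and are not those of the full example later in the article:
import torch
import torch.nn as nn

model = nn.Linear(16, 1).to("cuda:0")   # parameters live on the master GPU
dp_model = nn.DataParallel(model)       # replicas are created on the other visible GPUs

x = torch.rand(30, 16).to("cuda:0")     # effective batch of 30, scattered as mini-batches
out = dp_model(x)                       # forward pass runs on each GPU in parallel
loss = out.sum()                        # outputs are gathered back on the master GPU
loss.backward()                         # gradients are reduced onto the master GPU's parameters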
There are some problems & inefficiencies with this process:
- Data passes through the master GPU before being split across the other GPUs; the master GPU is also utilized more than the other GPUs, because the total loss computation and the parameter updates happen on it
- The models on the other GPUs need to be re-synchronized at every iteration, which can slow down training
Distributed Data Parallel (DDP)
Distributed Data Parallel was introduced to improve on the inefficiencies of the Data Parallel algorithm. We still have the same setting as before – a batch of 30 data points and 3 GPUs. The differences are the following:
- There is no master GPU
- Because there is no master GPU anymore, each GPU loads its share of the data in parallel, in a non-overlapping way, directly from disk/RAM – DistributedSampler does this job for us. Under the hood it uses the process rank (GPU id) to distribute the data across GPUs – given 30 data points, the first GPU will use points [0, 3, 6, …, 27], the 2nd GPU [1, 4, 7, …, 28] and the 3rd GPU [2, 5, 8, …, 29]:
import numpy as np

n_gpu = 3
for i in range(n_gpu):
    # GPU i takes every n_gpu-th data point, starting at index i
    print(np.arange(30)[i:30:n_gpu])
- The forward pass, loss computation and backward pass are executed on each GPU independently; during the backward pass the gradients are asynchronously all-reduced (averaged) across GPUs, and then every GPU performs the same parameter update (see the sketch below)
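For intuition, what DDP does for us during the backward pass is roughly equivalent to the following manual all-reduce. This is a sketch only – the real DDP implementation overlaps this communication with the backward computation and buckets gradients together – and it assumes a process group has already been initialized:
import torch
import torch.distributed as dist

def average_gradients(model: torch.nn.Module) -> None:
    # sum each parameter's gradient over all processes, then divide by the
    # number of processes to get the average gradient over the full batch
    world_size = dist.get_world_size()
    for param in model.parameters():
        if param.grad is not None:
            dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
            param.grad /= world_size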
Because of these advantages of DDP over DP, DDP is preferred nowadays, so we will only show the DDP implementation.
Gradient Accumulation
If we have only one GPU but still want to use a larger batch size, an alternative is to accumulate the gradients over a certain number of mini-batches, which increases the effective batch size. From the example above, we could accumulate the gradients of 3 mini-batches of 10 data points to achieve the same result as the DDP training described above with an effective batch size of 30.
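The key detail is that each mini-batch loss must be scaled by the number of accumulation steps, so that the accumulated gradient equals the average over the whole effective batch. Here is a tiny, self-contained numerical check of that scaling idea – it uses a plain linear model with a mean loss, not the sum-loss model from the example script below:
import torch

x = torch.rand(30, 4)

# gradient of the mean loss over the full batch of 30
w_full = torch.ones(4, requires_grad=True)
(x @ w_full).mean().backward()

# three accumulated mini-batches of 10, each loss scaled by 1/3
w_acc = torch.ones(4, requires_grad=True)
for chunk in x.chunk(3):
    ((chunk @ w_acc).mean() / 3).backward()   # gradients accumulate in w_acc.grad

print(torch.allclose(w_full.grad, w_acc.grad))   # expected: True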
DDP process Code
Below I will only go through the differences between the DDP implementation and the single-GPU code; the full code can be found a few sections below. First we initialize the process group, which allows the different processes to communicate with each other. With int(os.environ["LOCAL_RANK"]) we retrieve the GPU used in a given process (torchrun sets the LOCAL_RANK environment variable for each process it spawns).
init_process_group(backend="nccl")       # NCCL is the recommended backend for GPUs
device = int(os.environ["LOCAL_RANK"])   # GPU id of this process
torch.cuda.set_device(device)
Then, we need to wrap the model in DistributedDataParallel, which enables multi-GPU training.
model = NeuralNetwork(args.data_size)
model = model.to(device)
if args.distributed:
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[device])
The final part is to define the DistributedSampler mentioned in the DDP section.
sampler = torch.utils.data.DistributedSampler(dataset)
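A short usage sketch of how the sampler plugs into the rest of the training loop (assuming dataset, args and model as in the full code below): the sampler is passed to the DataLoader instead of shuffle=True, and set_epoch() should be called at the start of every epoch so that a shuffling sampler reshuffles between epochs:
from torch.utils.data import DataLoader

loader = DataLoader(dataset, batch_size=args.batch_size, sampler=sampler, shuffle=False)
for epoch in range(args.epochs):
    loader.sampler.set_epoch(epoch)  # lets a shuffling sampler reshuffle each epoch
    for idxs, row in loader:
        ...                          # forward / backward / optimizer step as usual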
The rest of the training stays the same – I will include the full code at the end of this article.
Gradient Accumulation Code
After we call loss.backward(), the gradients are stored in the .grad attribute of the respective parameter tensors. The actual update happens when optimizer.step() is called, and the stored gradients are then set to zero with optimizer.zero_grad() before the next iteration of backpropagation and parameter update. Thus, to accumulate the gradient, we call loss.backward() for the number of accumulation steps we need without zeroing the gradients, so that they add up across iterations; to make the accumulated gradient the average over the accumulated mini-batches, each loss is scaled beforehand (loss = loss / ACC_STEPS). After ACC_STEPS iterations we call optimizer.step() and zero the gradients to start the next round of accumulation. In code:
ACC_STEPS = args.acc_steps  # == the number of GPUs we want to emulate
optimizer.zero_grad()
# iterate through the data
for i, (idxs, row) in enumerate(loader):
    loss = model(row)
    # scale the loss so the accumulated gradient is an average over ACC_STEPS mini-batches
    loss = loss / ACC_STEPS
    loss.backward()
    # keep accumulating gradients for ACC_STEPS iterations before stepping
    if (i + 1) % ACC_STEPS == 0:
        optimizer.step()
        optimizer.zero_grad()
Full code
import os

os.environ["CUDA_VISIBLE_DEVICES"] = "0,1"
print(os.environ["CUDA_VISIBLE_DEVICES"])

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset, Sampler
import argparse
import torch.optim as optim
import numpy as np
import random
import torch.backends.cudnn as cudnn
import torch.nn.functional as F
from torch.distributed import init_process_group
import torch.distributed as dist


class data_set(Dataset):
    def __init__(self, df):
        self.df = df

    def __len__(self):
        return len(self.df)

    def __getitem__(self, index):
        sample = self.df[index]
        return index, sample


class NeuralNetwork(nn.Module):
    def __init__(self, dsize):
        super().__init__()
        self.linear = nn.Linear(dsize, 1, bias=False)
        self.linear.weight.data[:] = 1.

    def forward(self, x):
        x = self.linear(x)
        loss = x.sum()
        return loss


class DummySampler(Sampler):
    def __init__(self, data, batch_size, n_gpus=2):
        self.num_samples = len(data)
        self.b_size = batch_size
        self.n_gpus = n_gpus

    def __iter__(self):
        ids = []
        for i in range(0, self.num_samples, self.b_size * self.n_gpus):
            ids.append(np.arange(self.num_samples)[i: i + self.b_size * self.n_gpus: self.n_gpus])
            ids.append(np.arange(self.num_samples)[i + 1: (i + 1) + self.b_size * self.n_gpus: self.n_gpus])
        return iter(np.concatenate(ids))

    def __len__(self):
        return self.num_samples


def main(args=None):
    d_size = args.data_size

    if args.distributed:
        init_process_group(backend="nccl")
        device = int(os.environ["LOCAL_RANK"])
        torch.cuda.set_device(device)
    else:
        device = "cuda:0"

    # fix the seed for reproducibility
    seed = args.seed
    torch.manual_seed(seed)
    np.random.seed(seed)
    random.seed(seed)
    cudnn.benchmark = True

    # generate data
    data = torch.rand(d_size, d_size)

    model = NeuralNetwork(args.data_size)
    model = model.to(device)
    if args.distributed:
        model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[device])

    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
    dataset = data_set(data)

    if args.distributed:
        sampler = torch.utils.data.DistributedSampler(dataset, shuffle=False)
    else:
        # we define `DummySampler` for exact reproducibility with `DistributedSampler`,
        # which splits the data as described in the article.
        sampler = DummySampler(dataset, args.batch_size)

    loader = DataLoader(
        dataset,
        batch_size=args.batch_size,
        num_workers=0,
        pin_memory=True,
        sampler=sampler,
        shuffle=False,
        collate_fn=None,
    )

    if not args.distributed:
        # ACC_STEPS equals the number of GPUs we emulate: we divide each loss by it
        # to obtain the same averaged gradient as the one from multiple GPUs
        ACC_STEPS = args.acc_steps
        optimizer.zero_grad()

    for epoch in range(args.epochs):
        if args.distributed:
            loader.sampler.set_epoch(epoch)

        for i, (idxs, row) in enumerate(loader):
            if args.distributed:
                optimizer.zero_grad()

            row = row.to(device, non_blocking=True)

            if args.distributed:
                rank = dist.get_rank() == 0
            else:
                rank = True

            loss = model(row)

            if args.distributed:
                # gradients are averaged across GPUs automatically because the model
                # is wrapped in `DistributedDataParallel`
                loss.backward()
            else:
                # scale loss according to accumulation steps
                loss = loss / ACC_STEPS
                loss.backward()

            if i == 0 and rank:
                print(f"Epoch {epoch} {100 * '='}")

            if not args.distributed:
                if (i + 1) % ACC_STEPS == 0:
                    # only step after accumulating grads for ACC_STEPS mini-batches
                    optimizer.step()
                    optimizer.zero_grad()
            else:
                optimizer.step()

    if not args.distributed and args.verbose:
        print(100 * "=")
        print("Model weights : ", model.linear.weight)
        print(100 * "=")
    elif args.distributed and args.verbose and rank:
        print(100 * "=")
        print("Model weights : ", model.module.linear.weight)
        print(100 * "=")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--distributed', action='store_true')
    parser.add_argument('--seed', default=0, type=int)
    parser.add_argument('--epochs', default=2, type=int)
    parser.add_argument('--batch_size', default=4, type=int)
    parser.add_argument('--data_size', default=16, type=int)
    parser.add_argument('--acc_steps', default=3, type=int)
    parser.add_argument('--verbose', action='store_true')
    args = parser.parse_args()
    print(args)
    main(args)
Now, if we run this script in the two modes:
- python3 ddp.py --epochs 2 --batch_size 4 --data_size 8 --verbose --acc_steps 2
- torchrun --standalone --nproc_per_node=2 ddp.py --epochs 2 --distributed --batch_size 4 --data_size 8 --verbose
we will see that we obtain the exact same final model parameters:
# From Gradient Accumulator
Model weights : Parameter containing:
tensor([[0.9472, 0.9440, 0.9527, 0.9687, 0.9570, 0.9343, 0.9411, 0.9186]],
device='cuda:0', requires_grad=True)
# From DDP:
Model weights : Parameter containing:
tensor([[0.9472, 0.9440, 0.9527, 0.9687, 0.9570, 0.9343, 0.9411, 0.9186]],
device='cuda:0', requires_grad=True)
Conclusions
In this article we briefly introduced and gave an intuition behind the DP and DDP algorithms and Gradient Accumulation, and showed how to increase the effective batch size even without multiple GPUs. One important thing to notice is that, even though we obtain the same final result, training with multiple GPUs is much faster than using gradient accumulation; so if training speed is important, multiple GPUs are the only way to speed up training.