Distributed Training with PyTorch: complete tutorial with cloud infrastructure and code
Chapters
0:00 Introduction
2:43 What is distributed training?
4:44 Data Parallelism vs Model Parallelism
6:25 Gradient accumulation
19:38 Distributed Data Parallel
26:24 Collective Communication Primitives
28:39 Broadcast operator
30:28 Reduce operator
32:39 All-Reduce
33:20 Failover
36:14 Creating the cluster (Paperspace)
49:00 Distributed Training with TorchRun
54:57 LOCAL RANK vs GLOBAL RANK
56:05 Code walkthrough
66:47 No_Sync context
68:48 Computation-Communication overlap
70:50 Bucketing
72:11 Conclusion
00:00:00.000 |
Hello guys, welcome back to my channel. Today we are going to talk about the distributed training 00:00:04.080 |
of a model with PyTorch. What do I mean? Well imagine you have a model that is very big and 00:00:09.120 |
you have a lot of data upon which you want to train this model, but it takes a lot 00:00:13.520 |
of time to train it because maybe your GPU is not so powerful or maybe you cannot use a large batch 00:00:19.840 |
size or just you have a lot of data to train this model and you want to train it in a reasonable 00:00:23.840 |
time. One way to do it is to parallelize the training using multiple GPUs or even multiple 00:00:29.200 |
computers, each one having multiple GPUs or even one GPU. In this video I will show you how this 00:00:35.200 |
distributed training works, how to integrate it in your existing project and we will combine theory 00:00:41.280 |
with practice which means that I will give you all the theory behind the distributed training and 00:00:45.280 |
then also show you how to first of all build the cloud infrastructure to do cloud training of a 00:00:51.920 |
model and also how to integrate the distributed training into an existing model that you have 00:00:56.800 |
already built and it's maybe already training on a single GPU. So let's review the topics of today. 00:01:02.160 |
First of all I will give you an introduction to distributed training and show you the two 00:01:05.760 |
ways in which we can do it. One is data parallelism and one is model parallelism. In this 00:01:10.080 |
video I will explore data parallelism. We will see a little bit of the neural networks theory 00:01:15.520 |
because I want to introduce a concept called gradient accumulation which is very important 00:01:19.760 |
for distributed training and later we will see distributed training as it is built in PyTorch 00:01:25.040 |
so distributed data parallel. We will see how it works and I will also show you what we mean by 00:01:31.200 |
communication primitives, because maybe you have heard terms like all-reduce, reduce, broadcast, 00:01:36.080 |
all gather etc but you maybe don't know what they are or how they work. In this video I will show 00:01:41.040 |
you the algorithms upon which these operations are built and how they work when we talk about 00:01:46.000 |
distributed training. I will also show you how to manage failover of a node so imagine you are 00:01:50.400 |
training on multiple nodes and one of the nodes suddenly dies how we manage this situation. We 00:01:55.840 |
will do a little coding session I will not write code I will just take an existing code and show 00:02:00.000 |
you how to add the distributed training to an existing model and I will be using a model that 00:02:05.200 |
I built in a previous video so it's the video about the how to code a transformer model from 00:02:10.240 |
scratch so if you have already watched my previous video on how to do it it's wonderful if you didn't 00:02:14.960 |
it's not a problem because anyway the knowledge I will give you today applies to any model that 00:02:20.000 |
you have built, not only the one written by me, and I will also show you the underlying mechanism by 00:02:26.800 |
which PyTorch does distributed training, so I will introduce the computation-communication 00:02:32.320 |
overlap and the bucketing which are optimizations made in the PyTorch implementation of the 00:02:37.920 |
distributed training and so we will explore all these topics. Let's start our journey so let's 00:02:43.840 |
start by introducing what is distributed training. Imagine you want to train a language model on a 00:02:49.280 |
very big data set for example the entire content of wikipedia now the data set is quite big because 00:02:54.640 |
it's made up of millions of articles and each article has maybe thousands of tokens to train 00:03:00.560 |
this model on a single gpu may be possible but it poses some challenges first of all the model may 00:03:06.240 |
not fit on a single gpu this happens when the model has many parameters this happens for example 00:03:11.600 |
also with the latest llama they have billions of parameters and with so many parameters the ram of 00:03:18.160 |
the gpu may not be enough or you are forced to use a small batch size because a bigger batch size 00:03:24.640 |
leads to an out of memory error on CUDA, or the model may take years to train because 00:03:31.360 |
the data set is very big if any of the above points applies to you then you need to scale up 00:03:36.400 |
your training setup and scaling can be done vertically or horizontally let's compare the two 00:03:41.200 |
options. Vertical scaling means that you have a single machine for example a single gpu which has 00:03:47.520 |
some ram and the gpu has some memory for example four gigabyte vertical scaling means that you 00:03:52.880 |
just buy a bigger computer and/or a bigger gpu, so you take the existing machine 00:03:59.920 |
and you upgrade the hardware and this requires no code change because the code that was running on 00:04:05.680 |
the small machine can also run on the big machine you just maybe need to adjust a little bit of the 00:04:09.440 |
parameters, maybe you can increase the batch size. Now with horizontal scaling we go from a 00:04:15.680 |
single machine to multiple machines, each one interconnected with the others and communicating 00:04:20.800 |
to parallelize the training, and each machine may have one or multiple gpus. This requires a 00:04:28.720 |
code change, but this code change is minimal thanks to pytorch and its 00:04:33.120 |
implementation of the distributed data parallel and in this video we will explore horizontal 00:04:38.800 |
scaling, because for vertical scaling we basically don't need to do anything, there is no 00:04:43.040 |
code change so there are two ways of distributing training of a model one is called the data 00:04:50.080 |
parallelism and one is called the model parallelism so if the model can fit within a single gpu then 00:04:55.520 |
we can distribute the training on multiple servers each one having one or more gpus with each gpus 00:05:02.400 |
processing a subset of the entire data in parallel and synchronizing the gradients during back 00:05:08.160 |
propagation this technique is known as data parallelism so we have one model that can fit 00:05:14.400 |
within a single gpu and we have a lot of data what we do is basically we split this data into subsets 00:05:20.480 |
and we train it on multiple computers in parallel such that each gpu is training on a subset of this 00:05:27.520 |
entire data set if the model however cannot fit within a single gpu then we need to break the 00:05:34.400 |
model into smaller pieces into smaller layers and each gpu process a part of the forward and 00:05:40.640 |
the backward step during gradient descent this option is known as model parallelism so imagine 00:05:46.160 |
this model initial model doesn't fit on a single gpu so we can break it into layers and each 00:05:51.600 |
computer is managing a layer not the entire model and of course we can also combine data parallelism 00:05:59.040 |
with model parallelism to create a hybrid option in this video we will focus on data parallelism 00:06:05.200 |
so we pretend that we have a model that is complex but it can fit on a single gpu but we have a lot 00:06:11.520 |
of training data and it's taking really forever to train it on a single computer so we want to 00:06:16.320 |
distribute this training on multiple computers or multiple gpus such that each gpu will train 00:06:22.480 |
on a subset of the data okay we need to review a little bit of the neural networks because i need 00:06:28.960 |
to introduce a concept called gradient accumulation okay imagine you want to train a neural network 00:06:35.440 |
to predict the price of a house given two variables the number of bedroom in the house and 00:06:41.200 |
we will call this variable x1 and the number of bathrooms in the house and we will call this 00:06:45.920 |
variable x2 we think because of some intuition that the relationship between the input and the 00:06:52.880 |
output variable is linear so we can write that the predicted price is equal to x1 multiplied by the 00:06:59.280 |
first weight plus x2 multiplied by the second weight plus a bias term our goal is to use 00:07:06.080 |
stochastic gradient descent to find the values of the parameters w1 and w2 and bias such that 00:07:12.880 |
the mean squared error loss between the actual house price and the predicted house price is 00:07:18.240 |
minimized in other words we want to find w1 w2 and b such that we minimize this quantity here 00:07:25.600 |
how do we do a training loop with pytorch without gradient accumulation so first of all we run the 00:07:34.720 |
training for a few epochs we take our training data which is made up of x1 x2 and the target 00:07:41.120 |
price we calculate the output of the model which is basically y pred is equal to x1 multiplied by 00:07:47.360 |
w1 plus x2 multiplied by w2 plus the bias then we calculate the loss we do backward on the loss 00:07:55.120 |
this will calculate the gradient of the loss function with respect to each parameter and then 00:08:00.000 |
we update the parameters using the gradient we have calculated i am not using the optimizer i'm 00:08:06.320 |
just writing the the update rule by hand so using a learning rate i don't use the momentum just for 00:08:12.880 |
simplicity, and this is actually how the training loop works, right? So this part here is 00:08:18.560 |
equivalent to calling optimizer.step and this part here is equivalent to calling optimizer.zero_grad. 00:08:23.920 |
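To make this concrete, here is a minimal sketch of such a training loop for the toy house-price model, with the update rule written by hand instead of using an optimizer (variable names and values are illustrative, not taken from the repository's code):

```python
import torch

# Toy dataset: (x1, x2) -> price; values are illustrative.
data = [((6.0, 2.0), 15.0), ((5.0, 2.0), 12.0)]

# Parameters w1, w2 and the bias b, randomly initialized.
w = torch.randn(2, requires_grad=True)
b = torch.randn(1, requires_grad=True)
lr = 0.01

for epoch in range(10):
    for (x1, x2), target in data:
        x = torch.tensor([x1, x2])
        y_pred = x @ w + b               # forward: x1*w1 + x2*w2 + b
        loss = (y_pred - target) ** 2    # squared error for a single item
        loss.backward()                  # gradients of the loss w.r.t. w and b

        with torch.no_grad():
            w -= lr * w.grad             # equivalent to optimizer.step()
            b -= lr * b.grad
        w.grad.zero_()                   # equivalent to optimizer.zero_grad()
        b.grad.zero_()
```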
let's see it graphically what happens when we do a training loop like this so imagine we have our 00:08:30.160 |
expression which represents the model which could in this case it's a very simple expression but 00:08:34.560 |
imagine it's a very complex model okay what pytorch will do it will create a computation 00:08:40.320 |
graph so it will take our input it will multiply it by the w parameters each input with its own 00:08:48.080 |
weight then it will combine the sum of the two plus the bias term this will produce an output 00:08:54.560 |
then we have a target the target is compared with the output to produce a loss and then we 00:09:01.680 |
run back propagation to calculate the gradient of the loss function with respect to the parameters. So 00:09:07.360 |
let's visualize the training process one item at a time using our computation graph 00:09:11.920 |
imagine we are training on the input x1 is equal to 6 x2 is equal to 2 and the target is 15 the 00:09:20.800 |
first thing our model will do is will start from the two input nodes so x1 and x2 it will multiply 00:09:26.400 |
by the w weights and we also think that the w weights have been initialized with the following 00:09:32.560 |
value so the value of this weight is 0.773 this weight is 0.321 and this weight is 0.067 00:09:40.240 |
these values could be randomly generated as we usually do actually pytorch will produce 00:09:48.240 |
the output of this node which is multiplying the x1 value with w1 then it will combine the two 00:09:55.200 |
producing the output of the model it will compare it with the target to produce a loss which is a 00:10:00.560 |
number but it's also a function so we can calculate the gradient so now usually we call a loss dot 00:10:06.880 |
backward to calculate the gradient of the loss function with respect to each parameter so w1 w2 00:10:12.800 |
and b pytorch will also compute the intermediate nodes but i will not show them here for simplicity 00:10:18.640 |
so we run loss dot backward what will loss dot backward do it will calculate the gradient 00:10:24.960 |
of the loss function with respect to each weight. How does it do it? Well, the first thing it 00:10:31.600 |
will do is calculate the gradient of the loss function with respect to the output, which can be 00:10:36.400 |
done like this. Then it will calculate the gradient of the output with respect to this node. Here I 00:10:44.880 |
am showing only the computation of the gradient of the loss function with respect to the w1 00:10:51.280 |
node you can do the other ones for exercise for example so next it will compute the gradient 00:10:58.320 |
of the loss function with respect to the z1 node but in order to do it because of the chain rule 00:11:03.760 |
we need the gradient of the output with respect to the z1 node and then it will compute the output 00:11:10.320 |
the gradient of the z1 node with respect to w1 and this allow us to calculate the gradient of 00:11:16.160 |
the loss function with respect to w1 these are all the calculations that you need to do in order to 00:11:21.200 |
gather the gradient of this node here, and as you can see we also need to compute the gradients of 00:11:26.560 |
the intermediate nodes in order to arrive at what we are really interested in, the gradient of the 00:11:31.840 |
loss function with respect to the parameters, and these are the values for 00:11:36.960 |
the gradient that we have calculated for each parameter. The next thing we do during 00:11:43.680 |
training is we run optimizer dot step this will update the value of each parameter using the 00:11:50.240 |
gradient that we have calculated and how do we do it we say that the new value of the parameter is 00:11:54.960 |
equal to the old value of the parameter minus learning rate multiplied by the gradient why 00:12:02.240 |
minus because the gradient indicates the direction in which the function grows the most but we want 00:12:07.600 |
to move against this direction because we want to minimize the loss, so we put a minus sign here. 00:12:14.560 |
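Written out as a formula, this update rule (plain SGD, no momentum) is:

```latex
w_{\text{new}} = w_{\text{old}} - \mathrm{lr} \cdot \frac{\partial\,\mathrm{loss}}{\partial w}
```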
This is how the value will be updated using the gradient that we have calculated in the 00:12:20.000 |
previous step which was the loss dot backward function the next thing we do is we run optimizer 00:12:27.360 |
dot zero so this will zero out all the gradients of each parameter and also the intermediate nodes 00:12:33.600 |
and then we do the next iteration on the next item so imagine we are training one item at a 00:12:40.160 |
time so batch size is one the next item may have x1 is equal to 5 x2 is equal to 2 and the target 00:12:46.880 |
may be 12 this will produce this loss and this output the next thing we do is we run loss dot 00:12:53.440 |
backward loss dot backward we calculate the gradient of the loss function with respect to 00:12:58.640 |
each weight and you can see the gradient here the next thing we do is we run optimizer dot step which 00:13:04.960 |
will update the value of the parameter using the gradient that we calculated in the previous step 00:13:09.520 |
and the learning rate and finally we run optimizer dot zero and this will reset all the gradients 00:13:15.360 |
for all the weights we may want to visualize this on this process on the loss function so let's see 00:13:22.080 |
what happens to the loss function while we are doing this process we started some with some 00:13:27.440 |
initial weights so remember we started with some randomly initialized weights values then we ran 00:13:33.280 |
forward step on the first data item this calculated an output and then we run loss dot backward this 00:13:39.920 |
resulted in a gradient being calculated for the loss function with respect to each weight 00:13:44.480 |
this indicates a direction in which we should move our weights in order to minimize the loss 00:13:50.720 |
because we move against the direction of the gradient so the arrow is pointing already in 00:13:55.200 |
the negative direction of the gradient the next thing we do is we do optimizer dot step optimizer 00:14:01.440 |
dot step will take our initial weights and will change the weights value according to the 00:14:07.680 |
negative direction of the gradient we then run optimizer dot zero this just resets the value of 00:14:14.080 |
the gradient and then we run forward on the next data item this will then we run loss dot backward 00:14:20.480 |
and this will result in a calculation of a gradient which indicates a direction in which 00:14:24.320 |
we need to move our weights in order to minimize the loss function and then we actually modify our 00:14:30.240 |
weights using optimizer dot step so the actual update of the parameters values happens when we 00:14:36.800 |
call optimizer dot step and finally we run optimizer dot zero and this will reset the 00:14:43.600 |
gradient to zero, and this is how gradient descent works without 00:14:51.600 |
gradient accumulation: at every step, so for every data item (because here we pretend we have a batch size 00:14:56.240 |
equal to one), we update the parameters of the model. However, with gradient 00:15:02.720 |
accumulation we don't do it at every step. Let's see how it works. The initial part of the 00:15:08.480 |
training loop with gradient accumulation is the same so we have run for a few epochs we extract 00:15:12.880 |
the data from our training data so x1 x2 and target we calculated the loss just like before 00:15:18.720 |
we run loss dot backward just like before. The difference here is that we don't update the value 00:15:25.200 |
of our parameters at every item (or every batch) we train our model upon, but we 00:15:31.840 |
do it every few items or every few batches. In this training loop we do it 00:15:39.840 |
every two items, because we are extracting one item at a time from our training data, and so we update 00:15:48.880 |
the parameters using the gradient every two items in this case. What happens is that with 00:15:56.960 |
the first item for example we will arrive to loss dot backward we will not run this code here we 00:16:02.000 |
will restart the loop at the next item we will calculate the output we run again loss dot 00:16:07.280 |
backward, and this loss dot backward will create a gradient, but this gradient 00:16:12.960 |
will not replace the gradient that we calculated in the previous step: it will be 00:16:18.240 |
accumulated, it will be summed up with the previous gradient. 00:16:23.280 |
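As a sketch, the same toy loop from before can be changed like this to accumulate gradients over two items before updating (again, names and values are illustrative):

```python
import torch

data = [((6.0, 2.0), 15.0), ((5.0, 2.0), 12.0),
        ((4.0, 1.0), 10.0), ((3.0, 1.0), 8.0)]

w = torch.randn(2, requires_grad=True)
b = torch.randn(1, requires_grad=True)
lr = 0.01
accumulation_steps = 2                   # update the parameters every 2 items

for epoch in range(10):
    for step, ((x1, x2), target) in enumerate(data):
        x = torch.tensor([x1, x2])
        loss = (x @ w + b - target) ** 2
        loss.backward()                  # gradients are ADDED to w.grad and b.grad

        if (step + 1) % accumulation_steps == 0:
            with torch.no_grad():
                w -= lr * w.grad         # update using the accumulated (summed) gradient
                b -= lr * b.grad
            w.grad.zero_()               # reset the gradients only after the update
            b.grad.zero_()
```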
So let's visualize this process step by step. Imagine we have our first item, and this is x1 is equal to 6, x2 is equal to 2, and the target 00:16:30.560 |
is 15 we run the forward loop using this item this will result in an output being calculated 00:16:37.360 |
and using the target we can calculate a loss we then run the loss dot backward this will result 00:16:43.200 |
in a gradient being calculated for this item then we don't update the parameters we do forward step 00:16:50.720 |
on the next item so note that during this forward step the gradient is not zero because we didn't 00:16:56.480 |
zero it out in the previous step because in the previous step we didn't run optimizer dot step 00:17:01.280 |
or optimizer dot zero so the gradient is still there from the previous item and okay the second 00:17:08.560 |
item which is x1 is equal to 5 x2 is equal to 2 and the target is 12 will result in a loss being 00:17:13.440 |
calculated and an output. We then run loss dot backward, and pytorch will not replace the 00:17:20.320 |
previous gradient with this new one, it will sum them up, it will be accumulated, that's why we call it 00:17:25.760 |
gradient accumulation so the new gradient is accumulated with the old one now that we have 00:17:31.360 |
reached the batch size we can now finally call the optimizer dot step method this will result 00:17:37.920 |
in the values of the parameters being updated using the accumulated gradient, and then we 00:17:45.760 |
run optimizer dot zero which will reset the gradient to zero and then we can proceed with 00:17:50.640 |
another loop of two items etc let's visualize what happens to our loss function so to our 00:17:58.000 |
parameters and the loss function when we do gradient descent with gradient accumulation 00:18:02.640 |
so our initial weights are here because they are randomly initialized we run the forward loop on 00:18:09.840 |
the first item. This will result in an output being calculated by the model. Then we run 00:18:17.520 |
loss dot backward; this will result in the gradient being calculated with respect to the first data 00:18:23.200 |
item and this gradient will indicate a direction then we run again forward on the second data item 00:18:30.480 |
and then we run loss dot backward in the second data item this also will result in a gradient 00:18:35.520 |
being calculated but these two gradients will then be summed up by pytorch and this will result in a 00:18:41.760 |
resulting vector that indicates a direction and then we run the optimizer so we do optimizer dot 00:18:49.040 |
step which will update the values of our weights according to the direction indicated by the sum 00:18:55.440 |
of the two gradients of the two previous data items, and then we run optimizer dot zero. This 00:19:01.040 |
will reset the gradients to zero for all the nodes. And so with gradient accumulation we update 00:19:08.080 |
the parameters of the model only after we accumulated the gradient of a batch and we 00:19:13.360 |
can decide how big we want this batch to be. So gradient accumulation is used not only in distributed 00:19:19.680 |
training, it's also used for example when you want to accumulate the gradient without increasing the 00:19:24.160 |
batch size because maybe a bigger batch size doesn't fit in your gpu so you can accumulate 00:19:29.120 |
the gradient for more than one item and then move the parameters because this will result in a more 00:19:34.240 |
smooth training the loss will oscillate less for example and now that we have seen how gradient 00:19:41.040 |
accumulation works let's see distributed data parallel training in detail so how does it work 00:19:46.240 |
what we mean by communication primitives and how do we manage the failover okay distributed data 00:19:53.040 |
parallel training so imagine you have a training script that is running on a single computer but 00:19:58.720 |
it's very slow because the data set is very big and you can't use a bigger batch size because it 00:20:03.680 |
will result in an out of memory error on CUDA this distributed data parallel is the solution 00:20:09.360 |
in this case it works in the following three scenarios so imagine you have a model that can 00:20:15.680 |
fit on a single gpu and you have a training loop that works but it's very slow so the first thing 00:20:22.880 |
you can do is you want to train the model on multiple servers each one having a single gpu 00:20:28.480 |
and this can be done with distributed data parallel. The second setup that you can use is: 00:20:35.280 |
you just want to increase the number of gpus on your existing computer, and this one 00:20:39.840 |
also can be managed by distributed data parallel training. The final setup that you 00:20:45.280 |
may want is you want multiple computers and each one having more than one gpu in this video i will 00:20:51.760 |
show you how to implement this setup here, because the other two setups are just particular cases of 00:20:58.240 |
this one, so this is the most generic setup we can have, and I will show you how to create the 00:21:05.360 |
cloud infrastructure for this setup, how to convert an existing code base into 00:21:11.120 |
distributed training code, and also how to run it on the cluster that we will create. 00:21:17.040 |
from now on i will use the term node and gpu interchangeably if a cluster is made up of two 00:21:24.800 |
computers and each computer has two gpus, then in total we will have four nodes, or four gpus. 00:21:30.880 |
distributed data parallel works in the following way so at the beginning of the training the 00:21:36.480 |
model's weights are initialized on one node so on one gpu and then send to all the other nodes 00:21:42.800 |
using an operation called broadcast each node then trains the model the same model because 00:21:49.920 |
it started from the same initial weights, on a subset of the data. So maybe 00:21:56.320 |
one gpu is training it on batch number one, the second one on batch number two, the third one on 00:22:02.480 |
batch number three, etc., such that there is no overlap between this data, and every few batches 00:22:09.200 |
that these nodes train this model on, the gradients of all nodes are accumulated, so summed 00:22:15.600 |
up, into one node, and then the sum is sent back to all the other nodes. This operation is 00:22:21.440 |
known as all reduce. Then each node updates the parameters of its local model using the gradient 00:22:29.520 |
received in the previous step, and the gradient received in the previous step is the sum of the 00:22:34.400 |
gradients of all nodes, using its own optimizer, so doing optimizer dot step, and then we start 00:22:41.600 |
again from the step number two so we train it for a few batches we have some local gradient we send 00:22:47.280 |
our local gradient to a central node, which will sum it up and then send it back to all 00:22:52.320 |
the nodes; then all the nodes will update their parameters using this accumulated gradient, 00:22:58.480 |
and then we continue again. Let's see all these steps one by one in detail. 00:23:07.200 |
step number one our model's weight are initialized on one gpu so imagine we have a setup with four 00:23:14.480 |
gpus we have one gpu that will initialize the weight and send it to all the others so now the 00:23:20.400 |
value of the parameters this is the value of the parameter and this is the gradient so the value 00:23:25.760 |
of the parameter imagine is 0.1 we only have one parameter so for simplicity the initial weights 00:23:31.680 |
are sent to all the other nodes so now all the nodes have the same weights so all the nodes now 00:23:37.520 |
have the same parameters, all with the same value. The next thing we do is each node runs a forward 00:23:46.000 |
and backward step on one or more batch of data this will result in a local gradient being calculated 00:23:52.720 |
because, as we saw before, when we run loss dot backward we have a gradient being calculated 00:23:59.520 |
of the loss function with respect to each parameter this local gradient of course may 00:24:05.120 |
be different for each node because each one is training on a different subset of the data 00:24:09.600 |
and this local gradient may also be the accumulation of one or more batches: 00:24:14.720 |
imagine that each gpu is training on three batches, so we can accumulate 00:24:21.600 |
the gradient for three batches, and of course, because we are training on a different subset of the data, 00:24:26.320 |
this cumulative (but still local) gradient is different for each node. The next thing we do 00:24:32.640 |
is: every node sends its gradient to one single node, and this node will 00:24:39.680 |
calculate the sum of all the gradients it receives. So imagine all the nodes decide to send their own 00:24:46.160 |
gradient to the first gpu, and the sum is calculated on the first gpu. This operation is known as 00:24:53.600 |
reduce and later we will see how it works then the gpu that calculated the sum of all the gradients 00:25:01.600 |
will send it back to all the other nodes in an operation that is known as broadcast, 00:25:06.400 |
and the sequence of reduce and broadcast (here I show them as separate operations) is usually 00:25:12.560 |
implemented as a single operation known as all reduce, and later we will see in detail how it 00:25:17.840 |
works, but this is the key idea: we have the model's weights that are initialized on one node, 00:25:24.320 |
they are sent to all the other nodes, so now all the other gpus have the same 00:25:29.600 |
initial weights, they train on a different subset of the data so each one has 00:25:34.240 |
a local gradient they send each one its own local gradient to one node which will calculate the sum 00:25:39.680 |
of all these gradients and send it back to all the other nodes so now they have all the same sum 00:25:44.880 |
then they run the optimization step to update the weights using the same sum, so 00:25:50.560 |
the resulting weights of the models will be the same for all of them, as you can see here. So now 00:25:56.960 |
each node will update the value of their own parameters using the same sum of the gradient 00:26:03.520 |
you can see here so it's 0.3 they use 0.3 to update the value of their local parameters and 00:26:10.000 |
they will end up with the same weights so each node updates the parameter of its local model 00:26:16.480 |
using the gradient received. After the update, the gradients are reset to 0 and then we can 00:26:21.920 |
start with another loop. Now let's talk about collective communication primitives, so the 00:26:27.680 |
operations that I showed before. In distributed computing environments a node may 00:26:33.440 |
need to communicate with other nodes if the communication pattern is similar to a client 00:26:38.000 |
and the server then we talk about point-to-point communication because we have one client connects 00:26:43.760 |
to one server in a request response chains of events however there are cases in which one node 00:26:50.080 |
needs to communicate to multiple receivers at once for example in the broadcast scenario right 00:26:55.040 |
we have one node that needs to send its weights to all the other nodes this is the typical case 00:27:01.280 |
of data parallel training in deep learning so one node needs to send its initial weights to all the 00:27:06.640 |
other nodes moreover all the other nodes need to send their gradient to one single node to receive 00:27:12.640 |
the cumulative gradient back so collective communication allows to model the communication 00:27:18.480 |
pattern between group of nodes let's visualize the difference between these two modes of communication 00:27:24.320 |
imagine you need to send a file to seven friends with point-to-point communication you would send 00:27:30.800 |
the file iteratively to each of your friend one by one suppose the internet speed that you have 00:27:36.480 |
is one megabyte per second and the file is five megabytes in size. Imagine you are here, 00:27:43.040 |
so you have the file, and you send it to your first friend, and it will take 00:27:48.240 |
five seconds because you are sending five megabyte with the speed of one megabyte per second then you 00:27:53.840 |
send it to the second friend the third the fourth the fifth the sixth and the seventh in total the 00:27:58.960 |
time to send the file to seven friends will be 35 seconds of course you may say okay but why 00:28:05.440 |
not send the file simultaneously to all the seven friends at the same time of course you can do it 00:28:11.760 |
but the problem is your internet speed is still one megabyte per second and the internet speed 00:28:18.400 |
of sending the file simultaneously to seven friends will be split among these seven friends 00:28:22.960 |
so each friend will be receiving the file at a speed of 143 kilobyte per second more or less 00:28:29.840 |
so the total time is still 35 seconds to distribute your file to your seven friends 00:28:35.120 |
let's see if there is a better way so with collective communication we introduce the 00:28:42.160 |
first operator is known as broadcast: the operation of sending data to all the other 00:28:46.880 |
nodes is known as the broadcast operator. Collective communication 00:28:52.000 |
libraries like nccl (which is pronounced nickel), a library from nvidia, assign a unique 00:29:00.000 |
id to each node, and this unique id is known as rank. Suppose you want to send five 00:29:06.720 |
megabyte with an internet speed of one megabyte per second and let's see how the collective 00:29:11.280 |
communication would work in the case of this broadcast operation the first thing you do is 00:29:16.480 |
you are here so you have the file you send it not to this friend here but to the rank number four 00:29:22.080 |
at the next step we realize that the rank 0 and the rank 4 have the file so they can send 00:29:28.080 |
simultaneously to another friend so rank 0 now will send it to rank 2 and rank 4 will send it 00:29:33.680 |
to rank 6 but now we realize that rank 0 rank 2 rank 4 and rank 6 all have the file so they can 00:29:40.000 |
simultaneously send to it another friend so in total it will take 15 seconds to distribute your 00:29:46.560 |
initial file to all the seven friends compared to the 35 second with the point-to-point communication 00:29:52.560 |
this approach is known as divide and conquer with collective communication we exploit the 00:29:58.960 |
interconnectivity between nodes to avoid idle times and reduce the total communication time 00:30:05.440 |
this is also how your gpu sends the initial weights to all the other nodes: it's not one by one, because 00:30:11.280 |
that would be too slow, and it's not even in parallel, because anyway the connection speed 00:30:16.000 |
is always the same and it would be split among all the receivers. What we do is this divide and 00:30:20.720 |
conquer approach, so our initial weights are sent using this broadcast operation in this manner here, which is much faster. 00:30:26.160 |
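In PyTorch this operation is exposed by the torch.distributed package; a minimal sketch of broadcasting a model's initial weights from rank 0, assuming the process group has already been initialized (for example by torchrun), could look like this:

```python
import torch
import torch.distributed as dist

# Assumes dist.init_process_group(...) has already been called (e.g. under torchrun).
def broadcast_initial_weights(model: torch.nn.Module, src_rank: int = 0) -> None:
    # Every rank calls broadcast on the same tensors in the same order;
    # the backend (NCCL/Gloo) uses an efficient divide-and-conquer algorithm internally.
    for param in model.parameters():
        dist.broadcast(param.data, src=src_rank)
```

In practice you rarely call this by hand: DistributedDataParallel broadcasts the model's state from rank 0 automatically when you wrap the model.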
Let's see the reduce operation. What is the reduce operation? The reduce operation 00:30:32.240 |
is applied during the training when we want to calculate the sum of the gradients of 00:30:38.320 |
all the nodes so each node has a local gradient which was calculated using a subset of the data 00:30:43.600 |
but they are all different from each other, and what we want is to calculate the sum 00:30:49.360 |
of all these gradients into one single node. Let's see how it works. So the broadcast operator 00:30:54.720 |
is used to send the initial weights to all the other nodes when we start the training loop 00:31:00.560 |
at every few batches of data being processed by each node the gradients of all the node needs to 00:31:05.760 |
be sent to one node and accumulated so summed up this operation is known as reduce let's see how it 00:31:11.280 |
works. So imagine initially each node has its own gradient, because they are training on a subset of 00:31:17.520 |
the data, and these gradients are all different from each other. What we can do is: each node will send 00:31:24.240 |
its gradient to its adjacent node, and the adjacent node will sum it up with its own 00:31:30.640 |
gradient. So node number 7, for example, will send its gradient to node number 6, and node 00:31:36.160 |
number 6, the receiver node, will be responsible for calculating the sum, and the same with 00:31:41.440 |
rank number 5 and 4 and 3 and 2 and 1 and 0 the next step we have the sum here at rank number 4 00:31:51.920 |
rank number 2 and rank number 0. What we do is we send the sum from rank number 6 to 00:32:00.000 |
rank number 4, which will calculate the sum of the sums, and then also from rank number 2 to rank number 00:32:06.800 |
0; rank number 0 will calculate the sum of the sums, and then we do a final step in which we send 00:32:12.800 |
the sum that was present at rank number 4 to the rank number 0 and this sum here is the sum of all 00:32:19.280 |
the gradients of all nodes and in total it took us only three steps to do it so with only three 00:32:26.160 |
steps we accumulated the gradient of all nodes into one single node and it can be proven that 00:32:31.920 |
the communication time is logarithmic with respect to the number of nodes and this is very typical of 00:32:37.840 |
all the divide and conquer approaches. The all reduce operation: what we saw before is that 00:32:45.040 |
we first broadcast our data, so our initial weights, then we reduce the local gradients, 00:32:52.160 |
and then we need to send the result back. The sequence of reduce and broadcast is known as all reduce and 00:32:58.480 |
is usually implemented as a single operation which is faster than doing the sequence of the two 00:33:03.760 |
operations. I will not show the algorithm behind all reduce, but you can think of it logically 00:33:09.840 |
as a sequence of reduce and broadcast operations; remember that in practice it's implemented 00:33:15.840 |
as a single operation and it's much faster than doing the two operations separately. 00:33:23.200 |
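For illustration, this is roughly what the gradient all-reduce would look like if done by hand with torch.distributed (DDP does this for you during loss.backward; note that DDP actually averages the gradients, i.e. divides the sum by the world size):

```python
import torch
import torch.distributed as dist

# Rough manual equivalent of what DDP does with the gradients after loss.backward().
def all_reduce_gradients(model: torch.nn.Module) -> None:
    world_size = dist.get_world_size()
    for param in model.parameters():
        if param.grad is not None:
            # Sum this gradient across all ranks; every rank ends up with the same tensor.
            dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
            param.grad /= world_size     # DDP averages rather than just summing
```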
Now imagine you are training your model on multiple gpus and one node crashes. So imagine you are training in a 00:33:29.120 |
distributed scenario like the one shown below and suddenly one node crashes, and in this case two 00:33:35.520 |
gpus out of four become unavailable how should the system react well one way would be to restart the 00:33:43.040 |
entire cluster and that's easy however by restarting the cluster the training would restart also from 00:33:48.720 |
zero because as you remember we start from initial weights that are randomly chosen by one node which 00:33:53.440 |
are then sent to all the other nodes but this would mean that we would lose all the parameters 00:33:58.800 |
and computation that we have done so far when this node crashed a better approach is to use 00:34:04.320 |
checkpointing so checkpointing means that we save the weights of a model on a shared disk every few 00:34:11.040 |
iteration for example every epoch and then we resume the training from the last checkpoint 00:34:16.080 |
in case there is a crash so remember the step in which we initialize the weights of the model in 00:34:23.680 |
one node and then send it to all the others well instead of just initializing it randomly we just 00:34:28.480 |
can initialize the weights of the model using the latest checkpoint available so that the training 00:34:33.360 |
can continue from there we need a shared storage for saving the checkpoints because it's pytorch 00:34:43.760 |
that will decide which node will initialize the weights and so every node should have access to 00:34:50.080 |
this shared storage plus it is good rule in distributed system to not have one node that 00:34:54.800 |
is more important than the others because any node can fail at any time so we should not make 00:35:00.560 |
any assumption on which node will initialize the weights; it's pytorch that will choose it, 00:35:05.120 |
and actually usually pytorch chooses the rank number zero but we should not make assumption 00:35:11.520 |
on which node will be the rank number zero so all nodes should be treated equally so they should run 00:35:17.120 |
the same code and they should have access to the same shared storage. However, when we have 00:35:24.000 |
a shared storage, who should be responsible for saving the checkpoint? Because if we write code 00:35:28.160 |
in which everyone is writing the checkpoint, they may end up overwriting each other. So what we do, 00:35:34.160 |
because pytorch will tell us what the rank of the current node is, is we will write 00:35:40.640 |
the code in such a way that we check the rank of the current node and if it's the rank number zero 00:35:44.640 |
then we save the checkpoint if it's a rank number one two or three we don't do anything so it means 00:35:50.080 |
that only one rank will be responsible for saving the checkpoint but later when we restart the 00:35:55.600 |
training we don't make any assumption on who will become rank number zero, and as the pytorch 00:36:01.600 |
documentation says, the rank is not stable, which means that if you restart the training 00:36:09.760 |
rank number zero may be assigned to another node. 00:36:16.960 |
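A sketch of this checkpointing pattern is shown below; the directory, file names and saved fields are illustrative, not the exact ones used in the repository:

```python
import os
import torch
import torch.distributed as dist

CHECKPOINT_DIR = "/mnt/model_training"   # the shared network drive (illustrative path)

def save_checkpoint(model, optimizer, epoch):
    # Only one rank writes, so the nodes don't overwrite each other's files.
    if dist.get_rank() == 0:
        torch.save(
            {
                "epoch": epoch,
                "model_state_dict": model.state_dict(),
                "optimizer_state_dict": optimizer.state_dict(),
            },
            os.path.join(CHECKPOINT_DIR, f"checkpoint_{epoch:02d}.pt"),
        )

def load_latest_checkpoint(model, optimizer):
    # Every rank reads the same file from the shared storage (if one exists),
    # so training resumes from the last saved weights instead of random ones.
    files = sorted(f for f in os.listdir(CHECKPOINT_DIR) if f.endswith(".pt"))
    if not files:
        return 0                         # no checkpoint yet: start from epoch 0
    state = torch.load(os.path.join(CHECKPOINT_DIR, files[-1]), map_location="cpu")
    model.load_state_dict(state["model_state_dict"])
    optimizer.load_state_dict(state["optimizer_state_dict"])
    return state["epoch"] + 1            # resume from the next epoch
```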
Okay, now that we have seen how distributed training works at the theoretical level, where we accumulate the gradient that is 00:36:23.440 |
calculated locally by multiple nodes and then send it to one single node which then calculates 00:36:28.720 |
the sum and send it back to all the others which then update the parameter using this sum of the 00:36:34.320 |
gradients it's time to actually look at the practical training so we will first build the 00:36:41.040 |
infrastructure that is needed to run our training and then i will also show you what are the code 00:36:48.160 |
changes that we need to make to our existing training loop to make it distributed and run it 00:36:53.120 |
on the infrastructure that we are going to create i will use paper space as a cloud service mostly 00:36:59.840 |
because it's easy to use and doesn't require much setup time even for beginners i chose it over aws 00:37:06.240 |
because aws has a lot of other setup that you need to do in order to even do simple operations 00:37:11.840 |
and it's easy to get lost so i'm using paper space mostly for this reason so that anyone 00:37:16.480 |
without with any kind of level of knowledge can do the same can follow the tutorial very easily 00:37:22.400 |
so let's get started okay the first thing we need to do is to go to my repository called 00:37:29.600 |
pytorch transformer distributed in which you will find the code of the distributed model that we 00:37:35.040 |
will be running on this cluster and also the instruction on how to create this cluster on 00:37:39.840 |
paper space so i have already accessed my account on paper space the first thing you will want to 00:37:45.600 |
do is to create your ssh key, just like you do on github: you need to associate your public ssh 00:37:52.320 |
key to your account here, so that you can connect to 00:37:59.360 |
the machines that are created on paper space okay the first thing we need to do is to create a 00:38:05.520 |
private network on which we will connect all these two machines so we will create two machines that 00:38:10.000 |
are connected in this private network and also a shared disk that can be accessed by both machines 00:38:15.840 |
we will use a machine that has two gpus so in total we will have four gpus on which we will 00:38:20.560 |
run the training so let's create our private network networks here and we create a new network 00:38:28.160 |
remember to choose the same region for your computers and the cluster and we will call it 00:38:37.760 |
okay now we have our subnet you can see here the next step to do is to create two nodes 00:38:47.440 |
of whatever type; we can choose any node, actually. I chose this one because I wanted to test two 00:38:52.800 |
machines each one having two gpus and we use ml in a box as the operating system so as the image 00:38:58.400 |
of these machines so we create a new machine ml in a box the machine we will do is multi gpu p4000 00:39:06.800 |
multiplied by two this one the region is the same as the network so new york 2 the disk size 50 gb 00:39:13.600 |
is enough in total i think last time i created this cluster and i ran it for many epochs i spent 00:39:20.400 |
five dollars so i think you should not spend more than five dollars for running your own 00:39:25.040 |
distributed training it should not be more expensive than five dollars the first machine 00:39:31.440 |
we will call it cuda zero the network we need to select the network that we have created before 00:39:37.680 |
and we choose public ip dynamic, because otherwise we cannot connect to the machine without a 00:39:42.080 |
public ip then we never create a snapshot because we don't care about backup for now 00:39:47.120 |
and the price of running each machine should be around one dollar you can see here 00:39:54.000 |
and then we create also the second one so ml in a box the machine is this one 00:40:05.280 |
new york disk size this one we will call it cuda one 00:40:10.720 |
distributed training dynamic we don't save and create machine 00:40:18.880 |
last but not least we need to create a network drive of 250 gigabytes we can create this is the 00:40:25.840 |
smallest one they have available so that's why i chose 250 gigabytes we will call it model training 00:40:32.720 |
250 new york and distributed train this must belong to the same network as the two machines 00:40:42.240 |
okay so they are provisioning and starting the two machines now we will configure the machines 00:40:51.760 |
so we need to install some packages first of all we need to install ifconfig because we need to 00:40:56.080 |
have the ip address and while installing ifconfig i ran into a error with a seahorse and i show also 00:41:03.120 |
how to solve this error we will then mount the network drive and then we will clone this 00:41:08.160 |
repository and install all the packages that we need for running this code. I also 00:41:15.520 |
recommend using weights and biases for keeping track of the loss and of all the 00:41:21.680 |
metrics that you need during the training of this model, so I recommend you register on weights and 00:41:27.840 |
biases and then install it and use it also for running this code, 00:41:34.320 |
because it will make it easy for you to visualize the training 00:41:37.360 |
metrics okay the cuda zero is ready we can see some information here 00:41:47.760 |
and connect this will give you us the ip address 00:41:58.480 |
wonderful so now we are inside the machine the first thing we do is we update all the packages 00:42:12.320 |
okay, we also install the net tools, but I remember it will run into an error 00:42:19.920 |
but looks like this time it doesn't which is good let's try ifconfig wonderful so now we have 00:42:28.320 |
the ip address of the first machine which is this one this is the private address that belongs to 00:42:32.800 |
your subnet that you created here, so the one you created here, 810. Okay, now we need 00:42:45.440 |
to keep track of this ip address, because later we need to modify the hosts file, and this is actually 00:42:49.920 |
needed because I ran into an error with pytorch when running distributed training, because 00:42:54.720 |
it could not reach the other node, so I had to modify the hosts file by mapping the host 00:43:00.640 |
name of the other node to its ip. Okay, let's mount the network drive, following 00:43:08.800 |
the instructions i have written so we install this package let me call this one cuda zero 00:43:26.960 |
then we created the directory in which we want to mount this network drive 00:43:31.280 |
and then we have to run this command here you can see it here but we need to replace the ip address 00:43:39.680 |
and the username and password of the drive so let's first paste it then we need to 00:43:46.640 |
replace this one with the ip address and the network share name which i'll show you how to 00:43:52.240 |
find. We go here, Drives, and we can see here the address, but we need to replace 00:44:05.440 |
the escaped backslashes with slashes. Okay, and the network drive username is this one. 00:44:19.040 |
let's run this command the password is also here we can copy it 00:44:31.840 |
et voila now it should be mounted the first thing we do is we clone our repository so we are in the 00:44:38.240 |
home directory of the default user so paper space 00:44:43.280 |
we can clone it directly here no problem we then cd 00:44:57.840 |
okay now we have installed the requirements so now we can log in into weights and biases 00:45:03.280 |
using this command here but remember to copy the key from the website of weights and biases 00:45:14.560 |
and it should be okay okay now we are ready to run the training command on the first computer 00:45:26.560 |
but we need to also prepare the second one so let me prepare the second one 00:45:30.000 |
of course in a real scenario you would create a docker file and you would run everything using 00:45:39.360 |
kubernetes so it should be done automatically but in this case because most of you maybe are not 00:45:45.200 |
familiar with kubernetes or docker or you just want to run very fast your training to see how it works 00:45:51.280 |
so i recommend using paper space because it's very easy to configure all the setup 00:46:41.120 |
so we clone the same repository on both computers and then we run the 00:46:48.080 |
same code for training but we will see later how to do it 00:47:07.200 |
and now we are ready to run the training on both computers 00:47:17.760 |
okay, the command that you need to run in order to run the training on both machines is the same, 00:47:24.720 |
but we first need to take the ip address of the computer that we will choose so one of the two 00:47:30.160 |
computer will become the rendezvous master it means that all the communication will be managed 00:47:34.800 |
by that node and all the others will adapt. Now, of course, to make it more fail-safe, 00:47:43.360 |
we can use for example a dynamic ip that can be mapped to another machine in case the master 00:47:49.120 |
crashes so that we can restart the training using always the same ip but in this simple case i will 00:47:55.440 |
configure it by hand otherwise i need an orchestration tool and that will complicate 00:48:00.000 |
all the scenario so in this case i just want to show you how to do distributed training i don't 00:48:04.400 |
want to spend too much time creating the perfect infrastructure which you would ideally do in a 00:48:09.680 |
real scenario so we take this command here and as you can see in this command we need to tell 00:48:16.960 |
the ip address of the master node here. So which one will we choose as the master? In my case 00:48:22.400 |
I will choose cuda0, so the first machine I have created, and the other one will be kind of the 00:48:26.400 |
slave, even if they both perform the same operations. So we find the ip of this slave here, which is this 00:48:35.120 |
one, and we put it in the hosts file of the master, and I also need the host name of the slave, which 00:48:42.000 |
is this one. Perfect. Okay, so we need to paste here the ip address of this node, 00:48:59.200 |
and that's pretty much it now we can start the training so 00:49:06.400 |
we take the ip address of the master now so cuda0 is our master which is this one 00:49:12.640 |
and we put it in the command in this position here, and we cd into the repository folder and we can run the 00:49:22.240 |
command. So in this command, what we are saying is: torchrun is a special command that will create 00:49:27.280 |
the cluster and will manage all the cluster creation and communication it will run our 00:49:32.000 |
training loop, you can see here. First of all we need to tell it how many nodes we have in this 00:49:38.400 |
cluster (so we have two computers) and how many processes we want to create for each node, so how 00:49:43.600 |
many gpus we have for each computer. So this is the number of gpus per 00:49:48.320 |
computer and this is the number of computers, so we have two and two. This is a unique id that 00:49:53.360 |
indicates this particular cluster it should be unique for each cluster and this is the back-end 00:49:59.280 |
library that is managing the communication for us all the parameters after the file name that 00:50:04.000 |
we will use for training are the arguments that are passed to this file here so in our case we 00:50:09.200 |
are running with a batch size of 8; we are also telling it the model folder, so where the 00:50:14.000 |
checkpoints should be saved: this is the shared folder that we created before, so the mount path 00:50:20.160 |
of the shared network drive. 00:50:25.280 |
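For reference, the command looks roughly like this; the rendezvous id, port, script name and argument names below are illustrative, and the exact command is in the repository's README:

```bash
# --nnodes: number of computers; --nproc_per_node: processes (GPUs) per computer;
# --rdzv_id: unique id of this cluster/job; --rdzv_endpoint: IP of the rendezvous master node.
torchrun --nproc_per_node=2 --nnodes=2 \
         --rdzv_id=456 --rdzv_backend=c10d --rdzv_endpoint=<MASTER_IP>:48123 \
         train.py --batch_size 8 --model_folder /mnt/model_training
```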
We run the same command on both computers and this will start the training, so we do it on this one and also on the other one. As you can see, this computer here 00:50:35.520 |
is not proceeding it's waiting for the other so now we run also it here and now it should proceed 00:50:47.840 |
yeah so now both are proceeding oops i forgot to set the host file on this computer here 00:50:57.840 |
so I retrieve the ip of this one and also the host name of this one, 00:51:05.680 |
and we put them in the hosts file of the other computer. 00:51:46.320 |
looks like it's proceeding so they are both doing yeah they are both building the data set now the 00:52:00.720 |
tokenizer so if you have watched my previous video on how to code a transformer model from zero this 00:52:05.760 |
is exactly the same code, except that I have added some things to manage the distributed training, 00:52:11.440 |
but it's a very minimal code change and later I will show you step by step how I did it. As you 00:52:16.160 |
can see now the training is running in parallel the first thing you will notice is that the weights 00:52:20.720 |
and biases is only initialized on one node because we cannot send the metrics from multiple nodes 00:52:26.160 |
because it will interfere with each other so we send the metrics only from one node and it's the 00:52:31.360 |
node with rank number zero we will see later how we check this information as you can see they are 00:52:36.640 |
both training on a subset of the data so this one is training on 910 batch and this one also 910 00:52:42.000 |
batch it means that in total we have 1820 batches in total and each one is calculating a local 00:52:50.880 |
gradient and sending it to the other who will calculate the sum of this gradient actually 00:52:56.400 |
we have four nodes here because we have four gpu so each gpu is calculating a local gradient 00:53:00.880 |
and each each gradient is sent to the rank number zero which will calculate the sum of all these 00:53:11.360 |
gradients using the reduce operation actually the old reduce operation because then it will send 00:53:16.640 |
back the sum to all the other nodes who will then update the parameters using the sum of these 00:53:22.320 |
gradients that it has received. Another thing, I made a mistake: this 910 is not to be multiplied 00:53:28.240 |
by 2, because it's not per rank; later we will see what is the difference between 00:53:33.280 |
local rank and global rank, but we have four nodes and each node is working on 910 batches of data. 00:53:40.640 |
so i only show one because tqdm will otherwise if i show the tqdm for both gpus it will interfere 00:53:48.880 |
with each other this progress bar basically here so i only show one progress bar per computer 00:53:53.760 |
not two because each computer has two gpus so actually i should have two progress bar but 00:53:58.720 |
i only show one otherwise the visualization is very bad so first of all let's navigate 00:54:04.000 |
the code to understand how it works let me open the project 00:54:13.840 |
okay let's see here let's start from the train file 00:54:29.840 |
okay, these are the main differences that we have compared to the original code that I built in my previous video, 00:54:36.560 |
but it's the same code, and the changes that I'm showing here will apply to any training 00:54:41.440 |
loop that you have built, so it's not only for this particular code, this will apply 00:54:45.200 |
to any. Okay, the first thing we do is we read these two variables that are set when we run 00:54:52.800 |
the code with the torch run torch run will insert some environment variables into our environment 00:54:58.560 |
that we can read one is called rank and one is called local rank let's see the difference 00:55:03.200 |
okay the local rank basically indicates the number of the gpu in the local computer while 00:55:12.560 |
the global rank (also called just rank) indicates the unique id of the gpu among the whole 00:55:20.000 |
cluster. So if we have four gpus, the rank will be unique across the whole cluster, while the local rank 00:55:26.960 |
is not unique but it's unique to the local computer the local rank is useful for example 00:55:32.240 |
when we want to print for example only on one gpu per each computer while the global rank is useful 00:55:38.800 |
when we want to only one gpu to perform an operation among all the others for example if 00:55:45.360 |
we want to initialize weights and biases or any other service that should be initialized only 00:55:49.600 |
from one node in all the cluster then we use the global rank which is this environment variable 00:55:55.360 |
here while if we want to use something for example for printing or only one local gpu should use the 00:56:02.160 |
tqdm or the progress bar or any other stuff that is can interfere with each other on the local 00:56:07.760 |
computer then we use the local rank so the first thing i do is i load these two environment 00:56:13.200 |
variables and save it into the configuration the second thing i do okay i print the configuration 00:56:18.560 |
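as a rough sketch, this step looks something like the following (assuming the script is launched with torchrun, which is what sets RANK, LOCAL_RANK and WORLD_SIZE; the variable names here are just illustrative):

import os

# torchrun injects these environment variables into every process it spawns
local_rank = int(os.environ["LOCAL_RANK"])   # index of the GPU on this machine
global_rank = int(os.environ["RANK"])        # unique id of this process across the whole cluster
world_size = int(os.environ["WORLD_SIZE"])   # total number of processes in the cluster

print(f"global rank {global_rank}, local rank {local_rank}, world size {world_size}")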
the first thing we need to do to initialize the cluster, and this is where torchrun will stop and 00:56:25.200 |
wait for all the nodes to connect, is to call this function here, init_process_group. 00:56:29.360 |
init_process_group belongs to a package that I imported here, called torch.distributed. 00:56:36.960 |
these are the imports we need in order to use distributed training: from torch.utils.data.distributed 00:56:43.440 |
we need the DistributedSampler, then DistributedDataParallel, init_process_group 00:56:48.640 |
and destroy_process_group. so this is the first thing we do: we read the environment 00:56:53.360 |
variables and save them somewhere so we can access the local and the global rank, then we call this function 00:56:58.640 |
here, indicating the backend we want to use. I am using CUDA, so I want to use NCCL, 00:57:04.720 |
pronounced "nickel". we also use the local rank to tell CUDA which GPU we will be training on, 00:57:11.360 |
because each GPU is given its local rank relative to 00:57:17.360 |
the current computer: the first GPU will be assigned local rank zero and the second GPU will 00:57:23.120 |
be assigned local rank one on each computer. so we set that here, then we run the training loop, and 00:57:29.360 |
after the training loop has finished we call destroy_process_group. 00:57:34.080 |
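a minimal sketch of this setup and teardown, assuming a CUDA machine and a script launched with torchrun (train_loop here is just a placeholder for your own training function):

import os
import torch
from torch.distributed import init_process_group, destroy_process_group

local_rank = int(os.environ["LOCAL_RANK"])
global_rank = int(os.environ["RANK"])

# blocks until every process in the cluster has joined the group
init_process_group(backend="nccl")
# bind this process to its own GPU on the local machine
torch.cuda.set_device(local_rank)

train_loop(local_rank, global_rank)   # placeholder for your own training loop

destroy_process_group()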
now let's see the details of the training loop. first of all, here I disable training on the CPU, 00:57:40.240 |
because with distributed data parallel we are using the NCCL backend, which 00:57:44.560 |
only works with CUDA, so I'm disabling the CPU. the second thing we do is, when we create the 00:57:51.920 |
data loaders, so the train data loader and the validation data loader, we need to disable 00:57:58.960 |
the shuffling on the data loader and introduce this parameter, sampler, to which we 00:58:04.640 |
pass an instance of DistributedSampler, telling it what the dataset is and that we 00:58:10.560 |
want to shuffle it. so we shuffle using the DistributedSampler, not the data loader. 00:58:17.120 |
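a minimal sketch of this data-loading change (train_ds and the batch size are placeholders for whatever your project uses):

from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

# the sampler shards the dataset across processes, so each GPU sees a different subset;
# shuffling is done by the sampler, so it is turned off on the DataLoader itself
train_sampler = DistributedSampler(train_ds, shuffle=True)
train_dataloader = DataLoader(train_ds, batch_size=8, shuffle=False, sampler=train_sampler)
# (it is also common to call train_sampler.set_epoch(epoch) at the start of every epoch,
#  so that the shuffle order changes between epochs)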
the next thing we do is create the logic for pre-loading the checkpoint: we 00:58:23.920 |
check if we want to preload a checkpoint. in my case the default configuration says that 00:58:28.160 |
we should always load the latest checkpoint, you can see it here, so the 00:58:32.960 |
default configuration says we should load the latest checkpoint available. 00:58:38.560 |
the latest checkpoint file is retrieved using the path that we passed on the command line, 00:58:45.200 |
remember here we have a path that indicates the directory where we save the checkpoints, and 00:58:52.240 |
we use it: if there is a latest checkpoint, it will be loaded into our model. our model 00:58:59.440 |
is created here, so we create an instance of our model, which is basically wherever you get the 00:59:06.880 |
instance of your model, and then you can preload the state dict here, and also the optimizer 00:59:14.000 |
state dict and all the other global variables that should be pre-loaded to resume the training. 00:59:20.640 |
if the global rank in the config is zero, we also initialize the services like Weights & Biases. 00:59:28.240 |
in the case of Weights & Biases we may also want to resume the training from the same run 00:59:32.640 |
in which we last crashed, so every time we save the checkpoint we also save the run id of the 00:59:38.080 |
Weights & Biases run, and we can restore it when we restore the checkpoint. 00:59:47.920 |
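a minimal sketch of this preloading and rank-zero-only initialization, reusing the names from the earlier sketches; the checkpoint keys, latest_checkpoint_path and the wandb project name are illustrative assumptions, not taken from the original code:

import torch
import wandb

checkpoint = torch.load(latest_checkpoint_path, map_location=f"cuda:{local_rank}")
model.load_state_dict(checkpoint["model_state_dict"])
optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
initial_epoch = checkpoint["epoch"] + 1
global_step = checkpoint["global_step"]

# only one process in the whole cluster talks to Weights & Biases,
# resuming the run whose id was stored in the checkpoint
if global_rank == 0:
    wandb.init(project="my-project", id=checkpoint.get("wandb_run_id"), resume="allow")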
the big part introduced by the distributed parallel training is this. now we have our model, which could be anything; in the 00:59:52.960 |
case of my code it's the transformer model, a class that I created in my previous video, 00:59:58.560 |
this one here, which is just an nn.Module. we need to wrap it in an instance of DistributedDataParallel, 01:00:06.000 |
as you can see here, and we also indicate the device id we will be using, so the GPU 01:00:11.520 |
id, which is the local rank. also, on the global rank zero we can initialize other 01:00:18.480 |
stuff, which in this case is just Weights & Biases. the training loop is the same as the non-parallel 01:00:24.800 |
code, except that instead of calling the model directly, so where before we did model.forward, 01:00:33.680 |
we need to do model.module.forward. so if we need to access the encode method of our model, which is 01:00:40.880 |
for example this method here, we cannot access model.encode, we need to access model.module.encode, 01:00:48.080 |
because this model variable now refers to an instance of DistributedDataParallel; if we want to retrieve 01:00:55.760 |
the original model we need to go through model.module. 01:01:03.520 |
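a minimal sketch of this wrapping, where build_model, batch, src and src_mask are placeholders and encode stands for any custom method your model class exposes:

from torch.nn.parallel import DistributedDataParallel

model = build_model().to(local_rank)                    # placeholder for however you build your model
model = DistributedDataParallel(model, device_ids=[local_rank])

output = model(batch)                                   # the forward pass goes through the DDP wrapper
encoder_output = model.module.encode(src, src_mask)     # custom methods are reached through .module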
every time we run loss.backward on this module, PyTorch will intercept it and calculate the local gradient, and after it has 01:01:10.480 |
calculated the local gradient it will send it to all the other nodes for accumulation and then 01:01:14.960 |
receive back the accumulated gradient. this cumulative gradient is then used when 01:01:21.120 |
we call optimizer.step, and optimizer.zero_grad resets it to zero. when we want to save 01:01:30.400 |
the checkpoint, which for example we can do every epoch (in my case I do it every epoch), I save it 01:01:37.920 |
only on the global rank number zero, because I don't want to save the checkpoint on all the nodes, 01:01:43.440 |
otherwise they would overwrite each other's checkpoint. so I only do it on the global rank zero. 01:01:48.640 |
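a minimal sketch of one epoch under this scheme, reusing the names from the sketches above (initial_epoch, num_epochs, loss_fn and the checkpoint path are placeholders):

for epoch in range(initial_epoch, num_epochs):
    for batch, labels in train_dataloader:
        loss = loss_fn(model(batch), labels)   # forward pass through the DDP-wrapped model
        loss.backward()                        # the local gradients are all-reduced here
        optimizer.step()                       # every process applies the same synchronized gradient
        optimizer.zero_grad()

    if global_rank == 0:                       # only one process writes the checkpoint
        torch.save({"epoch": epoch,
                    "model_state_dict": model.module.state_dict(),
                    "optimizer_state_dict": optimizer.state_dict()},
                   f"checkpoints/epoch_{epoch:02d}.pt")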
as you can see here. and this is pretty much all we need to do. now let me show you a template 01:01:56.080 |
that you can follow for your future projects on how to integrate the distributed data parallel 01:02:00.640 |
in your existing code. so let's see. okay, this is the template you should follow for your 01:02:08.560 |
future projects. the first thing you do is read these two environment variables, called 01:02:13.680 |
LOCAL_RANK and RANK. the first one indicates the local rank, so the number we saw 01:02:18.720 |
before, the number of the GPU relative to the local computer, not to the entire cluster; the 01:02:24.000 |
second one indicates the global rank, which is a unique identifier for this particular GPU among 01:02:29.120 |
all the GPUs of the cluster. we then call the function init_process_group, indicating the 01:02:33.920 |
backend we want to use, which in the case of CUDA, so I think in most cases, is NCCL, because 01:02:38.960 |
it's the best one and it's made by NVIDIA. the second thing we do is set which device 01:02:44.720 |
CUDA should use, which is the local rank. then we run the training loop: only on 01:02:50.640 |
the global rank number zero we initialize the services like Weights & Biases; we create our 01:02:57.440 |
data loader, telling it that as a sampler it should use the DistributedSampler, with 01:03:02.960 |
shuffle there but not on the data loader itself; we create an instance of our model, which is our 01:03:08.720 |
custom class, and if there is a latest checkpoint we load it. after we 01:03:15.280 |
have loaded the latest checkpoint, we wrap our model into an instance of DistributedDataParallel, 01:03:20.560 |
also indicating the device id it should use, which is the local rank. all the rest 01:03:26.320 |
is the same, except that on the global rank number zero we can collect some statistics, so for example 01:03:31.760 |
we can send some statistics to Weights & Biases, and only on the global rank number zero we save 01:03:38.080 |
the state of the model, every epoch for example, so that if the training crashes we can resume from 01:03:44.960 |
the latest checkpoint. 01:03:53.200 |
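putting it all together, here is a compact sketch of that template; everything model- or data-specific (MyDataset, build_model, loss_fn, num_epochs, the wandb project, the checkpoint path) is a placeholder for your own code:

import os
import torch
import wandb
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel
from torch.distributed import init_process_group, destroy_process_group

def main():
    local_rank = int(os.environ["LOCAL_RANK"])    # GPU index on this machine
    global_rank = int(os.environ["RANK"])         # unique id across the cluster

    init_process_group(backend="nccl")            # waits for every node to join
    torch.cuda.set_device(local_rank)

    if global_rank == 0:                          # services are initialized by one process only
        wandb.init(project="my-project")

    train_ds = MyDataset()                        # placeholder dataset
    sampler = DistributedSampler(train_ds, shuffle=True)
    train_dataloader = DataLoader(train_ds, batch_size=8, shuffle=False, sampler=sampler)

    model = build_model().to(local_rank)          # placeholder model factory
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
    # (load the latest checkpoint here, if there is one, before wrapping the model)
    model = DistributedDataParallel(model, device_ids=[local_rank])

    for epoch in range(num_epochs):               # num_epochs is a placeholder
        sampler.set_epoch(epoch)                  # change the shuffle order every epoch
        for batch, labels in train_dataloader:
            loss = loss_fn(model(batch), labels)  # placeholder loss function
            loss.backward()                       # gradients are synchronized here
            optimizer.step()
            optimizer.zero_grad()

        if global_rank == 0:                      # metrics and checkpoints come from one process
            wandb.log({"loss": loss.item(), "epoch": epoch})
            torch.save({"epoch": epoch,
                        "model_state_dict": model.module.state_dict(),
                        "optimizer_state_dict": optimizer.state_dict()},
                       f"checkpoints/epoch_{epoch:02d}.pt")

    destroy_process_group()

if __name__ == "__main__":
    main()

you would then launch this same script with torchrun on every machine, as shown earlier in the video, which is what injects the RANK and LOCAL_RANK variables it reads.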
now let's try to simulate a crash. let's see how the training is going: as you can see, we are already training on epoch number one, so one epoch is already 01:03:58.640 |
done. in the code I built here, we are running 01:04:05.440 |
validation only on the global rank number zero. actually, you don't have to run validation during 01:04:13.120 |
the distributed training: you can do it asynchronously, for example by creating another node 01:04:18.400 |
that reads the latest checkpoint and runs the validation asynchronously, so that our GPUs can 01:04:24.960 |
always keep working on the training and not waste time on validation, because otherwise 01:04:31.840 |
all the nodes are waiting for node number zero to run validation, and this can be expensive 01:04:37.760 |
for bigger models, so we can do it asynchronously. okay, now we can see that 01:04:46.560 |
all the nodes are training epoch number one, as you can see here, so I will try to simulate 01:04:51.920 |
a crash and see that the distributed training will resume from the checkpoint of 01:04:58.960 |
epoch number zero, which we have already completed. so let's make it crash: I will crash the master 01:05:06.560 |
node directly, so we can try the worst case scenario. this one has crashed, so I just killed 01:05:13.280 |
the node, and we see that this one should also be killed after a while. 01:05:17.360 |
okay this one is also killed we can restart using the same command as before 01:05:38.800 |
it is loading the tokenizer and preloading the model, as you can see here, the model that was previously 01:05:47.600 |
saved by the global rank number zero, so now they should resume the 01:05:51.440 |
training from the first epoch, not from the zeroth epoch. 01:06:00.400 |
and yes, they are starting from epoch number 01, because epoch number 00 01:06:05.200 |
has already been done. and this is how failover works: you basically save a checkpoint every 01:06:11.600 |
once in a while, then you can restart the training from that checkpoint. when you save a checkpoint 01:06:16.880 |
you should also save all the variables that are relevant to resuming the training, so if you have 01:06:22.000 |
some state variables, like for example a global step counter or other counters that 01:06:28.000 |
you need to keep track of, you can save them in the checkpoint and then restore them when you load the checkpoint. 01:06:31.680 |
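a minimal sketch of a checkpoint that carries everything needed to resume; the exact keys (for example wandb_run_id) are illustrative and depend on what your training loop tracks:

# saving, done only on global rank 0
torch.save({
    "epoch": epoch,
    "global_step": global_step,                        # any counters needed to resume exactly
    "model_state_dict": model.module.state_dict(),
    "optimizer_state_dict": optimizer.state_dict(),
    "wandb_run_id": wandb.run.id,                      # so logging can resume on the same run
}, checkpoint_path)

# restoring, done on every process before wrapping the model with DDP
checkpoint = torch.load(checkpoint_path, map_location=f"cuda:{local_rank}")
initial_epoch = checkpoint["epoch"] + 1
global_step = checkpoint["global_step"]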
okay, let's stop the training here. you can try to experiment yourself 01:06:41.440 |
with Paperspace, I think it's quite easy, and let me know if you have any problems, I will try to 01:06:45.520 |
help you. okay, now let's look at how distributed data parallel 01:06:51.920 |
was integrated into PyTorch, because there are some clever design choices in it to make it 01:06:57.680 |
very fast. let's review them. first of all, when does PyTorch synchronize the gradients? well, this 01:07:04.160 |
happens when we call loss.backward, because calling loss.backward leads to each 01:07:10.240 |
node calculating its local gradient, which is the derivative of the loss function with respect to 01:07:15.280 |
each node in the computation graph, and each node will send its local gradient to a central node, 01:07:21.200 |
which is the global rank number zero, and receive back the cumulative gradient through an operation 01:07:26.240 |
that is all-reduce. each node will then update its weights, so the parameters of the model, using the 01:07:32.560 |
cumulative gradient and of course the learning rate of the local optimizer. we can avoid PyTorch 01:07:39.120 |
synchronizing the gradients at every backward step and instead let them accumulate for a few steps, 01:07:44.800 |
for example for a few batches, using the no_sync context. let's see how to integrate it. usually 01:07:51.760 |
you run the forward step, then you run the backward step, which leads to the 01:07:58.800 |
synchronization, then you do optimizer.step and then optimizer.zero_grad. however, if we 01:08:06.240 |
only want to synchronize every few steps, so we don't want to run the synchronization at every step, we can use the no_sync 01:08:12.400 |
context you can see here, which is a method provided by DistributedDataParallel, 01:08:18.080 |
the wrapper of our model. basically, we create this context manager, we run the forward step 01:08:24.640 |
and the backward step inside it, and we don't call optimizer.step; when we are outside of this context 01:08:31.040 |
we still do the forward and the backward step, and then we can run optimizer.step and zero_grad. 01:08:35.920 |
basically, this no_sync will disable the synchronization of the gradients but will 01:08:40.880 |
let them accumulate locally for a while, and after a while we can synchronize. 01:08:49.360 |
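a minimal sketch of this pattern, reusing the earlier names; the accumulation factor of 4 is arbitrary:

accumulation_steps = 4

for step, (batch, labels) in enumerate(train_dataloader):
    if (step + 1) % accumulation_steps != 0:
        with model.no_sync():                  # skip the all-reduce, accumulate the gradient locally
            loss = loss_fn(model(batch), labels)
            loss.backward()
    else:
        loss = loss_fn(model(batch), labels)   # on this step, backward triggers the synchronization
        loss.backward()
        optimizer.step()                       # update using the accumulated, synchronized gradient
        optimizer.zero_grad()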
there is another very clever trick in PyTorch, which is the computation-communication overlap. since 01:08:55.200 |
each GPU needs to send its gradient to a central node for accumulation, this can lead to idle 01:09:01.680 |
time in which the GPUs are not working because they're communicating, right? so each GPU will 01:09:06.400 |
perform the forward step, then it will perform the backward step, and then there is some communication 01:09:11.680 |
overhead, because the GPUs need to synchronize the gradients with each other, calculate the 01:09:20.000 |
cumulative gradient on one node and receive back the cumulative one. if we do these operations sequentially 01:09:26.080 |
it will lead to a big delay on the GPUs, because in this time they could do some other meaningful work 01:09:31.520 |
for the training. so what PyTorch does is overlap the communication 01:09:38.720 |
with the backward step. let's see how it works. as you remember, we have a computation 01:09:46.800 |
graph, right? so PyTorch will communicate the gradient of a node as soon as it is available, 01:09:51.920 |
because, as you remember, we calculate the gradient of the loss function with respect to each weight, 01:09:56.960 |
but to calculate the gradient of the loss function with respect to each weight we need to calculate 01:10:00.800 |
the gradient of the loss function with respect to the intermediate nodes. so for example, first we 01:10:05.440 |
compute the gradient of the loss function with respect to the output, then we compute the gradient of the 01:10:11.360 |
loss function with respect to the weights of this layer here, which are the weights and the 01:10:16.480 |
bias. but these are already available, so we can already send them to the other nodes and get back 01:10:22.640 |
the cumulative one; this operation can already be done. then PyTorch will calculate 01:10:28.880 |
the next layer, and since this one is then available, we can already send it to the other 01:10:33.760 |
nodes and receive back the cumulative one, etc. so while PyTorch is computing the backward 01:10:39.840 |
step, we can already communicate to the other nodes the gradients that we have already computed 01:10:45.360 |
and receive back the cumulative ones. this results in a very good speed-up. 01:10:52.800 |
and to make the communication even faster, instead of sending one gradient at a time, PyTorch creates 01:10:58.560 |
buckets of gradients: every time a bucket is available, it sends it to the other nodes and 01:11:03.200 |
receives back the cumulative gradient for that bucket. then, when the second bucket is available, 01:11:08.400 |
it will send the second bucket and receive back the cumulative one, and when the last bucket is 01:11:13.120 |
available it will send it to the other nodes and receive it back. this way, as you saw before, 01:11:20.800 |
we can overlap the communication overhead with the calculation of the backward step: while we are 01:11:27.920 |
calculating the gradients we already send them, so the total time to process the forward step, the 01:11:34.560 |
backward step, the communication and the update is lower, because two of the steps are overlapped with 01:11:39.920 |
each other. PyTorch recommends 25 megabytes as the size of the bucket: we don't want it to be 01:11:46.800 |
too small, because there is a fixed communication overhead for each bucket, so with many small buckets we have a lot 01:11:52.000 |
of overhead, and we prefer to spread this overhead over bigger buckets, fewer in number but 01:11:59.360 |
bigger; but we also don't want it to be too big, because otherwise we would not be using the communication channel while we are computing the gradients. 01:12:07.440 |
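this bucket size is exposed as a parameter on the DDP wrapper; as a small illustrative sketch (25 is already the default value, it is spelled out here only to make the knob visible):

from torch.nn.parallel import DistributedDataParallel

# bucket_cap_mb controls the size of the gradient buckets used for the overlapped all-reduce
model = DistributedDataParallel(model, device_ids=[local_rank], bucket_cap_mb=25)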
so thank you guys for watching my video, 01:12:13.600 |
I hope you learned a lot. in this video I introduced data parallel training and showed you how to 01:12:18.960 |
create the infrastructure first, and also how to run this distributed training code on the 01:12:24.320 |
infrastructure that we created. I also showed you how PyTorch works under the hood, so that you 01:12:30.560 |
can understand how the gradients are synchronized and also, at a mathematical level, how it works. 01:12:36.400 |
please like and subscribe to my video if you liked it and it was helpful for you. I also 01:12:42.240 |
recommend watching my other videos, because I always make long videos in which I give a lot of 01:12:47.200 |
knowledge, and let me know if there is something you don't understand, I will try to help all of you.