
Distributed Training with PyTorch: complete tutorial with cloud infrastructure and code


Chapters

0:00 Introduction
2:43 What is distributed training?
4:44 Data Parallelism vs Model Parallelism
6:25 Gradient accumulation
19:38 Distributed Data Parallel
26:24 Collective Communication Primitives
28:39 Broadcast operator
30:28 Reduce operator
32:39 All-Reduce
33:20 Failover
36:14 Creating the cluster (Paperspace)
49:00 Distributed Training with TorchRun
54:57 LOCAL RANK vs GLOBAL RANK
56:05 Code walkthrough
66:47 No_Sync context
68:48 Computation-Communication overlap
70:50 Bucketing
72:11 Conclusion

Whisper Transcript

00:00:00.000 | Hello guys, welcome back to my channel. Today we are going to talk about the distributed training
00:00:04.080 | of a model with PyTorch. What do I mean? Well imagine you have a model that is very big and
00:00:09.120 | you have a lot of data upon which you want to train this model, but it takes a lot
00:00:13.520 | of time to train it because maybe your GPU is not so powerful or maybe you cannot use a large batch
00:00:19.840 | size or just you have a lot of data to train this model and you want to train it in a reasonable
00:00:23.840 | time. One way to do it is to parallelize the training using multiple GPUs or even multiple
00:00:29.200 | computers, each one having multiple GPUs or even one GPU. In this video I will show you how this
00:00:35.200 | distributed training works, how to integrate it in your existing project and we will combine theory
00:00:41.280 | with practice which means that I will give you all the theory behind the distributed training and
00:00:45.280 | then also show you how to first of all build the cloud infrastructure to do cloud training of a
00:00:51.920 | model and also how to integrate the distributed training into an existing model that you have
00:00:56.800 | already built and it's maybe already training on a single GPU. So let's review the topics of today.
00:01:02.160 | First of all I will give you an introduction to distributed training and show you the two
00:01:05.760 | ways in which we can do it. One is data parallelism and one is model parallelism. In this
00:01:10.080 | video I will explore data parallelism. We will see a little bit of the neural networks theory
00:01:15.520 | because I want to introduce a concept called gradient accumulation which is very important
00:01:19.760 | for distributed training and later we will see distributed training as it is built in PyTorch
00:01:25.040 | so distributed data parallel. We will see how it works and I will also show you what we mean by
00:01:31.200 | communication primitives, because maybe you have heard terms like all-reduce, reduce, broadcast,
00:01:36.080 | all gather etc but you maybe don't know what they are or how they work. In this video I will show
00:01:41.040 | you the algorithms upon which these operations are built and how they work when we talk about
00:01:46.000 | distributed training. I will also show you how to manage failover of a node so imagine you are
00:01:50.400 | training on multiple nodes and one of the nodes suddenly dies how we manage this situation. We
00:01:55.840 | will do a little coding session I will not write code I will just take an existing code and show
00:02:00.000 | you how to add the distributed training to an existing model and I will be using a model that
00:02:05.200 | I built in a previous video so it's the video about the how to code a transformer model from
00:02:10.240 | scratch so if you have already watched my previous video on how to do it it's wonderful if you didn't
00:02:14.960 | it's not a problem because anyway the knowledge I will give you today applies to any model that
00:02:20.000 | you have built, not only the one written by me, and I will also show you the underlying mechanism in
00:02:26.800 | which PyTorch does distributed training so I will introduce you the computation communication
00:02:32.320 | overlap and the bucketing which are optimizations made in the PyTorch implementation of the
00:02:37.920 | distributed training and so we will explore all these topics. Let's start our journey so let's
00:02:43.840 | start by introducing what is distributed training. Imagine you want to train a language model on a
00:02:49.280 | very big data set for example the entire content of wikipedia now the data set is quite big because
00:02:54.640 | it's made up of millions of articles and each article has maybe thousands of tokens to train
00:03:00.560 | this model on a single gpu may be possible but it poses some challenges first of all the model may
00:03:06.240 | not fit on a single gpu this happens when the model has many parameters this happens for example
00:03:11.600 | also with the latest llama they have billions of parameters and with so many parameters the ram of
00:03:18.160 | the gpu may not be enough or you are forced to use a small batch size because a bigger batch size
00:03:24.640 | leads to an out-of-memory error on CUDA, or the model may take years to train because
00:03:31.360 | the data set is very big if any of the above points applies to you then you need to scale up
00:03:36.400 | your training setup and scaling can be done vertically or horizontally let's compare the two
00:03:41.200 | options. Vertical scaling means that you have a single machine for example a single gpu which has
00:03:47.520 | some ram and the gpu has some memory for example four gigabyte vertical scaling means that you
00:03:52.880 | just buy a bigger computer or a bigger gpu, so you take the existing machine
00:03:59.920 | and you upgrade the hardware and this requires no code change because the code that was running on
00:04:05.680 | the small machine can also run on the big machine you just maybe need to adjust a little bit of the
00:04:09.440 | parameters, maybe you can increase the batch size. Now with horizontal scaling we go from a
00:04:15.680 | single machine to multiple machines each one interconnected with each other that are communicating
00:04:20.800 | for this parallelization training and each machine may have one or multiple gpus and this requires
00:04:28.720 | code change, but this code change is minimal thanks to pytorch and its
00:04:33.120 | implementation of the distributed data parallel and in this video we will explore horizontal
00:04:38.800 | scaling, because with vertical scaling we basically don't need to do anything, there is no
00:04:43.040 | code change so there are two ways of distributing training of a model one is called the data
00:04:50.080 | parallelism and one is called the model parallelism so if the model can fit within a single gpu then
00:04:55.520 | we can distribute the training on multiple servers each one having one or more gpus with each gpus
00:05:02.400 | processing a subset of the entire data in parallel and synchronizing the gradients during back
00:05:08.160 | propagation this technique is known as data parallelism so we have one model that can fit
00:05:14.400 | within a single gpu and we have a lot of data what we do is basically we split this data into subsets
00:05:20.480 | and we train it on multiple computers in parallel such that each gpu is training on a subset of this
00:05:27.520 | entire data set if the model however cannot fit within a single gpu then we need to break the
00:05:34.400 | model into smaller pieces into smaller layers and each gpu process a part of the forward and
00:05:40.640 | the backward step during gradient descent this option is known as model parallelism so imagine
00:05:46.160 | this model initial model doesn't fit on a single gpu so we can break it into layers and each
00:05:51.600 | computer is managing a layer not the entire model and of course we can also combine data parallelism
00:05:59.040 | with model parallelism to create a hybrid option in this video we will focus on data parallelism
00:06:05.200 | so we pretend that we have a model that is complex but it can fit on a single gpu but we have a lot
00:06:11.520 | of training data and it's taking really forever to train it on a single computer so we want to
00:06:16.320 | distribute this training on multiple computers or multiple gpus such that each gpu will train
00:06:22.480 | on a subset of the data okay we need to review a little bit of the neural networks because i need
00:06:28.960 | to introduce a concept called gradient accumulation okay imagine you want to train a neural network
00:06:35.440 | to predict the price of a house given two variables the number of bedroom in the house and
00:06:41.200 | we will call this variable x1 and the number of bathrooms in the house and we will call this
00:06:45.920 | variable x2 we think because of some intuition that the relationship between the input and the
00:06:52.880 | output variable is linear so we can write that the predicted price is equal to x1 multiplied by the
00:06:59.280 | first weight plus x2 multiplied by the second weight plus a bias term our goal is to use
00:07:06.080 | stochastic gradient descent to find the values of the parameters w1 and w2 and bias such that
00:07:12.880 | the mean squared error loss between the actual house price and the predicted house price is
00:07:18.240 | minimized in other words we want to find w1 w2 and b such that we minimize this quantity here
00:07:25.600 | how do we do a training loop with pytorch without gradient accumulation so first of all we run the
00:07:34.720 | training for a few epochs we take our training data which is made up of x1 x2 and the target
00:07:41.120 | price we calculate the output of the model which is basically y pred is equal to x1 multiplied by
00:07:47.360 | w1 plus x2 multiplied by w2 plus the bias then we calculate the loss we do backward on the loss
00:07:55.120 | this will calculate the gradient of the loss function with respect to each parameter and then
00:08:00.000 | we update the parameters using the gradient we have calculated i am not using the optimizer i'm
00:08:06.320 | just writing the update rule by hand, using a learning rate; i don't use the momentum just for
00:08:12.880 | simplicity and this is actually how the training loop works, right? So this part here is
00:08:18.560 | equivalent to calling optimizer.step() and this part here is equivalent to calling optimizer.zero_grad().
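To make the loop just described concrete, here is a minimal sketch; the data, initial values and hyperparameters are only illustrative placeholders, not the exact code shown on screen:

```python
import torch

# Illustrative data: each row is (x1, x2) = (bedrooms, bathrooms); targets are house prices.
data = torch.tensor([[6.0, 2.0], [5.0, 2.0], [4.0, 1.0]])
targets = torch.tensor([15.0, 12.0, 10.0])

w = torch.randn(2, requires_grad=True)   # w1, w2 (randomly initialized)
b = torch.zeros(1, requires_grad=True)   # bias
lr = 0.01                                # learning rate (no momentum, for simplicity)

for epoch in range(10):
    for x, target in zip(data, targets):
        y_pred = torch.dot(x, w) + b               # x1*w1 + x2*w2 + b
        loss = ((y_pred - target) ** 2).mean()     # squared error for this single item
        loss.backward()                            # gradient of the loss w.r.t. w and b

        with torch.no_grad():
            w -= lr * w.grad                       # equivalent to optimizer.step()
            b -= lr * b.grad
            w.grad.zero_()                         # equivalent to optimizer.zero_grad()
            b.grad.zero_()
```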
00:08:23.920 | let's see it graphically what happens when we do a training loop like this so imagine we have our
00:08:30.160 | expression which represents the model which could in this case it's a very simple expression but
00:08:34.560 | imagine it's a very complex model okay what pytorch will do it will create a computation
00:08:40.320 | graph so it will take our input it will multiply it by the w parameters each input with its own
00:08:48.080 | weight then it will combine the sum of the two plus the bias term this will produce an output
00:08:54.560 | then we have a target the target is compared with the output to produce a loss and then we
00:09:01.680 | run back propagation to calculate the gradient of the loss function with respect to the weights so
00:09:07.360 | let's visualize the training process one item at a time using our computation graph
00:09:11.920 | imagine we are training on the input x1 is equal to 6 x2 is equal to 2 and the target is 15 the
00:09:20.800 | first thing our model will do is will start from the two input nodes so x1 and x2 it will multiply
00:09:26.400 | by the w weights and we also think that the w weights have been initialized with the following
00:09:32.560 | value so the value of this weight is 0.773 this weight is 0.321 and this weight is 0.067
00:09:40.240 | these values could be randomly generated as we usually do actually pytorch will produce
00:09:48.240 | the output of this node which is multiplying the x1 value with w1 then it will combine the two
00:09:55.200 | producing the output of the model it will compare it with the target to produce a loss which is a
00:10:00.560 | number but it's also a function so we can calculate the gradient so now usually we call a loss dot
00:10:06.880 | backward to calculate the gradient of the loss function with respect to each parameter so w1 w2
00:10:12.800 | and b pytorch will also compute the intermediate nodes but i will not show them here for simplicity
00:10:18.640 | so we run loss dot backward what will loss dot backward do it will calculate the gradient
00:10:24.960 | of the loss function with respect to each weight. How does it do it? Well, the first thing it
00:10:31.600 | will do is calculate the gradient of the loss function with respect to the output, which can be
00:10:36.400 | done like this. Then it will calculate the gradient of the output with respect to this node. Here i
00:10:44.880 | am showing only the computation of the gradient of the loss function with respect to the w1
00:10:51.280 | node you can do the other ones for exercise for example so next it will compute the gradient
00:10:58.320 | of the loss function with respect to the z1 node but in order to do it because of the chain rule
00:11:03.760 | we need the gradient of the output with respect to the z1 node, and then it will compute
00:11:10.320 | the gradient of the z1 node with respect to w1, and this allows us to calculate the gradient of
00:11:16.160 | the loss function with respect to w1.
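Written out as equations (a sketch of the chain-rule computation just described, assuming z1 = x1·w1, z2 = x2·w2 and the squared-error loss for a single item):

```latex
\begin{aligned}
\text{output} &= z_1 + z_2 + b, \qquad z_1 = x_1 w_1, \quad z_2 = x_2 w_2 \\
\text{loss} &= (\text{output} - \text{target})^2 \\
\frac{\partial\,\text{loss}}{\partial\,\text{output}} &= 2\,(\text{output} - \text{target}), \qquad
\frac{\partial\,\text{output}}{\partial z_1} = 1, \qquad
\frac{\partial z_1}{\partial w_1} = x_1 \\
\frac{\partial\,\text{loss}}{\partial w_1} &=
\frac{\partial\,\text{loss}}{\partial\,\text{output}} \cdot
\frac{\partial\,\text{output}}{\partial z_1} \cdot
\frac{\partial z_1}{\partial w_1}
= 2\,(\text{output} - \text{target})\, x_1
\end{aligned}
```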
00:11:21.200 | These are all the calculations you need to do in order to obtain the gradient of this node here, and as you can see, we also need to compute the gradients of
00:11:26.560 | the intermediate nodes, but they only serve to arrive at the gradients of the
00:11:31.840 | loss function with respect to the parameters. And these are the values of
00:11:36.960 | the gradient that we have calculated for each parameter. The next thing we do during
00:11:43.680 | training is we run optimizer dot step this will update the value of each parameter using the
00:11:50.240 | gradient that we have calculated and how do we do it we say that the new value of the parameter is
00:11:54.960 | equal to the old value of the parameter minus learning rate multiplied by the gradient why
00:12:02.240 | minus because the gradient indicates the direction in which the function grows the most but we want
00:12:07.600 | to move against this direction because we want to minimize the the loss so we put a minus sign here
00:12:14.560 | and this is how the value will be updated using the gradient that we have calculated in the
00:12:20.000 | previous step which was the loss dot backward function the next thing we do is we run optimizer
00:12:27.360 | dot zero so this will zero out all the gradients of each parameter and also the intermediate nodes
00:12:33.600 | and then we do the next iteration on the next item so imagine we are training one item at a
00:12:40.160 | time so batch size is one the next item may have x1 is equal to 5 x2 is equal to 2 and the target
00:12:46.880 | may be 12 this will produce this loss and this output the next thing we do is we run loss dot
00:12:53.440 | backward loss dot backward we calculate the gradient of the loss function with respect to
00:12:58.640 | each weight and you can see the gradient here the next thing we do is we run optimizer dot step which
00:13:04.960 | will update the value of the parameter using the gradient that we calculated in the previous step
00:13:09.520 | and the learning rate and finally we run optimizer dot zero and this will reset all the gradients
00:13:15.360 | for all the weights. We may want to visualize this process on the loss function, so let's see
00:13:22.080 | what happens to the loss function while we are doing this process. We started with some
00:13:27.440 | initial weights so remember we started with some randomly initialized weights values then we ran
00:13:33.280 | forward step on the first data item this calculated an output and then we run loss dot backward this
00:13:39.920 | resulted in a gradient being calculated for the loss function with respect to each weight
00:13:44.480 | this indicates a direction in which we should move our weights in order to minimize the loss
00:13:50.720 | because we move against the direction of the gradient so the arrow is pointing already in
00:13:55.200 | the negative direction of the gradient the next thing we do is we do optimizer dot step optimizer
00:14:01.440 | dot step will take our initial weights and will change the weights value according to the
00:14:07.680 | negative direction of the gradient we then run optimizer dot zero this just resets the value of
00:14:14.080 | the gradient, and then we run forward on the next data item. Then we run loss dot backward
00:14:20.480 | and this will result in a calculation of a gradient which indicates a direction in which
00:14:24.320 | we need to move our weights in order to minimize the loss function and then we actually modify our
00:14:30.240 | weights using optimizer dot step so the actual update of the parameters values happens when we
00:14:36.800 | call optimizer dot step and finally we run optimizer dot zero and this will reset the
00:14:43.600 | gradient to zero and this is how gradient descent works without
00:14:51.600 | gradient accumulation: at every step, so at every data item, because here we pretend we have a batch size
00:14:56.240 | equal to one, we update the parameters of the model. However, with gradient
00:15:02.720 | accumulation we don't do it at every step. Let's see how it works. The initial part of the
00:15:08.480 | training loop with gradient accumulation is the same so we have run for a few epochs we extract
00:15:12.880 | the data from our training data so x1 x2 and target we calculated the loss just like before
00:15:18.720 | we run loss dot backward just like before the difference is here that we don't update the value
00:15:25.200 | of our parameters at every item or every batch we train our model upon, but we
00:15:31.840 | do it every few items or every few batches. In this case, in this training loop, we do it
00:15:39.840 | every two items, because we are extracting one item at a time from our training data, and so we update
00:15:48.880 | the parameters using the gradient every two items in this case. And what happens is that for
00:15:56.960 | the first item, for example, we will arrive at loss dot backward, we will not run this code here, we
00:16:02.000 | will restart the loop at the next item, we will calculate the output, we run again loss dot
00:16:07.280 | backward, and this loss dot backward will create a gradient, but this gradient
00:16:12.960 | will not replace the gradient that we calculated in the previous step; it will be
00:16:18.240 | accumulated, it will be summed up with the previous gradient. So let's visualize this process step
00:16:23.280 | by step imagine we have our first item and this is x1 is equal to 6 x2 is equal to 2 and the target
00:16:30.560 | is 15 we run the forward loop using this item this will result in an output being calculated
00:16:37.360 | and using the target we can calculate a loss we then run the loss dot backward this will result
00:16:43.200 | in a gradient being calculated for this item then we don't update the parameters we do forward step
00:16:50.720 | on the next item so note that during this forward step the gradient is not zero because we didn't
00:16:56.480 | zero it out in the previous step because in the previous step we didn't run optimizer dot step
00:17:01.280 | or optimizer dot zero so the gradient is still there from the previous item and okay the second
00:17:08.560 | item which is x1 is equal to 5 x2 is equal to 2 and the target is 12 will result in a loss being
00:17:13.440 | calculated and an output we then run loss dot backward and the pytorch will not replace this
00:17:20.320 | new gradient with the previous one we'll sum it up it will be accumulated that's why we call it
00:17:25.760 | gradient accumulation so the new gradient is accumulated with the old one now that we have
00:17:31.360 | reached the batch size we can now finally call the optimizer dot step method this will result
00:17:37.920 | in the values of the parameters being updated using the accumulated gradient, and then we
00:17:45.760 | run optimizer dot zero which will reset the gradient to zero and then we can proceed with
00:17:50.640 | another loop of two items etc let's visualize what happens to our loss function so to our
00:17:58.000 | parameters and the loss function when we do gradient descent with gradient accumulation
00:18:02.640 | so our initial weights are here because they are randomly initialized we run the forward loop on
00:18:09.840 | the first item this will result in an output being calculated by the model then we run loss
00:18:17.520 | loss dot backward this will result in the gradient being calculated with respect to the first data
00:18:23.200 | item and this gradient will indicate a direction then we run again forward on the second data item
00:18:30.480 | and then we run loss dot backward in the second data item this also will result in a gradient
00:18:35.520 | being calculated but these two gradients will then be summed up by pytorch and this will result in a
00:18:41.760 | resulting vector that indicates a direction and then we run the optimizer so we do optimizer dot
00:18:49.040 | step which will update the values of our weights according to the direction indicated by the sum
00:18:55.440 | of the two gradients of the two previous data items and then we run optimizer dot zero this
00:19:01.040 | will reset the gradients to zero for all the nodes. And so with gradient accumulation we update
00:19:08.080 | the parameters of the model only after we accumulated the gradient of a batch and we
00:19:13.360 | can decide how big we want this batch to be. So gradient accumulation is used not only in distributed
00:19:19.680 | training, it's also used, for example, when you want to accumulate the gradient without increasing the
00:19:24.160 | batch size, because maybe a bigger batch size doesn't fit in your gpu, so you can accumulate
00:19:29.120 | the gradient for more than one item and then move the parameters, because this will result in a
00:19:34.240 | smoother training; the loss will oscillate less, for example.
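As a sketch, here is the same toy loop but with gradient accumulation every two items (again, the data and hyperparameters are placeholders):

```python
import torch

data = torch.tensor([[6.0, 2.0], [5.0, 2.0], [4.0, 1.0], [3.0, 1.0]])
targets = torch.tensor([15.0, 12.0, 10.0, 8.0])

w = torch.randn(2, requires_grad=True)
b = torch.zeros(1, requires_grad=True)
optimizer = torch.optim.SGD([w, b], lr=0.01)

accum_steps = 2  # update the parameters only every 2 items

for epoch in range(10):
    for i, (x, target) in enumerate(zip(data, targets)):
        y_pred = torch.dot(x, w) + b
        loss = ((y_pred - target) ** 2).mean()
        loss.backward()                  # gradients accumulate (sum) into w.grad and b.grad

        if (i + 1) % accum_steps == 0:
            optimizer.step()             # update using the accumulated gradient
            optimizer.zero_grad()        # reset the gradients only after the update
```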
00:19:41.040 | Now that we have seen how gradient accumulation works, let's see distributed data parallel training in detail: how does it work,
00:19:46.240 | what do we mean by communication primitives, and how do we manage failover? Okay, distributed data
00:19:53.040 | parallel training so imagine you have a training script that is running on a single computer but
00:19:58.720 | it's very slow because the data set is very big and you can't use a bigger batch size because it
00:20:03.680 | will result in an out of memory error on CUDA this distributed data parallel is the solution
00:20:09.360 | in this case it works in the following three scenarios so imagine you have a model that can
00:20:15.680 | fit on a single gpu and you have a training loop that works but it's very slow so the first thing
00:20:22.880 | you can do is you want to train the model on multiple servers each one having a single gpu
00:20:28.480 | and this can be done with distributed data parallel the second setup that you can use is
00:20:35.280 | you just want to increase the number of gpus on your existing computer, and this one
00:20:39.840 | also can be managed by distributed data parallel training. The final setup that you
00:20:45.280 | may want is you want multiple computers and each one having more than one gpu in this video i will
00:20:51.760 | show you how to implement this setup here because the other two setup are just a particular case of
00:20:58.240 | this one so this is the most generic setup we can have and i will show you how to create the
00:21:05.360 | cloud infrastructure for this setup and also how to convert an existing code base into a
00:21:11.120 | distributed training code and also run it on the cluster that we will create so
00:21:17.040 | from now on i will use the term node and gpu interchangeably if a cluster is made up of two
00:21:24.800 | computers and each computer has two gpus then we in total we will have four nodes or four gpus
00:21:30.880 | distributed data parallel works in the following way so at the beginning of the training the
00:21:36.480 | model's weights are initialized on one node, so on one gpu, and then sent to all the other nodes
00:21:42.800 | using an operation called broadcast each node then trains the model the same model because
00:21:49.920 | it started from the same initial weights on a subset of the data so maybe this one is maybe
00:21:56.320 | one gpu is training it on the batch number one the second one is on the batch number two the third one
00:22:02.480 | the batch number three etc etc such that there is no overlap between this data and every few batches
00:22:09.200 | that these nodes train this model, the gradients of all nodes are accumulated, so summed
00:22:15.600 | up, into one node, and then the sum is sent back to all the other nodes. This operation is
00:22:21.440 | known as all reduce then each node updates the parameters of its local model using the gradient
00:22:29.520 | received in the previous step, and the gradient received in the previous step is the sum of the
00:22:34.400 | gradients of all nodes, using also its own optimizer, so doing optimizer dot step, and then we start
00:22:41.600 | again from step number two. So we train for a few batches, we have some local gradient, we send
00:22:47.280 | our local gradient to a central node, which will sum it up and then send it back to all
00:22:52.320 | the nodes, then all the nodes will update their parameters using this accumulated gradient,
00:22:58.480 | and then we continue again. Let's see all these steps one by one in detail.
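Conceptually, the steps just listed could be sketched with PyTorch's communication primitives roughly like this. This is only an illustration of the idea, not the actual DistributedDataParallel implementation (which does all of this for you, with the optimizations discussed later in the video); `model`, `my_batches`, `loss_fn` and `optimizer` are placeholders, and the process group is assumed to be already initialized:

```python
import torch.distributed as dist

def data_parallel_loop(model, my_batches, loss_fn, optimizer):
    # Step 1: the initial weights of one node (rank 0) are broadcast to everyone.
    for param in model.parameters():
        dist.broadcast(param.data, src=0)

    for inputs, labels in my_batches:        # each node iterates over its own subset of the data
        # Step 2: compute a local gradient on this node's batch.
        loss = loss_fn(model(inputs), labels)
        loss.backward()

        # Step 3: sum the local gradients of all nodes and give the sum back to everyone
        # (reduce followed by broadcast, i.e. all-reduce).
        for param in model.parameters():
            dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)

        # Step 4: every node applies the same update, so the weights stay identical everywhere.
        optimizer.step()
        optimizer.zero_grad()
```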
00:23:07.200 | step number one our model's weight are initialized on one gpu so imagine we have a setup with four
00:23:14.480 | gpus we have one gpu that will initialize the weight and send it to all the others so now the
00:23:20.400 | value of the parameters this is the value of the parameter and this is the gradient so the value
00:23:25.760 | of the parameter imagine is 0.1 we only have one parameter so for simplicity the initial weights
00:23:31.680 | are sent to all the other nodes so now all the nodes have the same weights so all the nodes now
00:23:37.520 | have the same parameters the all the same value the next thing we do is each node runs a forward
00:23:46.000 | and backward step on one or more batch of data this will result in a local gradient being calculated
00:23:52.720 | because as we saw before when we run loss dot backward we we have a gradient being calculated
00:23:59.520 | of the loss function with respect to each parameter this local gradient of course may
00:24:05.120 | be different for each node because each one is training on a different subset of the data
00:24:09.600 | and this local gradient may also be the accumulation of one or more batches. So
00:24:14.720 | imagine that each gpu is training on three batches, so we can accumulate
00:24:21.600 | the gradient for three batches, and of course, because we are training on a different subset of the data,
00:24:26.320 | this cumulative, but still local, gradient is different for each node. The next thing we do
00:24:32.640 | is every node sends its gradient to one single node, and this node will
00:24:39.680 | calculate the sum of all the gradient it receives so imagine all the nodes decide to send their own
00:24:46.160 | gradient to the first gpu and the sum is calculated on the first gpu then this operation is known as
00:24:53.600 | reduce and later we will see how it works then the gpu that calculated the sum of all the gradients
00:25:01.600 | will send it back to all the other nodes in a in an operation that is known as broadcast
00:25:06.400 | and the sequence of reduce and broadcast because here i show it as a separate operation usually is
00:25:12.560 | implemented as a single operation known as all reduce and later we will see in detail how it
00:25:17.840 | works, but this is the key idea: we have the model's weights that are initialized on one node,
00:25:24.320 | they are sent to all the other nodes, so now all the gpus have the same
00:25:29.600 | initial weights; they train on a different subset of the data, so each one has
00:25:34.240 | a local gradient; each one sends its own local gradient to one node, which will calculate the sum
00:25:39.680 | of all these gradients and send it back to all the other nodes so now they have all the same sum
00:25:44.880 | then they run optimization steps to update the weights using the same sum so they will produce
00:25:50.560 | the resulting weights of the models will be the same for all of them as you can see here so now
00:25:56.960 | each node will update the value of their own parameters using the same sum of the gradient
00:26:03.520 | you can see here so it's 0.3 they use 0.3 to update the value of their local parameters and
00:26:10.000 | they will end up with the same weights so each node updates the parameter of its local model
00:26:16.480 | using the gradient received after the updates the gradients are reset to 0 and then we can
00:26:21.920 | start with another loop now let's talk about collective communication primitives so the
00:26:27.680 | operations that i showed before. In distributed computing environments, a node may
00:26:33.440 | need to communicate with other nodes if the communication pattern is similar to a client
00:26:38.000 | and the server then we talk about point-to-point communication because we have one client connects
00:26:43.760 | to one server in a request response chains of events however there are cases in which one node
00:26:50.080 | needs to communicate to multiple receivers at once for example in the broadcast scenario right
00:26:55.040 | we have one node that needs to send its weights to all the other nodes this is the typical case
00:27:01.280 | of data parallel training in deep learning so one node needs to send its initial weights to all the
00:27:06.640 | other nodes moreover all the other nodes need to send their gradient to one single node to receive
00:27:12.640 | the cumulative gradient back so collective communication allows to model the communication
00:27:18.480 | pattern between group of nodes let's visualize the difference between these two modes of communication
00:27:24.320 | imagine you need to send a file to seven friends with point-to-point communication you would send
00:27:30.800 | the file iteratively to each of your friend one by one suppose the internet speed that you have
00:27:36.480 | is one megabyte per second and the file is five megabytes in size. What you do first is:
00:27:43.040 | imagine you are here, so you have the file, and you send it to your first friend, and it will take
00:27:48.240 | five seconds because you are sending five megabyte with the speed of one megabyte per second then you
00:27:53.840 | send it to the second friend the third the fourth the fifth the sixth and the seventh in total the
00:27:58.960 | time to send the file to seven friends will be 35 seconds of course you may say okay but why
00:28:05.440 | not send the file simultaneously to all the seven friends at the same time of course you can do it
00:28:11.760 | but the problem is your internet speed is still one megabyte per second and the internet speed
00:28:18.400 | of sending the file simultaneously to seven friends will be split among these seven friends
00:28:22.960 | so each friend will be receiving the file at a speed of 143 kilobyte per second more or less
00:28:29.840 | so the total time is still 35 seconds to distribute your file to your seven friends
00:28:35.120 | let's see if there is a better way so with collective communication we introduce the
00:28:42.160 | first operator, known as broadcast. The operation of sending data from one node
00:28:46.880 | to all the other nodes is known as the broadcast operator. Collective communication
00:28:52.000 | libraries like nccl which is pronounced nickel which is a library from nvidia assigns a unique
00:29:00.000 | id to each node and this node this unique id is known as rank suppose you want to send five
00:29:06.720 | megabyte with an internet speed of one megabyte per second and let's see how the collective
00:29:11.280 | communication would work in the case of this broadcast operation the first thing you do is
00:29:16.480 | you are here so you have the file you send it not to this friend here but to the rank number four
00:29:22.080 | at the next step we realize that the rank 0 and the rank 4 have the file so they can send
00:29:28.080 | simultaneously to another friend so rank 0 now will send it to rank 2 and rank 4 will send it
00:29:33.680 | to rank 6 but now we realize that rank 0 rank 2 rank 4 and rank 6 all have the file so they can
00:29:40.000 | simultaneously send to it another friend so in total it will take 15 seconds to distribute your
00:29:46.560 | initial file to all the seven friends compared to the 35 second with the point-to-point communication
00:29:52.560 | this approach is known as divide and conquer with collective communication we exploit the
00:29:58.960 | interconnectivity between nodes to avoid idle times and reduce the total communication time
00:30:05.440 | this is how also your gpu sends the initial weight to all the other nodes it's not one by one because
00:30:11.280 | that would be too slow not it's not even in parallel because anyway the connection speed
00:30:16.000 | is always the same it would be split among all the receivers what we do is we do this divide and
00:30:20.720 | conquer approach so our initial weights are sent using this broadcast operation in this manner here
00:30:26.160 | which is much faster. Let's see the reduce operation. What is the reduce operation? It
00:30:32.240 | means that we apply it during training when we want to calculate the sum of the gradients of
00:30:38.320 | all the nodes so each node has a local gradient which was calculated using a subset of the data
00:30:43.600 | but they are all different from each other, and what we want is to calculate the sum
00:30:49.360 | of all these gradients into one single node. Let's see how it works. So the broadcast operator
00:30:54.720 | is used to send the initial weights to all the other nodes when we start the training loop
00:31:00.560 | at every few batches of data being processed by each node the gradients of all the node needs to
00:31:05.760 | be sent to one node and accumulated so summed up this operation is known as reduce let's see how it
00:31:11.280 | works so imagine initially each node has its own gradient because they are training on a subset of
00:31:17.520 | the data, and these gradients are all different from each other. What we can do is each node will send
00:31:24.240 | its gradient to its adjacent node, and the adjacent node will sum it up with its own
00:31:30.640 | gradient. So node number 7, for example, will send its gradient to node number 6, and node
00:31:36.160 | number 6, the receiver node, will be responsible for calculating the sum, and the same with
00:31:41.440 | rank number 5 and 4 and 3 and 2 and 1 and 0 the next step we have the sum here at rank number 4
00:31:51.920 | rank number 2 and rank number 0 what we do is we send the rank the sum here from rank number 6 to
00:32:00.000 | rank number 4 which will calculate the sum of the sum and then also from rank number 2 to rank number
00:32:06.800 | 0 the rank number 0 will calculate the sum of the sum and then we do a final step in which we send
00:32:12.800 | the sum that was present at rank number 4 to the rank number 0 and this sum here is the sum of all
00:32:19.280 | the gradients of all nodes and in total it took us only three steps to do it so with only three
00:32:26.160 | steps we accumulated the gradient of all nodes into one single node and it can be proven that
00:32:31.920 | the communication time is logarithmic with respect to the number of nodes and this is very typical of
00:32:37.840 | all the divide and conquer approaches.
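As a tiny standalone simulation (not a real NCCL call), here is a sketch that just counts the rounds of the pairwise scheme described above, to show where the logarithm comes from:

```python
import math

def tree_reduce_rounds(num_nodes: int) -> int:
    """Count the communication rounds of the pairwise reduce described above."""
    holders = list(range(num_nodes))   # ranks that still hold a partial sum
    rounds = 0
    while len(holders) > 1:
        # In each round every adjacent pair exchanges in parallel:
        # one rank of each pair sends, the other keeps the running sum.
        holders = holders[::2]
        rounds += 1
    return rounds

for n in (2, 4, 8, 16, 1024):
    assert tree_reduce_rounds(n) == math.ceil(math.log2(n))
# 8 nodes take 3 rounds, matching the three steps above; in general it is ceil(log2(n)).
```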
00:32:45.040 | The all-reduce operation: what we saw before is that we first broadcast our data, so our initial weights, then we reduce the local gradients,
00:32:52.160 | and then we need to send the result back. The sequence of reduce and broadcast is known as all-reduce and
00:32:58.480 | is usually implemented as a single operation which is faster than doing the sequence of the two
00:33:03.760 | operations i will not show the algorithm behind all reduce but you can think of it as logically
00:33:09.840 | as a sequence of reduce and broadcast operation but remember that in practice it's implemented
00:33:15.840 | as a single operation and it's much faster than doing the two operations alone now imagine you
00:33:23.200 | are training your model on multiple gpus and one node crashes so imagine you are training in a
00:33:29.120 | distributed scenario like the one shown below and suddenly one node is crashed and in this case two
00:33:35.520 | gpus out of four become unavailable how should the system react well one way would be to restart the
00:33:43.040 | entire cluster and that's easy however by restarting the cluster the training would restart also from
00:33:48.720 | zero because as you remember we start from initial weights that are randomly chosen by one node which
00:33:53.440 | are then sent to all the other nodes but this would mean that we would lose all the parameters
00:33:58.800 | and computation that we have done so far when this node crashed a better approach is to use
00:34:04.320 | checkpointing so checkpointing means that we save the weights of a model on a shared disk every few
00:34:11.040 | iteration for example every epoch and then we resume the training from the last checkpoint
00:34:16.080 | in case there is a crash so remember the step in which we initialize the weights of the model in
00:34:23.680 | one node and then send it to all the others well instead of just initializing it randomly we just
00:34:28.480 | can initialize the weights of the model using the latest checkpoint available so that the training
00:34:33.360 | can continue from there we need a shared storage for saving the checkpoints because it's pytorch
00:34:43.760 | that will decide which node will initialize the weights and so every node should have access to
00:34:50.080 | this shared storage plus it is good rule in distributed system to not have one node that
00:34:54.800 | is more important than the others because any node can fail at any time so we should not make
00:35:00.560 | any assumption on which is the node that will initialize the weights its pytorch will choose it
00:35:05.120 | any assumption about which node will initialize the weights; pytorch will choose it,
00:35:11.520 | on which node will be the rank number zero so all nodes should be treated equally so they should run
00:35:17.120 | the same code and they should have access to the same shared storage. However, when we have
00:35:24.000 | a shared storage, who should be responsible for saving the checkpoint? Because if we write code
00:35:28.160 | in which everyone is writing the checkpoint, they may end up overwriting each other. So what we do
00:35:34.160 | is, because pytorch will tell us what the rank of the current node is, we will write
00:35:40.640 | the code in such a way that we check the rank of the current node and if it's the rank number zero
00:35:44.640 | then we save the checkpoint if it's a rank number one two or three we don't do anything so it means
00:35:50.080 | that only one rank will be responsible for saving the checkpoint but later when we restart the
00:35:55.600 | training, we don't make any assumption about who will become rank number zero, and as the pytorch
00:36:01.600 | documentation says, the rank is not stable, which means that if you restart the training,
00:36:09.760 | rank number zero may be assigned to another node.
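A sketch of this checkpointing logic; the directory path, file naming and helper names are placeholders, and the actual code in the repository may differ:

```python
import glob
import os
import torch

SHARED_CKPT_DIR = "/mnt/shared-drive/checkpoints"   # placeholder: the mounted network drive
global_rank = int(os.environ["RANK"])                # set by torchrun

def save_checkpoint(model, optimizer, epoch):
    # Only rank 0 writes, so the nodes never overwrite each other's files.
    if global_rank == 0:
        torch.save(
            {"epoch": epoch,
             "model_state_dict": model.state_dict(),
             "optimizer_state_dict": optimizer.state_dict()},
            os.path.join(SHARED_CKPT_DIR, f"checkpoint_{epoch:03d}.pt"),
        )

def load_latest_checkpoint(model, optimizer):
    # Every rank can read the shared storage, so no assumption is made about
    # which node will become rank 0 after a restart.
    checkpoints = sorted(glob.glob(os.path.join(SHARED_CKPT_DIR, "checkpoint_*.pt")))
    if not checkpoints:
        return 0                                     # nothing saved yet: start from scratch
    state = torch.load(checkpoints[-1], map_location="cpu")
    model.load_state_dict(state["model_state_dict"])
    optimizer.load_state_dict(state["optimizer_state_dict"])
    return state["epoch"] + 1                        # epoch to resume from
```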
00:36:16.960 | Okay, now that we have seen how distributed training works at the theoretical level (we accumulate the gradient that is
00:36:23.440 | calculated locally by multiple nodes, send it to one single node, which then calculates
00:36:28.720 | the sum and sends it back to all the others, which then update the parameters using this sum of the
00:36:34.320 | gradients), it's time to actually look at the practical training. So we will first build the
00:36:41.040 | infrastructure that is needed to run our training and then i will also show you what are the code
00:36:48.160 | changes that we need to make to our existing training loop to make it distributed and run it
00:36:53.120 | on the infrastructure that we are going to create i will use paper space as a cloud service mostly
00:36:59.840 | because it's easy to use and doesn't require much setup time even for beginners i chose it over aws
00:37:06.240 | because aws has a lot of other setup that you need to do in order to even do simple operations
00:37:11.840 | and it's easy to get lost so i'm using paper space mostly for this reason so that anyone
00:37:16.480 | with any level of knowledge can follow the tutorial very easily.
00:37:22.400 | so let's get started okay the first thing we need to do is to go to my repository called
00:37:29.600 | pytorch transformer distributed in which you will find the code of the distributed model that we
00:37:35.040 | will be running on this cluster and also the instruction on how to create this cluster on
00:37:39.840 | paper space so i have already accessed my account on paper space the first thing you will want to
00:37:45.600 | do is to create your ssh key, just like you do on github. You
00:37:52.320 | need to associate your public ssh key to your account here so that you can connect to
00:37:59.360 | the machines that are created on paper space okay the first thing we need to do is to create a
00:38:05.520 | private network on which we will connect all these two machines so we will create two machines that
00:38:10.000 | are connected in this private network and also a shared disk that can be accessed by both machines
00:38:15.840 | we will use a machine that has two gpus so in total we will have four gpus on which we will
00:38:20.560 | run the training so let's create our private network networks here and we create a new network
00:38:28.160 | remember to choose the same region for your computers and the cluster and we will call it
00:38:32.640 | distributed training
00:38:37.760 | okay now we have our subnet you can see here the next step to do is to create two nodes
00:38:47.440 | of this type. We can choose any node actually; i chose this one because i wanted to test two
00:38:52.800 | machines each one having two gpus and we use ml in a box as the operating system so as the image
00:38:58.400 | of these machines so we create a new machine ml in a box the machine we will do is multi gpu p4000
00:39:06.800 | multiplied by two this one the region is the same as the network so new york 2 the disk size 50 gb
00:39:13.600 | is enough in total i think last time i created this cluster and i ran it for many epochs i spent
00:39:20.400 | five dollars so i think you should not spend more than five dollars for running your own
00:39:25.040 | distributed training it should not be more expensive than five dollars the first machine
00:39:31.440 | we will call it cuda zero the network we need to select the network that we have created before
00:39:37.680 | and we choose public ip dynamic because otherwise we cannot connect to the machine if without a
00:39:42.080 | public ip then we never create a snapshot because we don't care about backup for now
00:39:47.120 | and the price of running each machine should be around one dollar you can see here
00:39:50.560 | so we created the first machine
00:39:54.000 | and then we create also the second one so ml in a box the machine is this one
00:40:05.280 | new york disk size this one we will call it cuda one
00:40:10.720 | distributed training dynamic we don't save and create machine
00:40:18.880 | last but not least we need to create a network drive of 250 gigabytes we can create this is the
00:40:25.840 | smallest one they have available so that's why i chose 250 gigabytes we will call it model training
00:40:32.720 | 250 new york and distributed train this must belong to the same network as the two machines
00:40:38.320 | okay create
00:40:42.240 | okay so they are provisioning and starting the two machines now we will configure the machines
00:40:51.760 | so we need to install some packages first of all we need to install ifconfig because we need to
00:40:56.080 | have the ip address and while installing ifconfig i ran into a error with a seahorse and i show also
00:41:03.120 | how to solve this error we will then mount the network drive and then we will clone this
00:41:08.160 | repository install all the packages that we need for running this code and then we initial i also
00:41:15.520 | recommend using weights and biases for uh keeping tracking keeping track of the loss etc of all the
00:41:21.680 | metrics that you need during the training of this model so i recommend you register on weights and
00:41:27.840 | biases and then you install it and use it also for this for this code for running this code
00:41:34.320 | because it will make you make it easy for you to visualize the training
00:41:37.360 | metrics okay the cuda zero is ready we can see some information here
00:41:47.760 | and connect this will give you us the ip address
00:41:53.280 | we connect to it yes
00:41:58.480 | wonderful so now we are inside the machine the first thing we do is we update all the packages
00:42:12.320 | okay we also install the net tools, but i remember it ran into an error,
00:42:19.920 | but looks like this time it doesn't which is good let's try ifconfig wonderful so now we have
00:42:28.320 | the ip address of the first machine which is this one this is the private address that belongs to
00:42:32.800 | your subnet that you created here so the one you created here 810 okay now we can do the we we need
00:42:45.440 | to keep track of this ip address because we later we need to modify the host and this is actually
00:42:49.920 | needed because i ran into an error with the pytorch when running distributed training because
00:42:54.720 | it could not um it could not reach the other node so i had to modify the host by mapping the host
00:43:00.640 | name of the other nodes into its ip uh okay i let's mount the network drive following just
00:43:08.800 | the instructions i have written so we install this package let me call this one cuda zero
00:43:26.960 | then we created the directory in which we want to mount this network drive
00:43:31.280 | and then we have to run this command here you can see it here but we need to replace the ip address
00:43:39.680 | and the username and password of the drive so let's first paste it then we need to
00:43:46.640 | replace this one with the ip address and the network share name which i'll show you how to
00:43:52.240 | find we go here drives and we can see here the address so we do but we need to replace
00:44:05.440 | the escape character with the slash slash okay and the network drive username is this one
00:44:19.040 | let's run this command the password is also here we can copy it
00:44:31.840 | et voila now it should be mounted the first thing we do is we clone our repository so we are in the
00:44:38.240 | home directory of the default user so paper space
00:44:43.280 | we can clone it directly here no problem we then cd
00:44:48.560 | and then we install the requirements
00:44:57.840 | okay now we have installed the requirements so now we can log in into weights and biases
00:45:03.280 | using this command here but remember to copy the key from the website of weights and biases
00:45:09.840 | so which can be found here let's run
00:45:14.560 | and it should be okay okay now we are ready to run the training command on the first computer
00:45:26.560 | but we need to also prepare the second one so let me prepare the second one
00:45:30.000 | of course in a real scenario you would create a docker file and you would run everything using
00:45:39.360 | kubernetes so it should be done automatically but in this case because most of you maybe are not
00:45:45.200 | familiar with kubernetes or docker, or you just want to run your training quickly to see how it works,
00:45:51.280 | so i recommend using paper space because it's very easy to configure all the setup
00:46:10.480 | okay just like before
00:46:39.200 | okay now we also clone here
00:46:41.120 | so we clone the same repository on both computers and then we run the
00:46:48.080 | same code for training but we will see later how to do it
00:47:01.520 | okay now we do login with weights and biases
00:47:07.200 | and now we are ready to run the training on both computers
00:47:17.760 | okay the command that you need to run to in order to run the training on both machine is the same
00:47:24.720 | but we first need to take the ip address of the computer that we will choose so one of the two
00:47:30.160 | computers will become the rendezvous master; it means that all the communication will be managed
00:47:34.800 | by that node and all the others will adapt. Now, of course, to make it more fail-safe,
00:47:43.360 | we can use for example a dynamic ip that can be mapped to another machine in case the master
00:47:49.120 | crashes so that we can restart the training using always the same ip but in this simple case i will
00:47:55.440 | configure it by hand otherwise i need an orchestration tool and that will complicate
00:48:00.000 | all the scenario so in this case i just want to show you how to do distributed training i don't
00:48:04.400 | want to spend too much time creating the perfect infrastructure which you would ideally do in a
00:48:09.680 | real scenario so we take this command here and as you can see in this command we need to tell
00:48:16.960 | the ip address of the master node here so how which one we will choose the master in my case
00:48:22.400 | i will choose cuda0 so the first machine i have created and the other one will be kind of the
00:48:26.400 | slave even if they both perform the same operation so we find the ip of this slave here which is this
00:48:35.120 | one and we put it in the host file of the master and also i need the host name of the slave which
00:48:42.000 | is this one perfect so okay okay so we need to paste here the ip address of this node
00:48:54.880 | and also its host name which is this one
00:48:59.200 | and that's pretty much it now we can start the training so
00:49:06.400 | we take the ip address of the master now so cuda0 is our master which is this one
00:49:12.640 | and we put it in the command in this position here and we cd to the torch and we can run the
00:49:22.240 | command so in this command what we are saying is torch run is a special command that will create
00:49:27.280 | the cluster and will manage all the cluster creation and communication it will run our
00:49:32.000 | training loop, you can see here. First of all, we need to tell it how many nodes we have in this
00:49:38.400 | cluster, so we have two computers, and how many processes we want to create for each node, so how
00:49:43.600 | many gpus we have for each computer. So this is the number of gpus per
00:49:48.320 | computer and this is the number of computers so we have two and two this is a unique id that
00:49:53.360 | indicates this particular cluster it should be unique for each cluster and this is the back-end
00:49:59.280 | library that is managing the communication for us all the parameters after the file name that
00:50:04.000 | we will use for training are the arguments that are passed to this file here so in our case we
00:50:09.200 | are running with a batch size of 8; we are also telling it that the model folder, so where the
00:50:14.000 | checkpoints should be saved, is the shared folder that we created before, so the mount path
00:50:20.160 | of the shared network drive. We run the same command on both computers and this will start
00:50:25.280 | the training so we do it this one and also on the other one as you can see this computer here
00:50:35.520 | is not proceeding it's waiting for the other so now we run also it here and now it should proceed
00:50:47.840 | yeah, so now both are proceeding. Oops, i forgot to set the hosts file on this computer here,
00:50:57.840 | so i retrieve the ip of this one and also the host name of this one,
00:51:05.680 | and we put them in the hosts file of the other computer, so
00:51:16.480 | okay let's run again the training
00:51:46.320 | looks like it's proceeding so they are both doing yeah they are both building the data set now the
00:52:00.720 | tokenizer so if you have watched my previous video on how to code a transformer model from zero this
00:52:05.760 | is exactly the same code, except that i have added some things to manage the distributed training,
00:52:11.440 | but it's a very minimal code change, and later i will show you step by step how i did it. So as you
00:52:16.160 | can see now the training is running in parallel the first thing you will notice is that the weights
00:52:20.720 | and biases is only initialized on one node because we cannot send the metrics from multiple nodes
00:52:26.160 | because it will interfere with each other so we send the metrics only from one node and it's the
00:52:31.360 | node with rank number zero we will see later how we check this information as you can see they are
00:52:36.640 | both training on a subset of the data so this one is training on 910 batch and this one also 910
00:52:42.000 | batch it means that in total we have 1820 batches in total and each one is calculating a local
00:52:50.880 | gradient and sending it to the other who will calculate the sum of this gradient actually
00:52:56.400 | we have four nodes here because we have four gpu so each gpu is calculating a local gradient
00:53:00.880 | and each gradient is sent to the rank number zero node, which will calculate the sum of all these
00:53:11.360 | gradients using the reduce operation, actually the all-reduce operation, because then it will send
00:53:16.640 | back the sum to all the other nodes who will then update the parameters using the sum of these
00:53:22.320 | gradients that it has received. Another thing, i made a mistake: this 910 is not multiplied
00:53:28.240 | by 2, because it's not per rank; actually, later we will see what is the difference between
00:53:33.280 | local rank and the global rank, but we have four nodes, and each node is working on 910 batches of data,
00:53:40.640 | so i only show one progress bar, because if i show the tqdm for both gpus it will interfere
00:53:48.880 | with each other, this progress bar basically. So i only show one progress bar per computer,
00:53:53.760 | not two because each computer has two gpus so actually i should have two progress bar but
00:53:58.720 | i only show one otherwise the visualization is very bad so first of all let's navigate
00:54:04.000 | the code to understand how it works let me open the project
00:54:13.840 | okay let's see here let's start from the train file
00:54:29.840 | Okay, the main difference compared to the original code that i built in my previous video is the following,
00:54:36.560 | but it's the same code, and these changes that i'm showing here will apply to any training
00:54:41.440 | loop that you have built, so it's not only for this particular code, it will apply
00:54:45.200 | to any. Okay, the first thing we do is we read these two variables: when we run
00:54:52.800 | the code with torchrun, torchrun will insert some environment variables into our environment
00:54:58.560 | that we can read one is called rank and one is called local rank let's see the difference
00:55:03.200 | okay the local rank basically indicates the number of the gpu in the local computer while
00:55:12.560 | the global rank, also called just rank, indicates the unique id of the gpu across the whole
00:55:20.000 | cluster. So if we have four gpus, the rank will be unique across the whole cluster, while the local rank
00:55:26.960 | is not unique but it's unique to the local computer the local rank is useful for example
00:55:32.240 | when we want to print for example only on one gpu per each computer while the global rank is useful
00:55:38.800 | when we want to only one gpu to perform an operation among all the others for example if
00:55:45.360 | we want to initialize weights and biases or any other service that should be initialized only
00:55:49.600 | from one node in all the cluster then we use the global rank which is this environment variable
00:55:55.360 | here while if we want to use something for example for printing or only one local gpu should use the
00:56:02.160 | tqdm or the progress bar or any other stuff that is can interfere with each other on the local
00:56:07.760 | computer then we use the local rank so the first thing i do is i load these two environment
00:56:13.200 | variables and save it into the configuration the second thing i do okay i print the configuration
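As a reference, reading those variables looks roughly like this; the get_config name and the configuration keys are illustrative, not necessarily the exact names used in the repository.

```python
import os

# torchrun sets these variables for every process it spawns, e.g. when launched
# with: torchrun --nproc_per_node=2 --nnodes=2 --node_rank=0 ... train.py
local_rank = int(os.environ["LOCAL_RANK"])  # index of the GPU on this machine
global_rank = int(os.environ["RANK"])       # unique id of this process in the whole cluster

config = get_config()                 # however you build your configuration (placeholder)
config["local_rank"] = local_rank     # key names are illustrative
config["global_rank"] = global_rank
print(config)
```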
00:56:18.560 | The first thing we need to do to initialize the cluster, and this is where torchrun will stop
00:56:25.200 | and wait for all the nodes to connect, is to call this function here: init_process_group.
00:56:29.360 | init_process_group belongs to a package that I imported here, called torch.distributed.
00:56:36.960 | These are the imports that we need in order to use distributed training: from torch.utils.data.distributed
00:56:43.440 | we need the DistributedSampler, then DistributedDataParallel, init_process_group
00:56:48.640 | and destroy_process_group. So this is the first thing we do: we read the environment
00:56:53.360 | variables and save them somewhere so we can access the local and the global rank, then we call this function
00:56:58.640 | here, indicating the backend that we want to use. I am training on CUDA, so I want to use NCCL
00:57:04.720 | ("nickel"). We can also use the local rank to tell CUDA which GPU we will be training on:
00:57:11.360 | each GPU will be given its local rank relative to
00:57:17.360 | the current computer, so the first GPU will be assigned local rank zero and the second GPU will
00:57:23.120 | be assigned local rank one on each computer. We set that here, then we run the training loop, and
00:57:29.360 | after the training loop has finished, we call destroy_process_group. Let's see the details of this training loop.
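Putting those pieces together, the setup and teardown around the training loop looks roughly like this; train_model is a placeholder for your own training function.

```python
import os
import torch
import torch.distributed as dist

local_rank = int(os.environ["LOCAL_RANK"])
global_rank = int(os.environ["RANK"])

# torchrun blocks here until every process in the cluster has joined the group.
# "nccl" is the NVIDIA backend used for CUDA GPUs.
dist.init_process_group(backend="nccl")

# Pin this process to its own GPU, identified by the local rank.
torch.cuda.set_device(local_rank)

try:
    train_model(local_rank, global_rank)  # placeholder for your existing training loop
finally:
    # Tear down the process group cleanly once training has finished.
    dist.destroy_process_group()
```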
00:57:34.080 | In the training loop, first of all, here I disable training on the CPU, because
00:57:40.240 | we are using DistributedDataParallel with the NCCL backend, which
00:57:44.560 | only works with CUDA, so I'm disabling the CPU path. The second thing we do is, when we create the
00:57:51.920 | data loaders, so the train data loader and the validation data loader, we need to disable
00:57:58.960 | the shuffling on the DataLoader and introduce this parameter, sampler, to which we
00:58:04.640 | pass an instance of DistributedSampler, telling it which dataset to use and that we
00:58:10.560 | want to shuffle it. So the shuffling is done by the DistributedSampler, not by the DataLoader.
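In code the data loader change is small; train_ds stands for your dataset instance and the batch size is arbitrary here.

```python
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

# Shuffling is delegated to the sampler; the DataLoader itself must not shuffle,
# otherwise the two options would conflict. The sampler also gives each rank its
# own shard of the dataset (the 910 batches per GPU seen above).
train_sampler = DistributedSampler(train_ds, shuffle=True)
train_dataloader = DataLoader(train_ds, batch_size=8, shuffle=False, sampler=train_sampler)

# Usually you also call train_sampler.set_epoch(epoch) at the start of each epoch,
# so that the shuffling order changes from epoch to epoch.
```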
00:58:17.120 | The next thing we do is write the logic for preloading a checkpoint: we
00:58:23.920 | check whether we want to preload one. In my case, the default configuration says that
00:58:28.160 | we should always load the latest checkpoint, you can see it here: the
00:58:32.960 | default configuration says that we should load the latest checkpoint available.
00:58:38.560 | The latest checkpoint file is retrieved using the path that we passed on the command line:
00:58:45.200 | remember, here we have a path that indicates the directory where we save the checkpoints, and
00:58:52.240 | we use that one. If there is a latest checkpoint, it will be loaded into our model. Our model
00:58:59.440 | is created here: we create an instance of our model, which is basically wherever you get the
00:59:06.880 | instance of your model, and then you can preload the state dict here, and also the optimizer
00:59:14.000 | state dict and all the other global variables that should be preloaded to resume the training.
00:59:20.640 | If the global rank is zero, we also initialize the services like Weights & Biases.
00:59:28.240 | In the case of Weights & Biases, we may also want to resume the training from the same run
00:59:32.640 | in which we last crashed, so every time we save the checkpoint we also save the run ID of the
00:59:38.080 | Weights & Biases run, and we can restore it when we restore the checkpoint.
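A hypothetical sketch of that preloading step, building on the local_rank/global_rank variables read earlier; the helpers get_model and latest_weights_file_path, the checkpoint keys and the W&B project name are all illustrative, not the exact code from the repository.

```python
import torch
import wandb

model = get_model(config).to(f"cuda:{local_rank}")   # plain nn.Module, not wrapped yet
optimizer = torch.optim.Adam(model.parameters(), lr=config["lr"])

initial_epoch, global_step, wandb_run_id = 0, 0, None
preload_path = latest_weights_file_path(config)      # returns None if there is no checkpoint
if preload_path is not None:
    checkpoint = torch.load(preload_path, map_location=f"cuda:{local_rank}")
    model.load_state_dict(checkpoint["model_state_dict"])
    optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
    initial_epoch = checkpoint["epoch"] + 1
    global_step = checkpoint["global_step"]
    wandb_run_id = checkpoint.get("wandb_run_id")     # so we can resume the same W&B run

if global_rank == 0:
    # Only one process in the whole cluster talks to Weights & Biases.
    wandb.init(project="transformer-distributed", id=wandb_run_id, resume="allow")
```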
00:59:47.920 | The big change introduced by distributed data parallel training is this: now we have our model, which could be anything; in the
00:59:52.960 | case of my code it's the Transformer model, a class that I created in my previous video,
00:59:58.560 | this one here, which is just an nn.Module. We need to wrap it in an instance of
01:00:06.000 | DistributedDataParallel, as you can see here, and we also indicate the device ID we will be using, so the GPU
01:00:11.520 | ID, which is the local rank. Also, on global rank zero we can initialize other
01:00:18.480 | stuff, in this case just Weights & Biases. The training loop is the same as the non-parallel
01:00:24.800 | code, except that instead of calling the model directly, so where before we did model.forward,
01:00:33.680 | we need to do model.module.forward. If we need to access the encode method of our model, which is
01:00:40.880 | for example this method here, we cannot access model.encode; we need to access model.module.encode,
01:00:48.080 | because this model now refers to an instance of DistributedDataParallel: if we want to retrieve
01:00:55.760 | the original model, we need to do model.module. Every time we run loss.backward on this module,
01:01:03.520 | PyTorch will intercept it and calculate the local gradient, and after it has
01:01:10.480 | calculated the local gradient it will send it to all the other nodes for accumulation and
01:01:14.960 | receive back the accumulated gradient. This cumulative gradient is then used when
01:01:21.120 | we call optimizer.step, and optimizer.zero_grad will reset it to zero. When we want to save
01:01:30.400 | the checkpoint, which for example we can do every epoch (in my case I do it every epoch), I save it
01:01:37.920 | only on global rank number zero, because I don't want to save the checkpoint on all the nodes,
01:01:43.440 | otherwise they would overwrite each other's checkpoint; so I only do it on global rank zero.
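In short, the wrapping and the .module access look something like this; the encode call matches the Transformer from the earlier video, and encoder_input / encoder_mask stand for whatever tensors your batch provides.

```python
from torch.nn.parallel import DistributedDataParallel as DDP

# Wrap the plain model; device_ids tells DDP which local GPU this process owns.
model = DDP(model, device_ids=[local_rank])

# Custom methods and the state dict now live on the underlying module.
encoder_output = model.module.encode(encoder_input, encoder_mask)
state = model.module.state_dict()   # unwrap before saving a checkpoint

# The checkpoint itself is then written only when global_rank == 0,
# so the ranks don't overwrite each other's file.
```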
01:01:48.640 | As you can see here, this is pretty much all we need to do. Let me show you a template
01:01:56.080 | that you can follow in your future projects for integrating distributed data parallel
01:02:00.640 | into your existing code. Okay, this is a template that you can follow for your future
01:02:08.560 | projects. The first thing you do is read the two environment variables called
01:02:13.680 | LOCAL_RANK and RANK: the first one indicates the local rank, so the number we saw
01:02:18.720 | before, the number of the GPU relative to the local computer, not to the entire cluster; the
01:02:24.000 | second one indicates the global rank, which is a unique identifier for this particular GPU among
01:02:29.120 | all the GPUs of the cluster. We then call the function init_process_group, indicating the
01:02:33.920 | backend we want to use; in the case of CUDA, which I think is most cases, it's NCCL, because
01:02:38.960 | it's the best one and it's made by NVIDIA. The second thing we do is set which device
01:02:44.720 | CUDA should use, which is the local rank. Then we run the training loop: only on
01:02:50.640 | global rank number zero we initialize services like Weights & Biases; we create our
01:02:57.440 | data loader, telling it that as a sampler it should use the DistributedSampler, with
01:03:02.960 | shuffle enabled there but not on the DataLoader; we create an instance of our model, which is our
01:03:08.720 | custom class, and if there is a latest checkpoint we load it. After we
01:03:15.280 | have loaded the latest checkpoint, we need to wrap our model in an instance of
01:03:20.560 | DistributedDataParallel, also indicating the device ID it should use, which is the local rank. All the rest
01:03:26.320 | is the same, except that on global rank number zero we can collect some statistics, for example
01:03:31.760 | we can send some statistics to Weights & Biases, and only on global rank number zero we save
01:03:38.080 | the state of the model, for example every epoch, so that if the training crashes we can resume from
01:03:44.960 | the latest checkpoint. A minimal end-to-end skeleton following this template is sketched below.
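Here is what such a skeleton could look like; get_dataset, get_model, compute_loss, save_checkpoint, the batch size, learning rate and epoch count are placeholders for your own code.

```python
import os
import torch
import torch.distributed as dist
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP

def main():
    local_rank = int(os.environ["LOCAL_RANK"])
    global_rank = int(os.environ["RANK"])

    dist.init_process_group(backend="nccl")      # NCCL backend for CUDA GPUs
    torch.cuda.set_device(local_rank)
    device = torch.device(f"cuda:{local_rank}")

    if global_rank == 0:
        pass  # initialize W&B or any other single-instance service here

    train_ds = get_dataset()                     # placeholder
    sampler = DistributedSampler(train_ds, shuffle=True)
    train_loader = DataLoader(train_ds, batch_size=8, shuffle=False, sampler=sampler)

    model = get_model().to(device)               # placeholder
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
    # ...optionally load the latest checkpoint here, before wrapping the model...

    model = DDP(model, device_ids=[local_rank])

    for epoch in range(10):                      # number of epochs is arbitrary here
        sampler.set_epoch(epoch)                 # vary the shuffle from epoch to epoch
        for batch in train_loader:
            loss = compute_loss(model, batch, device)  # placeholder forward pass
            loss.backward()                      # DDP all-reduces the gradients here
            optimizer.step()
            optimizer.zero_grad()
        if global_rank == 0:
            # Collect statistics and save the checkpoint from one process only.
            save_checkpoint(model.module, optimizer, epoch)  # placeholder

    dist.destroy_process_group()

if __name__ == "__main__":
    main()
```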
01:03:53.200 | Now let's try to simulate a crash. Let's see how the training is going: as you can see, we are already training on epoch number one, so one epoch is already
01:03:58.640 | done. As you can see in the code I built here, we are running
01:04:05.440 | validation only on global rank number zero. Actually, you don't have to run validation during
01:04:13.120 | the distributed training at all: you can do it asynchronously, for example by creating another node
01:04:18.400 | that reads the latest checkpoint and runs the validation asynchronously, so that our GPUs can
01:04:24.960 | always keep working on the training and not waste time on validation, because otherwise
01:04:31.840 | all the nodes are waiting for node number zero to run validation, and this can be expensive
01:04:37.760 | for bigger models. So we can do it asynchronously. Okay, now we can see that
01:04:46.560 | all the nodes are training on epoch number one, as you can see here, so I will try to simulate
01:04:51.920 | a crash and check that the distributed training resumes from the checkpoint of
01:04:58.960 | epoch number zero, which we have already completed. Let's make it crash: I will crash the master
01:05:06.560 | node directly, so we can test the worst-case scenario. This one has crashed, so I just killed
01:05:13.280 | the node, and we see that this one should also be killed after a while.
01:05:17.360 | Okay, this one is also killed. We can restart using the same command as before.
01:05:35.120 | As you can see, they are loading the dataset,
01:05:38.800 | loading the tokenizer and preloading the model, as you can see here: the model that was previously
01:05:47.600 | saved by global rank number zero. So now they should resume the
01:05:51.440 | training from the first epoch, not from the zeroth epoch.
01:06:00.400 | And yes, they are starting from epoch number 01, because epoch number 00
01:06:05.200 | has already been done. This is how failover works: you basically save a checkpoint every
01:06:11.600 | once in a while, and then you can restart the training from that checkpoint. When you save a checkpoint,
01:06:16.880 | you should also save all the variables that are relevant to resuming the training: if you have
01:06:22.000 | some state variables, for example a global step counter or other counters that
01:06:28.000 | you need to keep track of, you can save them in the checkpoint and then restore them when you load it.
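For example, a resumable checkpoint could bundle the counters together with the model and optimizer state; the key names and file name are illustrative.

```python
import torch

if global_rank == 0:
    torch.save({
        "epoch": epoch,
        "global_step": global_step,                      # counters needed to resume cleanly
        "model_state_dict": model.module.state_dict(),
        "optimizer_state_dict": optimizer.state_dict(),
        "wandb_run_id": wandb_run_id,                    # lets rank 0 resume the same W&B run
    }, f"weights_epoch_{epoch:02d}.pt")
```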
01:06:31.680 | Okay, let's stop the training here. You can try to experiment yourself
01:06:41.440 | with Paperspace, I think it's quite easy, and let me know if you have any problem, I will try to
01:06:45.520 | help you. Now let's look at how distributed data parallel
01:06:51.920 | was integrated into PyTorch, because there are some clever design choices in it that make it
01:06:57.680 | very fast; let's review them. First of all, when does PyTorch synchronize the gradients? Well, this
01:07:04.160 | happens when we call loss.backward. Calling loss.backward leads to each
01:07:10.240 | node calculating its local gradient, which is the derivative of the loss function with respect to
01:07:15.280 | each node in the computation graph, and each node sends its local gradient to a central node,
01:07:21.200 | which is global rank number zero, and receives back the cumulative gradient through an operation
01:07:26.240 | called all-reduce. Each node will then update its weights, so the parameters of the model, using the
01:07:32.560 | cumulative gradient and, of course, the learning rate of the local optimizer. We can avoid PyTorch
01:07:39.120 | synchronizing the gradient at every backward step and instead let it accumulate for a few steps,
01:07:44.800 | for example for a few batches, using the no_sync context. Let's see how to integrate it. Usually
01:07:51.760 | you run the forward step, then you run the backward step, which leads to
01:07:58.800 | synchronization, then you do optimizer.step and then optimizer.zero_grad. However, if we
01:08:06.240 | don't want to run the synchronization at every step, we can use the no_sync
01:08:12.400 | context, you can see it here, which is a method provided by DistributedDataParallel, which is
01:08:18.080 | the wrapper of our model. Basically we create this context manager, we run the forward step
01:08:24.640 | and the backward step inside it, and we don't call optimizer.step. When we are outside of this context,
01:08:31.040 | we still need to do the forward and the backward step, and then we can run optimizer.step and zero_grad.
01:08:35.920 | Basically, no_sync disables the synchronization of the gradient, letting it
01:08:40.880 | accumulate locally for a while, and we synchronize only after a while.
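A minimal sketch of that pattern, assuming the hypothetical compute_loss helper from before and an accumulation window of a few batches.

```python
accumulation_steps = 4   # synchronize once every 4 batches (arbitrary example)

for i, batch in enumerate(train_loader):
    if (i + 1) % accumulation_steps != 0:
        with model.no_sync():            # no all-reduce: gradients accumulate locally
            loss = compute_loss(model, batch)
            loss.backward()
    else:
        loss = compute_loss(model, batch)
        loss.backward()                  # gradients are synchronized on this step
        optimizer.step()
        optimizer.zero_grad()
```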
01:08:49.360 | There is another very clever trick in PyTorch, which is computation-communication overlap. Since
01:08:55.200 | each GPU needs to send its gradient to a central node for accumulation, this can lead to idle
01:09:01.680 | time in which the GPUs are not working because they're communicating. Each GPU will
01:09:06.400 | perform the forward step, then the backward step, and then there is some communication
01:09:11.680 | overhead, because the GPUs need to synchronize the gradients with each other: calculate the
01:09:20.000 | cumulative gradient on one node and receive it back. If we do these operations sequentially,
01:09:26.080 | it leads to a big delay on the GPUs, because during that time they could be doing other meaningful work
01:09:31.520 | for the training. So what PyTorch does is overlap the communication
01:09:38.720 | with the backward step. Let's see how it works. As you remember, we have a computation
01:09:46.800 | graph, and PyTorch will communicate the gradient of a node as soon as it is available.
01:09:51.920 | As you remember, we calculate the gradient of the loss function with respect to each weight,
01:09:56.960 | but to calculate the gradient of the loss function with respect to each weight, we need to calculate
01:10:00.800 | the gradient of the loss function with respect to the intermediate nodes. For example, first we
01:10:05.440 | compute the gradient of the loss function with respect to the output, then we compute the gradient of the
01:10:11.360 | loss function with respect to the parameters of this layer here, which are the weights and the
01:10:16.480 | bias; but these are then already available, so we can already send them to the other nodes and get back
01:10:22.640 | the cumulative ones, so this operation can already be done. Then PyTorch will calculate
01:10:28.880 | the next layer, and since that one is then already available, we can already send it to the other
01:10:33.760 | nodes and receive back the cumulative one, et cetera. So while PyTorch is computing the backward
01:10:39.840 | step, we can already communicate to the other nodes the gradients that we have already computed
01:10:45.360 | and receive back the cumulative ones. This results in a significant speed-up. To make the
01:10:52.800 | communication even faster, instead of sending one gradient at a time, PyTorch creates
01:10:58.560 | buckets of gradients: every time a bucket is ready, it sends it to the other nodes and
01:11:03.200 | receives back the cumulative gradient for that bucket; then, when the second bucket is ready,
01:11:08.400 | it sends the second bucket and receives back its cumulative gradient, and so on until the last bucket is
01:11:13.120 | ready, sent to the other nodes and received back. This way, as you saw before,
01:11:20.800 | we can overlap the communication overhead with the calculation of the backward step: while we are
01:11:27.920 | calculating the gradients, we are already sending them, so that the total time to process the forward step, the
01:11:34.560 | backward step, the communication and the update is smaller, because two of the steps overlap with
01:11:39.920 | each other. PyTorch recommends 25 megabytes as the size of each bucket, because we don't want it to be
01:11:46.800 | too small: there is a fixed communication overhead for each bucket, so with many small buckets we would pay
01:11:52.000 | that overhead many times, and it's better to spread it over bigger buckets, fewer in number but
01:11:59.360 | bigger. But we also don't want it to be too big, because otherwise we are not overlapping the communication with the computation of the gradients as much as we could.
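The bucket size is exposed as a parameter of the DistributedDataParallel wrapper, bucket_cap_mb, whose default is 25 MB; a quick sketch:

```python
from torch.nn.parallel import DistributedDataParallel as DDP

# 25 MB is the default bucket size; tune it only if profiling shows a reason to.
model = DDP(model, device_ids=[local_rank], bucket_cap_mb=25)
```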
01:12:07.440 | Thank you guys for watching my video.
01:12:13.600 | I hope you learned a lot. In this video I introduced data parallel training and showed you how to
01:12:18.960 | create the infrastructure, and also how to run the distributed training code on the
01:12:24.320 | infrastructure that we created. I also showed you how PyTorch works under the hood, so that you
01:12:30.560 | can understand how the gradients are synchronized and also, at a mathematical level, how it works.
01:12:36.400 | Please like and subscribe if the video was helpful for you. I also
01:12:42.240 | recommend watching my other videos, because I make long videos in which I share a lot of
01:12:47.200 | knowledge, and let me know if there is something you don't understand; I will try to help all of you.