相關博客
【Megatron-DeepSpeed】張量并行工具代碼mpu詳解(一):并行環(huán)境初始化
【Megatron-DeepSpeed】張量并行工具代碼mpu詳解(二):Collective通信操作的封裝mappings
【深度學習】【分布式訓練】DeepSpeed:AllReduce與ZeRO-DP
【深度學習】混合精度訓練與顯存分析
【深度學習】【分布式訓練】Collective通信操作及Pytorch示例
【自然語言處理】【大模型】大語言模型BLOOM推理工具測試
【自然語言處理】【大模型】GLM-130B:一個開源雙語預訓練語言模型
【自然語言處理】【大模型】用于大型Transformer的8-bit矩陣乘法介紹
【自然語言處理】【大模型】BLOOM:一個176B參數(shù)且可開放獲取的多語言模型
? 大模型時代,單機已經(jīng)無法完成先進模型的訓練和推理,分布式訓練和推理將會是必然的選擇。各類分布式訓練和推斷工具都會使用到Collective通信。網(wǎng)絡上大多數(shù)的教程僅簡單介紹這些操作的原理,沒有代碼示例來輔助理解。本文會介紹各類Collective通信操作,并展示pytorch中如何使用。
一、Collective通信操作
1. AllReduce
? 將各個顯卡的張量進行聚合(sum、min、max)后,再將結(jié)果寫回至各個顯卡。
2. Broadcast
? 將張量從某張卡廣播至所有卡。
3. Reduce
? 執(zhí)行同AllReduce相同的操作,但結(jié)果僅寫入具有的某個顯卡。
4. AllGather
? 每個顯卡上有一個大小為N的張量,共有k個顯卡。經(jīng)過AllGather后將所有顯卡上的張量合并為一個 N × k N\times k N×k的張量,然后將結(jié)果分配至所有顯卡上。
5. ReduceScatter
? 執(zhí)行Reduce相同的操作,但是結(jié)果會被分散至不同的顯卡。
二、Pytorch示例
? pytorch的分布式包torch.distributed
能夠方便的實現(xiàn)跨進程和跨機器集群的并行計算。本文代碼運行在單機雙卡服務器上,并基于下面的模板來執(zhí)行不同的分布式操作。
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def init_process(rank, size, fn, backend='nccl'):
"""
為每個進程初始化分布式環(huán)境,保證相互之間可以通信,并調(diào)用函數(shù)fn。
"""
os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_PORT'] = '29500'
dist.init_process_group(backend, rank=rank, world_size=size)
fn(rank, size)
def run(world_size, func):
"""
啟動world_size個進程,并執(zhí)行函數(shù)func。
"""
processes = []
mp.set_start_method("spawn")
for rank in range(world_size):
p = mp.Process(target=init_process, args=(rank, world_size, func))
p.start()
processes.append(p)
for p in processes:
p.join()
if __name__ == "__main__":
run(2, func) # 這里的func隨后會被替換為不同的分布式示例函數(shù)
pass
? 先對上面的模板做一些簡單的介紹。
- 函數(shù)
run
會根據(jù)傳入的參數(shù)world_size,生成對應數(shù)量的進程。每個進程都會調(diào)用init_process
來初始化分布式環(huán)境,并調(diào)用傳入的分布式示例函數(shù)。 -
torch.distributed.init_process_group()
,該方法負責各進程之間的初始協(xié)調(diào),保證各進程都會與master進行握手。該方法在調(diào)用完成之前會一直阻塞,并且后續(xù)的所有操作都必須在該操作之后。調(diào)用該方法時需要初始化下面的4個環(huán)境變量:- MASTER_PORT:rank 0進程所在機器上的空閑端口;
- MASTER_ADDR:rank 0進程所在機器上的IP地址;
- WORLD_SIZE:進程總數(shù);
- RANK:每個進程的RANK,所以每個進程知道其是否是master;
1. 點對點通信
? 在介紹其他collective通信之前,先看一個簡單的點對點通信實現(xiàn)。
def p2p_block_func(rank, size):
"""
將rank src上的tensor發(fā)送至rank dst(阻塞)。
"""
src = 0
dst = 1
group = dist.new_group(list(range(size)))
# 對于rank src,該tensor用于發(fā)送
# 對于rank dst,該tensor用于接收
tensor = torch.zeros(1).to(torch.device("cuda", rank))
if rank == src:
tensor += 1
# 發(fā)送tensor([1.])
# group指定了該操作所見進程的范圍,默認情況下是整個world
dist.send(tensor=tensor, dst=1, group=group)
elif rank == dst:
# rank dst的tensor初始化為tensor([0.]),但接收后為tensor([1.])
dist.recv(tensor=tensor, src=0, group=group)
print('Rank ', rank, ' has data ', tensor)
if __name__ == "__main__":
run(2, p2p_block_func)
? p2p_block_func
實現(xiàn)從rank 0發(fā)送一個tensor([1.0])至rank 1,該操作在發(fā)送完成/接收完成之前都會阻塞。
? 下面是一個不阻塞的版本:
def p2p_unblock_func(rank, size):
"""
將rank src上的tensor發(fā)送至rank dst(非阻塞)。
"""
src = 0
dst = 1
group = dist.new_group(list(range(size)))
tensor = torch.zeros(1).to(torch.device("cuda", rank))
if rank == src:
tensor += 1
# 非阻塞發(fā)送
req = dist.isend(tensor=tensor, dst=dst, group=group)
print("Rank 0 started sending")
elif rank == dst:
# 非阻塞接收
req = dist.irecv(tensor=tensor, src=src, group=group)
print("Rank 1 started receiving")
req.wait()
print('Rank ', rank, ' has data ', tensor)
if __name__ == "__main__":
run(2, p2p_unblock_func)
? p2p_unblock_func
是非阻塞版本的點對點通信。使用非阻塞方法時,因為不知道數(shù)據(jù)何時送達,所以在req.wait()
完成之前不要對發(fā)送/接收的tensor進行任何操作。
2. Broadcast
def broadcast_func(rank, size):
src = 0
group = dist.new_group(list(range(size)))
if rank == src:
# 對于rank src,初始化tensor([1.])
tensor = torch.zeros(1).to(torch.device("cuda", rank)) + 1
else:
# 對于非rank src,初始化tensor([0.])
tensor = torch.zeros(1).to(torch.device("cuda", rank))
# 對于rank src,broadcast是發(fā)送;否則,則是接收
dist.broadcast(tensor=tensor, src=0, group=group)
print('Rank ', rank, ' has data ', tensor)
if __name__ == "__main__":
run(2, broadcast_func)
? broadcast_func
會將rank 0上的tensor([1.])廣播至所有的rank上。
3. Reduce與Allreduce
def reduce_func(rank, size):
dst = 1
group = dist.new_group(list(range(size)))
tensor = torch.ones(1).to(torch.device("cuda", rank))
# 對于所有rank都會發(fā)送, 但僅有dst會接收求和的結(jié)果
dist.reduce(tensor, dst=dst, op=dist.ReduceOp.SUM, group=group)
print('Rank ', rank, ' has data ', tensor)
if __name__ == "__main__":
run(2, reduce_func)
? reduce_func
會對group中所有rank的tensor進行聚合,并將結(jié)果發(fā)送至rank dst。
def allreduce_func(rank, size):
group = dist.new_group(list(range(size)))
tensor = torch.ones(1).to(torch.device("cuda", rank))
# tensor即用來發(fā)送,也用來接收
dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)
print('Rank ', rank, ' has data ', tensor)
if __name__ == "__main__":
run(2, allreduce_func)
? allreduce_func
將group中所有rank的tensor進行聚合,并將結(jié)果發(fā)送至group中的所有rank。
4. Gather與Allgather
def gather_func(rank, size):
dst = 1
group = dist.new_group(list(range(size)))
# 該tensor用于發(fā)送
tensor = torch.zeros(1).to(torch.device("cuda", rank)) + rank
gather_list = []
if rank == dst:
# gather_list中的tensor數(shù)量應該是size個,用于接收其他rank發(fā)送來的tensor
gather_list = [torch.zeros(1).to(torch.device("cuda", dst)) for _ in range(size)]
# 僅在rank dst上需要指定gather_list
dist.gather(tensor, gather_list=gather_list, dst=dst, group=group)
else:
# 非rank dst,相當于發(fā)送tensor
dist.gather(tensor, dst=dst, group=group)
print('Rank ', rank, ' has data ', gather_list)
if __name__ == "__main__":
run(2, gather_func)
? gather_func
從group中所有rank上收集tensor,并發(fā)送至rank dst。(相當于不進行聚合操作的reduce)
def allgather_func(rank, size):
group = dist.new_group(list(range(size)))
# 該tensor用于發(fā)送
tensor = torch.zeros(1).to(torch.device("cuda", rank)) + rank
# gether_list用于接收各個rank發(fā)送來的tensor
gather_list = [torch.zeros(1).to(torch.device("cuda", rank)) for _ in range(size)]
dist.all_gather(gather_list, tensor, group=group)
# 各個rank的gather_list均一致
print('Rank ', rank, ' has data ', gather_list)
if __name__ == "__main__":
run(2, allgather_func)
? allgather_func
從group中所有rank上收集tensor,并將收集到的tensor發(fā)送至所有group中的rank。
5. Scatter與ReduceScatter
def scatter_func(rank, size):
src = 0
group = dist.new_group(list(range(size)))
# 各個rank用于接收的tensor
tensor = torch.empty(1).to(torch.device("cuda", rank))
if rank == src:
# 在rank src上,將tensor_list中的tensor分發(fā)至不同的rank上
# tensor_list:[tensor([1.]), tensor([2.])]
tensor_list = [torch.tensor([i + 1], dtype=torch.float32).to(torch.device("cuda", rank)) for i in range(size)]
# 將tensor_list發(fā)送至各個rank
# 接收屬于rank src的那部分tensor
dist.scatter(tensor, scatter_list=tensor_list, src=0, group=group)
else:
# 接收屬于對應rank的tensor
dist.scatter(tensor, scatter_list=[], src=0, group=group)
# 每個rank都擁有tensor_list中的一部分tensor
print('Rank ', rank, ' has data ', tensor)
if __name__ == "__main__":
run(2, scatter_func)
? scatter_func
會將rank src中的一組tensor逐個分發(fā)至其他rank上,每個rank持有的tensor不同。
def reduce_scatter_func(rank, size):
group = dist.new_group(list(range(size)))
# 用于接收的tensor
tensor = torch.empty(1).to(torch.device("cuda", rank))
# 用于發(fā)送的tensor列表
# 對于每個rank,有tensor_list=[tensor([0.]), tensor([1.])]
tensor_list = [torch.Tensor([i]).to(torch.device("cuda", rank)) for i in range(size)]
# step1. 經(jīng)過reduce的操作會得到tensor列表[tensor([0.]), tensor([2.])]
# step2. tensor列表[tensor([0.]), tensor([2.])]分發(fā)至各個rank
# rank 0得到tensor([0.]),rank 1得到tensor([2.])
dist.reduce_scatter(tensor, tensor_list, op=dist.ReduceOp.SUM, group=group)
print('Rank ', rank, ' has data ', tensor)
if __name__ == "__main__":
run(2, reduce_scatter_func)
參考資料
https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage/collectives.html
https://pytorch.org/tutorials/intermediate/dist_tuto.html#collective-communication文章來源:http://www.zghlxwxcb.cn/news/detail-412125.html
https://pytorch.org/docs/stable/distributed.html#collective-functions文章來源地址http://www.zghlxwxcb.cn/news/detail-412125.html
到了這里,關于【深度學習】【分布式訓練】Collective通信操作及Pytorch示例的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!