国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【深度學習】【分布式訓練】Collective通信操作及Pytorch示例

這篇具有很好參考價值的文章主要介紹了【深度學習】【分布式訓練】Collective通信操作及Pytorch示例。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

相關博客
【Megatron-DeepSpeed】張量并行工具代碼mpu詳解(一):并行環(huán)境初始化
【Megatron-DeepSpeed】張量并行工具代碼mpu詳解(二):Collective通信操作的封裝mappings
【深度學習】【分布式訓練】DeepSpeed:AllReduce與ZeRO-DP
【深度學習】混合精度訓練與顯存分析
【深度學習】【分布式訓練】Collective通信操作及Pytorch示例
【自然語言處理】【大模型】大語言模型BLOOM推理工具測試
【自然語言處理】【大模型】GLM-130B:一個開源雙語預訓練語言模型
【自然語言處理】【大模型】用于大型Transformer的8-bit矩陣乘法介紹
【自然語言處理】【大模型】BLOOM:一個176B參數(shù)且可開放獲取的多語言模型

Collective通信操作及Pytorch示例

? 大模型時代,單機已經(jīng)無法完成先進模型的訓練和推理,分布式訓練和推理將會是必然的選擇。各類分布式訓練和推斷工具都會使用到Collective通信。網(wǎng)絡上大多數(shù)的教程僅簡單介紹這些操作的原理,沒有代碼示例來輔助理解。本文會介紹各類Collective通信操作,并展示pytorch中如何使用

一、Collective通信操作

1. AllReduce

? 將各個顯卡的張量進行聚合(sum、min、max)后,再將結(jié)果寫回至各個顯卡。

【深度學習】【分布式訓練】Collective通信操作及Pytorch示例

2. Broadcast

? 將張量從某張卡廣播至所有卡。

【深度學習】【分布式訓練】Collective通信操作及Pytorch示例

3. Reduce

? 執(zhí)行同AllReduce相同的操作,但結(jié)果僅寫入具有的某個顯卡。

【深度學習】【分布式訓練】Collective通信操作及Pytorch示例

4. AllGather

? 每個顯卡上有一個大小為N的張量,共有k個顯卡。經(jīng)過AllGather后將所有顯卡上的張量合并為一個 N × k N\times k N×k的張量,然后將結(jié)果分配至所有顯卡上。

【深度學習】【分布式訓練】Collective通信操作及Pytorch示例

5. ReduceScatter

? 執(zhí)行Reduce相同的操作,但是結(jié)果會被分散至不同的顯卡。

【深度學習】【分布式訓練】Collective通信操作及Pytorch示例

二、Pytorch示例

? pytorch的分布式包torch.distributed能夠方便的實現(xiàn)跨進程和跨機器集群的并行計算。本文代碼運行在單機雙卡服務器上,并基于下面的模板來執(zhí)行不同的分布式操作。

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def init_process(rank, size, fn, backend='nccl'):
    """
    為每個進程初始化分布式環(huán)境,保證相互之間可以通信,并調(diào)用函數(shù)fn。
    """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)
    
    
def run(world_size, func):
    """
    啟動world_size個進程,并執(zhí)行函數(shù)func。
    """
    processes = []
    mp.set_start_method("spawn")
    for rank in range(world_size):
        p = mp.Process(target=init_process, args=(rank, world_size, func))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()
        
if __name__ == "__main__":
    run(2, func) # 這里的func隨后會被替換為不同的分布式示例函數(shù)
    pass

? 先對上面的模板做一些簡單的介紹。

  • 函數(shù)run會根據(jù)傳入的參數(shù)world_size,生成對應數(shù)量的進程。每個進程都會調(diào)用init_process來初始化分布式環(huán)境,并調(diào)用傳入的分布式示例函數(shù)。
  • torch.distributed.init_process_group(),該方法負責各進程之間的初始協(xié)調(diào),保證各進程都會與master進行握手。該方法在調(diào)用完成之前會一直阻塞,并且后續(xù)的所有操作都必須在該操作之后。調(diào)用該方法時需要初始化下面的4個環(huán)境變量:
    • MASTER_PORT:rank 0進程所在機器上的空閑端口;
    • MASTER_ADDR:rank 0進程所在機器上的IP地址;
    • WORLD_SIZE:進程總數(shù);
    • RANK:每個進程的RANK,所以每個進程知道其是否是master;

1. 點對點通信

? 在介紹其他collective通信之前,先看一個簡單的點對點通信實現(xiàn)。

def p2p_block_func(rank, size):
    """
    將rank src上的tensor發(fā)送至rank dst(阻塞)。
    """
    src = 0
    dst = 1
    group = dist.new_group(list(range(size)))
    # 對于rank src,該tensor用于發(fā)送
    # 對于rank dst,該tensor用于接收
    tensor = torch.zeros(1).to(torch.device("cuda", rank))
    if rank == src:
        tensor += 1
        # 發(fā)送tensor([1.])
        # group指定了該操作所見進程的范圍,默認情況下是整個world
        dist.send(tensor=tensor, dst=1, group=group)
    elif rank == dst:
        # rank dst的tensor初始化為tensor([0.]),但接收后為tensor([1.])
        dist.recv(tensor=tensor, src=0, group=group)
    print('Rank ', rank, ' has data ', tensor)
    
if __name__ == "__main__":
    run(2, p2p_block_func)

? p2p_block_func實現(xiàn)從rank 0發(fā)送一個tensor([1.0])至rank 1,該操作在發(fā)送完成/接收完成之前都會阻塞。

? 下面是一個不阻塞的版本:

def p2p_unblock_func(rank, size):
    """
    將rank src上的tensor發(fā)送至rank dst(非阻塞)。
    """
    src = 0
    dst = 1
    group = dist.new_group(list(range(size)))
    tensor = torch.zeros(1).to(torch.device("cuda", rank))
    if rank == src:
        tensor += 1
        # 非阻塞發(fā)送
        req = dist.isend(tensor=tensor, dst=dst, group=group)
        print("Rank 0 started sending")
    elif rank == dst:
        # 非阻塞接收
        req = dist.irecv(tensor=tensor, src=src, group=group)
        print("Rank 1 started receiving")
    req.wait()
    print('Rank ', rank, ' has data ', tensor)
    
if __name__ == "__main__":
    run(2, p2p_unblock_func)

? p2p_unblock_func是非阻塞版本的點對點通信。使用非阻塞方法時,因為不知道數(shù)據(jù)何時送達,所以在req.wait()完成之前不要對發(fā)送/接收的tensor進行任何操作。

2. Broadcast

def broadcast_func(rank, size):
    src = 0
    group = dist.new_group(list(range(size)))
    if rank == src:
        # 對于rank src,初始化tensor([1.])
        tensor = torch.zeros(1).to(torch.device("cuda", rank)) + 1
    else:
        # 對于非rank src,初始化tensor([0.])
        tensor = torch.zeros(1).to(torch.device("cuda", rank))
    # 對于rank src,broadcast是發(fā)送;否則,則是接收
    dist.broadcast(tensor=tensor, src=0, group=group)
    print('Rank ', rank, ' has data ', tensor)
    
if __name__ == "__main__":
    run(2, broadcast_func)

? broadcast_func會將rank 0上的tensor([1.])廣播至所有的rank上。

3. Reduce與Allreduce

def reduce_func(rank, size):
    dst = 1
    group = dist.new_group(list(range(size)))
    tensor = torch.ones(1).to(torch.device("cuda", rank))
    # 對于所有rank都會發(fā)送, 但僅有dst會接收求和的結(jié)果
    dist.reduce(tensor, dst=dst, op=dist.ReduceOp.SUM, group=group)
    print('Rank ', rank, ' has data ', tensor)
    
if __name__ == "__main__":
    run(2, reduce_func)

? reduce_func會對group中所有rank的tensor進行聚合,并將結(jié)果發(fā)送至rank dst。

def allreduce_func(rank, size):
    group = dist.new_group(list(range(size)))
    tensor = torch.ones(1).to(torch.device("cuda", rank))
    # tensor即用來發(fā)送,也用來接收
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)
    print('Rank ', rank, ' has data ', tensor)
    
if __name__ == "__main__":
    run(2, allreduce_func)

? allreduce_func將group中所有rank的tensor進行聚合,并將結(jié)果發(fā)送至group中的所有rank。

4. Gather與Allgather

def gather_func(rank, size):
    dst = 1
    group = dist.new_group(list(range(size)))
    # 該tensor用于發(fā)送
    tensor = torch.zeros(1).to(torch.device("cuda", rank)) + rank
    gather_list = []
    if rank == dst:
        # gather_list中的tensor數(shù)量應該是size個,用于接收其他rank發(fā)送來的tensor
        gather_list = [torch.zeros(1).to(torch.device("cuda", dst)) for _ in range(size)]
        # 僅在rank dst上需要指定gather_list
        dist.gather(tensor, gather_list=gather_list, dst=dst, group=group)
    else:
        # 非rank dst,相當于發(fā)送tensor
        dist.gather(tensor, dst=dst, group=group)
    print('Rank ', rank, ' has data ', gather_list)
    
if __name__ == "__main__":
    run(2, gather_func)

? gather_func從group中所有rank上收集tensor,并發(fā)送至rank dst。(相當于不進行聚合操作的reduce)

def allgather_func(rank, size):
    group = dist.new_group(list(range(size)))
    # 該tensor用于發(fā)送
    tensor = torch.zeros(1).to(torch.device("cuda", rank)) + rank
    # gether_list用于接收各個rank發(fā)送來的tensor
    gather_list = [torch.zeros(1).to(torch.device("cuda", rank)) for _ in range(size)]
    dist.all_gather(gather_list, tensor, group=group)
    # 各個rank的gather_list均一致
    print('Rank ', rank, ' has data ', gather_list)
    
if __name__ == "__main__":
    run(2, allgather_func)

? allgather_func從group中所有rank上收集tensor,并將收集到的tensor發(fā)送至所有group中的rank。

5. Scatter與ReduceScatter

def scatter_func(rank, size):
    src = 0
    group = dist.new_group(list(range(size)))
    # 各個rank用于接收的tensor
    tensor = torch.empty(1).to(torch.device("cuda", rank))
    if rank == src:
        # 在rank src上,將tensor_list中的tensor分發(fā)至不同的rank上
        # tensor_list:[tensor([1.]), tensor([2.])]
        tensor_list = [torch.tensor([i + 1], dtype=torch.float32).to(torch.device("cuda", rank)) for i in range(size)]
        # 將tensor_list發(fā)送至各個rank
        # 接收屬于rank src的那部分tensor
        dist.scatter(tensor, scatter_list=tensor_list, src=0, group=group)
    else:
        # 接收屬于對應rank的tensor
        dist.scatter(tensor, scatter_list=[], src=0, group=group)
    # 每個rank都擁有tensor_list中的一部分tensor
    print('Rank ', rank, ' has data ', tensor)
    
if __name__ == "__main__":
    run(2, scatter_func)

? scatter_func會將rank src中的一組tensor逐個分發(fā)至其他rank上,每個rank持有的tensor不同。

def reduce_scatter_func(rank, size):
    group = dist.new_group(list(range(size)))
    # 用于接收的tensor
    tensor = torch.empty(1).to(torch.device("cuda", rank))
    # 用于發(fā)送的tensor列表
    # 對于每個rank,有tensor_list=[tensor([0.]), tensor([1.])]
    tensor_list = [torch.Tensor([i]).to(torch.device("cuda", rank)) for i in range(size)]
    # step1. 經(jīng)過reduce的操作會得到tensor列表[tensor([0.]), tensor([2.])]
    # step2. tensor列表[tensor([0.]), tensor([2.])]分發(fā)至各個rank
    # rank 0得到tensor([0.]),rank 1得到tensor([2.])
    dist.reduce_scatter(tensor, tensor_list, op=dist.ReduceOp.SUM, group=group)
    print('Rank ', rank, ' has data ', tensor)
    
if __name__ == "__main__":
    run(2, reduce_scatter_func)

參考資料

https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage/collectives.html

https://pytorch.org/tutorials/intermediate/dist_tuto.html#collective-communication

https://pytorch.org/docs/stable/distributed.html#collective-functions文章來源地址http://www.zghlxwxcb.cn/news/detail-412125.html

到了這里,關于【深度學習】【分布式訓練】Collective通信操作及Pytorch示例的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • 機器學習洞察 | 分布式訓練讓機器學習更加快速準確

    機器學習洞察 | 分布式訓練讓機器學習更加快速準確

    機器學習能夠基于數(shù)據(jù)發(fā)現(xiàn)一般化規(guī)律的優(yōu)勢日益突顯,我們看到有越來越多的開發(fā)者關注如何訓練出更快速、更準確的機器學習模型,而分布式訓練 (Distributed Training) 則能夠大幅加速這一進程。 亞馬遜云科技開發(fā)者社區(qū)為開發(fā)者們提供全球的開發(fā)技術資源。這里有技術文檔

    2024年02月16日
    瀏覽(24)
  • 1、pytorch分布式數(shù)據(jù)訓練結(jié)合學習率周期及混合精度

    正如標題所寫,我們正常的普通訓練都是單機單卡或單機多卡。而往往一個高精度的模型需要訓練時間很長,所以DDP分布式數(shù)據(jù)并行和混合精度可以加速模型訓練。混精可以增大batch size. 如下提供示例代碼,經(jīng)過官網(wǎng)查閱驗證的。原始代碼由百度文心一言提供。 問題:pytor

    2024年02月07日
    瀏覽(28)
  • 分布式深度學習庫BigDL簡述

    ????????BigDL是一個在Apache Spark上構建的分布式深度學習庫,由英特爾開發(fā)并開源。它允許用戶使用Scala或Python語言在大規(guī)模數(shù)據(jù)集上進行深度學習模型的訓練和推理。BigDL提供了許多常見的深度學習模型和算法的實現(xiàn),包括卷積神經(jīng)網(wǎng)絡(CNN)、循環(huán)神經(jīng)網(wǎng)絡(RNN)等。由

    2024年04月10日
    瀏覽(17)
  • 分布式深度學習中的數(shù)據(jù)并行和模型并行

    分布式深度學習中的數(shù)據(jù)并行和模型并行

    ??個人主頁: https://zhangxiaoshu.blog.csdn.net ??歡迎大家:關注??+點贊??+評論??+收藏??,如有錯誤敬請指正! ??未來很長,值得我們?nèi)Ρ几案篮玫纳睿?對于深度學習模型的預訓練階段,海量的訓練數(shù)據(jù)、超大規(guī)模的模型給深度學習帶來了日益嚴峻的挑戰(zhàn),因此,經(jīng)

    2024年01月24日
    瀏覽(27)
  • AI框架:9大主流分布式深度學習框架簡介

    AI框架:9大主流分布式深度學習框架簡介

    轉(zhuǎn)載翻譯Medium上一篇關于分布式深度學習框架的文章 https://medium.com/@mlblogging.k/9-libraries-for-parallel-distributed-training-inference-of-deep-learning-models-5faa86199c1fmedium.com/@mlblogging.k/9-libraries-for-parallel-distributed-training-inference-of-deep-learning-models-5faa86199c1f 大型深度學習模型在訓練時需要大量內(nèi)

    2024年02月09日
    瀏覽(59)
  • 【分布式訓練】基于Pytorch的分布式數(shù)據(jù)并行訓練

    【分布式訓練】基于Pytorch的分布式數(shù)據(jù)并行訓練

    簡介: 在PyTorch中使用DistributedDataParallel進行多GPU分布式模型訓練 加速神經(jīng)網(wǎng)絡訓練的最簡單方法是使用GPU,它在神經(jīng)網(wǎng)絡中常見的計算類型(矩陣乘法和加法)上提供了比CPU更大的加速。隨著模型或數(shù)據(jù)集變得越來越大,一個GPU很快就會變得不足。例如,像BERT和GPT-2這樣的

    2024年02月17日
    瀏覽(32)
  • 【分布式訓練】基于PyTorch進行多GPU分布式模型訓練(補充)

    【分布式訓練】基于PyTorch進行多GPU分布式模型訓練(補充)

    簡介: 在PyTorch中使用DistributedDataParallel進行多GPU分布式模型訓練。 原文鏈接:https://towardsdatascience.com/distributed-model-training-in-pytorch-using-distributeddataparallel-d3d3864dc2a7 隨著以ChatGPT為代表的大模型的不斷涌現(xiàn),如何在合理的時間內(nèi)訓練大模型逐漸成為一個重要的研究課題。為了解

    2024年02月16日
    瀏覽(27)
  • 【分布式】大模型分布式訓練入門與實踐 - 04

    【分布式】NCCL部署與測試 - 01 【分布式】入門級NCCL多機并行實踐 - 02 【分布式】小白看Ring算法 - 03 【分布式】大模型分布式訓練入門與實踐 - 04 數(shù)據(jù)并行(Distributed Data Parallel)是一種用于加快深度學習模型訓練速度的技術。在過去,訓練大型模型往往受限于單卡訓練的瓶頸

    2024年02月08日
    瀏覽(70)
  • pytorch 分布式訓練

    分布式訓練分為這幾類: 按照并行方式來分:模型并行 vs 數(shù)據(jù)并行 按照更新方式來分:同步更新 vs 異步更新 按照算法來分:Parameter Server算法 vs AllReduce算法 這個函數(shù)主要有三個參數(shù): module:即模型,此處注意,雖然輸入數(shù)據(jù)被均分到不同gpu上,但每個gpu上都要拷貝一份模

    2024年02月12日
    瀏覽(25)
  • 大語言模型的分布式訓練

    什么是大語言模型 訓練方式 面臨的挑戰(zhàn) 什么是分布式計算 如何實現(xiàn) 拆分邏輯 分發(fā)邏輯 大語言模型的分布式訓練 數(shù)據(jù)并行 模型并行 流水線并行 張量并行 通信 PS NCCL是Nvidia Collective multi-GPU Communication Library的簡稱,它是一個實現(xiàn)多GPU的collective communication通信(all-gather, red

    2024年02月10日
    瀏覽(27)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包