欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

PyTorch并行訓(xùn)練DistributedDataParallel完整demo

 更新時間:2023年06月01日 14:43:52   作者:Joseph El Kettaneh  
這篇文章主要為大家介紹了PyTorch并行訓(xùn)練DistributedDataParallel完整demo,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

大型數(shù)據(jù)集訓(xùn)練

使用大型數(shù)據(jù)集訓(xùn)練大型深度神經(jīng)網(wǎng)絡(luò) (DNN) 的問題是深度學(xué)習(xí)領(lǐng)域的主要挑戰(zhàn)。 隨著 DNN 和數(shù)據(jù)集規(guī)模的增加,訓(xùn)練這些模型的計算和內(nèi)存需求也會增加。 這使得在計算資源有限的單臺機器上訓(xùn)練這些模型變得困難甚至不可能。

使用大型數(shù)據(jù)集訓(xùn)練大型 DNN 的一些主要挑戰(zhàn)包括:

  • 訓(xùn)練時間長:訓(xùn)練過程可能需要數(shù)周甚至數(shù)月才能完成,具體取決于模型的復(fù)雜性和數(shù)據(jù)集的大小。
  • 內(nèi)存限制:大型 DNN 可能需要大量內(nèi)存來存儲訓(xùn)練期間的所有模型參數(shù)、梯度和中間激活。 這可能會導(dǎo)致內(nèi)存不足錯誤并限制可在單臺機器上訓(xùn)練的模型的大小。

為了應(yīng)對這些挑戰(zhàn),已經(jīng)開發(fā)了各種技術(shù)來擴大具有大型數(shù)據(jù)集的大型 DNN 的訓(xùn)練,包括模型并行性、數(shù)據(jù)并行性和混合并行性,以及硬件、軟件和算法的優(yōu)化。

PyTorch 的數(shù)據(jù)并行性和模型并行性

在本文中我們將演示使用 PyTorch 的數(shù)據(jù)并行性和模型并行性。

我們所說的并行性一般是指在多個gpu,或多臺機器上訓(xùn)練深度神經(jīng)網(wǎng)絡(luò)(dnn),以實現(xiàn)更少的訓(xùn)練時間。數(shù)據(jù)并行背后的基本思想是將訓(xùn)練數(shù)據(jù)分成更小的塊,讓每個GPU或機器處理一個單獨的數(shù)據(jù)塊。然后將每個節(jié)點的結(jié)果組合起來,用于更新模型參數(shù)。在數(shù)據(jù)并行中,模型體系結(jié)構(gòu)在每個節(jié)點上是相同的,但模型參數(shù)在節(jié)點之間進行了分區(qū)。每個節(jié)點使用分配的數(shù)據(jù)塊訓(xùn)練自己的本地模型,在每次訓(xùn)練迭代結(jié)束時,模型參數(shù)在所有節(jié)點之間同步。這個過程不斷重復(fù),直到模型收斂到一個令人滿意的結(jié)果。

下面我們用用ResNet50和CIFAR10數(shù)據(jù)集來進行完整的代碼示例:

在數(shù)據(jù)并行中,模型架構(gòu)在每個節(jié)點上保持相同,但模型參數(shù)在節(jié)點之間進行了分區(qū),每個節(jié)點使用分配的數(shù)據(jù)塊訓(xùn)練自己的本地模型。

PyTorch的DistributedDataParallel 庫可以進行跨節(jié)點的梯度和模型參數(shù)的高效通信和同步,實現(xiàn)分布式訓(xùn)練。本文提供了如何使用ResNet50和CIFAR10數(shù)據(jù)集使用PyTorch實現(xiàn)數(shù)據(jù)并行的示例,其中代碼在多個gpu或機器上運行,每臺機器處理訓(xùn)練數(shù)據(jù)的一個子集。訓(xùn)練過程使用PyTorch的DistributedDataParallel 庫進行并行化。

導(dǎo)入必須要的庫

importos
 fromdatetimeimportdatetime
 fromtimeimporttime
 importargparse
 importtorchvision
 importtorchvision.transformsastransforms
 importtorch
 importtorch.nnasnn
 importtorch.distributedasdist
 fromtorch.nn.parallelimportDistributedDataParallel

檢查GPU

importsubprocess
 result=subprocess.run(['nvidia-smi'], stdout=subprocess.PIPE)
 print(result.stdout.decode())

因為我們需要在多個服務(wù)器上運行,所以手動一個一個執(zhí)行并不現(xiàn)實,所以需要有一個調(diào)度程序。這里我們使用SLURM文件來運行代碼(slurm面向Linux和Unix類似內(nèi)核的免費和開源工作調(diào)度程序),

defmain():
     # get distributed configuration from Slurm environment
     parser=argparse.ArgumentParser()
     parser.add_argument('-b', '--batch-size', default=128, type=int,
                         help='batch size. it will be divided in mini-batch for each worker')
     parser.add_argument('-e','--epochs', default=2, type=int, metavar='N',
                         help='number of total epochs to run')
     parser.add_argument('-c','--checkpoint', default=None, type=str,
                         help='path to checkpoint to load')
     args=parser.parse_args()
     rank=int(os.environ['SLURM_PROCID'])
     local_rank=int(os.environ['SLURM_LOCALID'])
     size=int(os.environ['SLURM_NTASKS'])
     master_addr=os.environ["SLURM_SRUN_COMM_HOST"]
     port="29500"
     node_id=os.environ['SLURM_NODEID']
     ddp_arg= [rank, local_rank, size, master_addr, port, node_id]
     train(args, ddp_arg)

然后我們使用DistributedDataParallel 庫來執(zhí)行分布式訓(xùn)練。

deftrain(args, ddp_arg):
     rank, local_rank, size, MASTER_ADDR, port, NODE_ID=ddp_arg
     # display info
     ifrank==0:
         #print(">>> Training on ", len(hostnames), " nodes and ", size, " processes, master node is ", MASTER_ADDR)
         print(">>> Training on ", size, " GPUs, master node is ", MASTER_ADDR)
     #print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID))
     print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID))
     # configure distribution method: define address and port of the master node and initialise communication backend (NCCL)
     #dist.init_process_group(backend='nccl', init_method='env://', world_size=size, rank=rank)
     dist.init_process_group(
         backend='nccl',
         init_method='tcp://{}:{}'.format(MASTER_ADDR, port),
         world_size=size,
         rank=rank
     )
     # distribute model
     torch.cuda.set_device(local_rank)
     gpu=torch.device("cuda")
     #model = ResNet18(classes=10).to(gpu)
     model=torchvision.models.resnet50(pretrained=False).to(gpu)
     ddp_model=DistributedDataParallel(model, device_ids=[local_rank])
     ifargs.checkpointisnotNone:
         map_location= {'cuda:%d'%0: 'cuda:%d'%local_rank}
         ddp_model.load_state_dict(torch.load(args.checkpoint, map_location=map_location))
     # distribute batch size (mini-batch)
     batch_size=args.batch_size
     batch_size_per_gpu=batch_size//size
     # define loss function (criterion) and optimizer
     criterion=nn.CrossEntropyLoss()  
     optimizer=torch.optim.SGD(ddp_model.parameters(), 1e-4)
     transform_train=transforms.Compose([
         transforms.RandomCrop(32, padding=4),
         transforms.RandomHorizontalFlip(),
         transforms.ToTensor(),
         transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
     ])
     # load data with distributed sampler
     #train_dataset = torchvision.datasets.CIFAR10(root='./data',
     #                                           train=True,
     #                                           transform=transform_train,
     #                                           download=False)
     # load data with distributed sampler
     train_dataset=torchvision.datasets.CIFAR10(root='./data',
                                                train=True,
                                                transform=transform_train,
                                                download=False)
     train_sampler=torch.utils.data.distributed.DistributedSampler(train_dataset,
                                                                     num_replicas=size,
                                                                     rank=rank)
     train_loader=torch.utils.data.DataLoader(dataset=train_dataset,
                                                batch_size=batch_size_per_gpu,
                                                shuffle=False,
                                                num_workers=0,
                                                pin_memory=True,
                                                sampler=train_sampler)
     # training (timers and display handled by process 0)
     ifrank==0: start=datetime.now()         
     total_step=len(train_loader)
     forepochinrange(args.epochs):
         ifrank==0: start_dataload=time()
         fori, (images, labels) inenumerate(train_loader):
             # distribution of images and labels to all GPUs
             images=images.to(gpu, non_blocking=True)
             labels=labels.to(gpu, non_blocking=True) 
             ifrank==0: stop_dataload=time()
             ifrank==0: start_training=time()
             # forward pass
             outputs=ddp_model(images)
             loss=criterion(outputs, labels)
             # backward and optimize
             optimizer.zero_grad()
             loss.backward()
             optimizer.step()
             ifrank==0: stop_training=time() 
             if (i+1) %10==0andrank==0:
                 print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Time data load: {:.3f}ms, Time training: {:.3f}ms'.format(epoch+1, args.epochs,
                                                                         i+1, total_step, loss.item(), (stop_dataload-start_dataload)*1000,
                                                                         (stop_training-start_training)*1000))
             ifrank==0: start_dataload=time()
         #Save checkpoint at every end of epoch
         ifrank==0:
             torch.save(ddp_model.state_dict(), './checkpoint/{}GPU_{}epoch.checkpoint'.format(size, epoch+1))
     ifrank==0:
         print(">>> Training complete in: "+str(datetime.now() -start))
 if__name__=='__main__':
     main()

代碼將數(shù)據(jù)和模型分割到多個gpu上,并以分布式的方式更新模型。

代碼解釋

train(args, ddp_arg)有兩個參數(shù),args和ddp_arg,其中args是傳遞給腳本的命令行參數(shù),ddp_arg包含分布式訓(xùn)練相關(guān)參數(shù)。

rank, local_rank, size, MASTER_ADDR, port, NODE_ID = ddp_arg:解包ddp_arg中分布式訓(xùn)練相關(guān)參數(shù)。

如果rank為0,則打印當(dāng)前使用的gpu數(shù)量和主節(jié)點IP地址信息。

dist.init_process_group(backend='nccl', init_method='tcp://{}:{}'.format(MASTER_ADDR, port), world_size=size, rank=rank) :使用NCCL后端初始化分布式進程組。

torch.cuda.set_device(local_rank):為這個進程選擇指定的GPU。

model = torchvision.models. ResNet50 (pretrained=False).to(gpu):從torchvision模型中加載ResNet50模型,并將其移動到指定的gpu。

ddp_model = DistributedDataParallel(model, device_ids=[local_rank]):將模型包裝在DistributedDataParallel模塊中,也就是說這樣我們就可以進行分布式訓(xùn)練了

加載CIFAR-10數(shù)據(jù)集并應(yīng)用數(shù)據(jù)增強轉(zhuǎn)換。

train_sampler=torch.utils.data.distributed.DistributedSampler(train_dataset,num_replicas=size,rank=rank):創(chuàng)建一個DistributedSampler對象,將數(shù)據(jù)集分割到多個gpu上。

train_loader =torch.utils.data.DataLoader(dataset=train_dataset,batch_size=batch_size_per_gpu,shuffle=False,num_workers=0,pin_memory=True,sampler=train_sampler):創(chuàng)建一個DataLoader對象,數(shù)據(jù)將批量加載到模型中,這與我們平常訓(xùn)練的步驟是一致的只不過是增加了一個分布式的數(shù)據(jù)采樣DistributedSampler

為指定的epoch數(shù)訓(xùn)練模型,以分布式的方式使用optimizer.step()更新權(quán)重。

rank0在每個輪次結(jié)束時保存一個檢查點。

rank0每10個批次顯示損失和訓(xùn)練時間。

結(jié)束訓(xùn)練時打印訓(xùn)練模型所花費的總時間也是在rank0上。

代碼測試

在使用1個節(jié)點1/2/3/4個gpu, 2個節(jié)點6/8個gpu,每個節(jié)點3/4個gpu上進行了訓(xùn)練Cifar10上的Resnet50的測試如下圖所示,每次測試的批處理大小保持不變。完成每項測試所花費的時間以秒為單位記錄。隨著使用的gpu數(shù)量的增加,完成測試所需的時間會減少。當(dāng)使用8個gpu時,需要320秒才能完成,這是記錄中最快的時間。這是肯定的,但是我們可以看到訓(xùn)練的速度并沒有像GPU數(shù)量增長呈現(xiàn)線性的增長,這可能是因為Resnet50算是一個比較小的模型了,并不需要進行并行化訓(xùn)練。

在多個gpu上使用數(shù)據(jù)并行可以顯著減少在給定數(shù)據(jù)集上訓(xùn)練深度神經(jīng)網(wǎng)絡(luò)(DNN)所需的時間。隨著gpu數(shù)量的增加,完成訓(xùn)練過程所需的時間減少,這表明DNN可以更有效地并行訓(xùn)練。

這種方法在處理大型數(shù)據(jù)集或復(fù)雜的DNN架構(gòu)時特別有用。通過利用多個gpu,可以加快訓(xùn)練過程,實現(xiàn)更快的模型迭代和實驗。但是需要注意的是,通過Data Parallelism實現(xiàn)的性能提升可能會受到通信開銷和GPU內(nèi)存限制等因素的限制,需要仔細(xì)調(diào)優(yōu)才能獲得最佳結(jié)果。

以上就是PyTorch并行訓(xùn)練DistributedDataParallel完整demo的詳細(xì)內(nèi)容,更多關(guān)于PyTorch并行訓(xùn)練的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • numpy.delete刪除一列或多列的方法

    numpy.delete刪除一列或多列的方法

    下面小編就為大家分享一篇numpy.delete刪除一列或多列的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-04-04
  • 使用python實現(xiàn)knn算法

    使用python實現(xiàn)knn算法

    這篇文章主要為大家詳細(xì)介紹了使用python實現(xiàn)knn算法,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-12-12
  • 通過Python繪制中國結(jié)的示例代碼

    通過Python繪制中國結(jié)的示例代碼

    再過不久就要到新年了,所以這篇文章將為大家介紹一下如何通過Python代碼繪制一個中國結(jié),文中的示例代碼講解詳細(xì),感興趣的可以動手試一試
    2022-01-01
  • Python異步發(fā)送日志到遠(yuǎn)程服務(wù)器詳情

    Python異步發(fā)送日志到遠(yuǎn)程服務(wù)器詳情

    這篇文章主要介紹了Python異步發(fā)送日志到遠(yuǎn)程服務(wù)器詳情,文章通過簡單輸出到cmd和文件中的代碼展開詳情,需要的朋友可以參考一下
    2022-07-07
  • python詞云庫wordCloud使用方法詳解(解決中文亂碼)

    python詞云庫wordCloud使用方法詳解(解決中文亂碼)

    這篇文章主要介紹了python詞云庫wordCloud使用方法詳解(解決中文亂碼),需要的朋友可以參考下
    2020-02-02
  • python3 判斷列表是一個空列表的方法

    python3 判斷列表是一個空列表的方法

    今天小編就為大家分享一篇python3 判斷列表是一個空列表的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-05-05
  • python實現(xiàn)雙人貪吃蛇小游戲

    python實現(xiàn)雙人貪吃蛇小游戲

    這篇文章主要為大家詳細(xì)介紹了python實現(xiàn)雙人貪吃蛇小游戲,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-08-08
  • 用python完成一個分布式事務(wù)TCC

    用python完成一個分布式事務(wù)TCC

    這篇文章主要介紹了用python完成一個分布式事務(wù)TCC,文章里我們介紹了TCC的理論知識,也通過一個例子,完整給出了編寫一個TCC事務(wù)的過程,涵蓋了正常成功完成,以及成功回滾的情況,需要的朋友可以參考一下文章的具體內(nèi)容
    2021-10-10
  • python?自定義裝飾器使用及原理詳解(最新推薦)

    python?自定義裝飾器使用及原理詳解(最新推薦)

    本文詳細(xì)介紹了Python裝飾器的原理和使用方法,包括簡單的裝飾器、帶參數(shù)的裝飾器、原函數(shù)的傳參、保留原函數(shù)元數(shù)據(jù)以及類裝飾器,通過這些講解,讀者可以全面了解裝飾器的強大功能和應(yīng)用技巧,感興趣的朋友一起看看吧
    2025-02-02
  • wxpython繪制圓角窗體

    wxpython繪制圓角窗體

    這篇文章主要為大家詳細(xì)介紹了wxpython繪制圓角窗體,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2019-11-11

最新評論