PyTorch并行訓(xùn)練DistributedDataParallel完整demo
大型數(shù)據(jù)集訓(xùn)練
使用大型數(shù)據(jù)集訓(xùn)練大型深度神經(jīng)網(wǎng)絡(luò) (DNN) 的問題是深度學(xué)習(xí)領(lǐng)域的主要挑戰(zhàn)。 隨著 DNN 和數(shù)據(jù)集規(guī)模的增加,訓(xùn)練這些模型的計算和內(nèi)存需求也會增加。 這使得在計算資源有限的單臺機器上訓(xùn)練這些模型變得困難甚至不可能。
使用大型數(shù)據(jù)集訓(xùn)練大型 DNN 的一些主要挑戰(zhàn)包括:
- 訓(xùn)練時間長:訓(xùn)練過程可能需要數(shù)周甚至數(shù)月才能完成,具體取決于模型的復(fù)雜性和數(shù)據(jù)集的大小。
- 內(nèi)存限制:大型 DNN 可能需要大量內(nèi)存來存儲訓(xùn)練期間的所有模型參數(shù)、梯度和中間激活。 這可能會導(dǎo)致內(nèi)存不足錯誤并限制可在單臺機器上訓(xùn)練的模型的大小。
為了應(yīng)對這些挑戰(zhàn),已經(jīng)開發(fā)了各種技術(shù)來擴大具有大型數(shù)據(jù)集的大型 DNN 的訓(xùn)練,包括模型并行性、數(shù)據(jù)并行性和混合并行性,以及硬件、軟件和算法的優(yōu)化。
PyTorch 的數(shù)據(jù)并行性和模型并行性
在本文中我們將演示使用 PyTorch 的數(shù)據(jù)并行性和模型并行性。
我們所說的并行性一般是指在多個gpu,或多臺機器上訓(xùn)練深度神經(jīng)網(wǎng)絡(luò)(dnn),以實現(xiàn)更少的訓(xùn)練時間。數(shù)據(jù)并行背后的基本思想是將訓(xùn)練數(shù)據(jù)分成更小的塊,讓每個GPU或機器處理一個單獨的數(shù)據(jù)塊。然后將每個節(jié)點的結(jié)果組合起來,用于更新模型參數(shù)。在數(shù)據(jù)并行中,模型體系結(jié)構(gòu)在每個節(jié)點上是相同的,但模型參數(shù)在節(jié)點之間進行了分區(qū)。每個節(jié)點使用分配的數(shù)據(jù)塊訓(xùn)練自己的本地模型,在每次訓(xùn)練迭代結(jié)束時,模型參數(shù)在所有節(jié)點之間同步。這個過程不斷重復(fù),直到模型收斂到一個令人滿意的結(jié)果。
下面我們用用ResNet50和CIFAR10數(shù)據(jù)集來進行完整的代碼示例:
在數(shù)據(jù)并行中,模型架構(gòu)在每個節(jié)點上保持相同,但模型參數(shù)在節(jié)點之間進行了分區(qū),每個節(jié)點使用分配的數(shù)據(jù)塊訓(xùn)練自己的本地模型。
PyTorch的DistributedDataParallel 庫可以進行跨節(jié)點的梯度和模型參數(shù)的高效通信和同步,實現(xiàn)分布式訓(xùn)練。本文提供了如何使用ResNet50和CIFAR10數(shù)據(jù)集使用PyTorch實現(xiàn)數(shù)據(jù)并行的示例,其中代碼在多個gpu或機器上運行,每臺機器處理訓(xùn)練數(shù)據(jù)的一個子集。訓(xùn)練過程使用PyTorch的DistributedDataParallel 庫進行并行化。
導(dǎo)入必須要的庫
importos fromdatetimeimportdatetime fromtimeimporttime importargparse importtorchvision importtorchvision.transformsastransforms importtorch importtorch.nnasnn importtorch.distributedasdist fromtorch.nn.parallelimportDistributedDataParallel
檢查GPU
importsubprocess result=subprocess.run(['nvidia-smi'], stdout=subprocess.PIPE) print(result.stdout.decode())
因為我們需要在多個服務(wù)器上運行,所以手動一個一個執(zhí)行并不現(xiàn)實,所以需要有一個調(diào)度程序。這里我們使用SLURM文件來運行代碼(slurm面向Linux和Unix類似內(nèi)核的免費和開源工作調(diào)度程序),
defmain(): # get distributed configuration from Slurm environment parser=argparse.ArgumentParser() parser.add_argument('-b', '--batch-size', default=128, type=int, help='batch size. it will be divided in mini-batch for each worker') parser.add_argument('-e','--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') parser.add_argument('-c','--checkpoint', default=None, type=str, help='path to checkpoint to load') args=parser.parse_args() rank=int(os.environ['SLURM_PROCID']) local_rank=int(os.environ['SLURM_LOCALID']) size=int(os.environ['SLURM_NTASKS']) master_addr=os.environ["SLURM_SRUN_COMM_HOST"] port="29500" node_id=os.environ['SLURM_NODEID'] ddp_arg= [rank, local_rank, size, master_addr, port, node_id] train(args, ddp_arg)
然后我們使用DistributedDataParallel 庫來執(zhí)行分布式訓(xùn)練。
deftrain(args, ddp_arg): rank, local_rank, size, MASTER_ADDR, port, NODE_ID=ddp_arg # display info ifrank==0: #print(">>> Training on ", len(hostnames), " nodes and ", size, " processes, master node is ", MASTER_ADDR) print(">>> Training on ", size, " GPUs, master node is ", MASTER_ADDR) #print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID)) print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID)) # configure distribution method: define address and port of the master node and initialise communication backend (NCCL) #dist.init_process_group(backend='nccl', init_method='env://', world_size=size, rank=rank) dist.init_process_group( backend='nccl', init_method='tcp://{}:{}'.format(MASTER_ADDR, port), world_size=size, rank=rank ) # distribute model torch.cuda.set_device(local_rank) gpu=torch.device("cuda") #model = ResNet18(classes=10).to(gpu) model=torchvision.models.resnet50(pretrained=False).to(gpu) ddp_model=DistributedDataParallel(model, device_ids=[local_rank]) ifargs.checkpointisnotNone: map_location= {'cuda:%d'%0: 'cuda:%d'%local_rank} ddp_model.load_state_dict(torch.load(args.checkpoint, map_location=map_location)) # distribute batch size (mini-batch) batch_size=args.batch_size batch_size_per_gpu=batch_size//size # define loss function (criterion) and optimizer criterion=nn.CrossEntropyLoss() optimizer=torch.optim.SGD(ddp_model.parameters(), 1e-4) transform_train=transforms.Compose([ transforms.RandomCrop(32, padding=4), transforms.RandomHorizontalFlip(), transforms.ToTensor(), transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)), ]) # load data with distributed sampler #train_dataset = torchvision.datasets.CIFAR10(root='./data', # train=True, # transform=transform_train, # download=False) # load data with distributed sampler train_dataset=torchvision.datasets.CIFAR10(root='./data', train=True, transform=transform_train, download=False) train_sampler=torch.utils.data.distributed.DistributedSampler(train_dataset, num_replicas=size, rank=rank) train_loader=torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size_per_gpu, shuffle=False, num_workers=0, pin_memory=True, sampler=train_sampler) # training (timers and display handled by process 0) ifrank==0: start=datetime.now() total_step=len(train_loader) forepochinrange(args.epochs): ifrank==0: start_dataload=time() fori, (images, labels) inenumerate(train_loader): # distribution of images and labels to all GPUs images=images.to(gpu, non_blocking=True) labels=labels.to(gpu, non_blocking=True) ifrank==0: stop_dataload=time() ifrank==0: start_training=time() # forward pass outputs=ddp_model(images) loss=criterion(outputs, labels) # backward and optimize optimizer.zero_grad() loss.backward() optimizer.step() ifrank==0: stop_training=time() if (i+1) %10==0andrank==0: print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Time data load: {:.3f}ms, Time training: {:.3f}ms'.format(epoch+1, args.epochs, i+1, total_step, loss.item(), (stop_dataload-start_dataload)*1000, (stop_training-start_training)*1000)) ifrank==0: start_dataload=time() #Save checkpoint at every end of epoch ifrank==0: torch.save(ddp_model.state_dict(), './checkpoint/{}GPU_{}epoch.checkpoint'.format(size, epoch+1)) ifrank==0: print(">>> Training complete in: "+str(datetime.now() -start)) if__name__=='__main__': main()
代碼將數(shù)據(jù)和模型分割到多個gpu上,并以分布式的方式更新模型。
代碼解釋
train(args, ddp_arg)有兩個參數(shù),args和ddp_arg,其中args是傳遞給腳本的命令行參數(shù),ddp_arg包含分布式訓(xùn)練相關(guān)參數(shù)。
rank, local_rank, size, MASTER_ADDR, port, NODE_ID = ddp_arg:解包ddp_arg中分布式訓(xùn)練相關(guān)參數(shù)。
如果rank為0,則打印當(dāng)前使用的gpu數(shù)量和主節(jié)點IP地址信息。
dist.init_process_group(backend='nccl', init_method='tcp://{}:{}'.format(MASTER_ADDR, port), world_size=size, rank=rank) :使用NCCL后端初始化分布式進程組。
torch.cuda.set_device(local_rank):為這個進程選擇指定的GPU。
model = torchvision.models. ResNet50 (pretrained=False).to(gpu):從torchvision模型中加載ResNet50模型,并將其移動到指定的gpu。
ddp_model = DistributedDataParallel(model, device_ids=[local_rank]):將模型包裝在DistributedDataParallel模塊中,也就是說這樣我們就可以進行分布式訓(xùn)練了
加載CIFAR-10數(shù)據(jù)集并應(yīng)用數(shù)據(jù)增強轉(zhuǎn)換。
train_sampler=torch.utils.data.distributed.DistributedSampler(train_dataset,num_replicas=size,rank=rank):創(chuàng)建一個DistributedSampler對象,將數(shù)據(jù)集分割到多個gpu上。
train_loader =torch.utils.data.DataLoader(dataset=train_dataset,batch_size=batch_size_per_gpu,shuffle=False,num_workers=0,pin_memory=True,sampler=train_sampler):創(chuàng)建一個DataLoader對象,數(shù)據(jù)將批量加載到模型中,這與我們平常訓(xùn)練的步驟是一致的只不過是增加了一個分布式的數(shù)據(jù)采樣DistributedSampler
為指定的epoch數(shù)訓(xùn)練模型,以分布式的方式使用optimizer.step()更新權(quán)重。
rank0在每個輪次結(jié)束時保存一個檢查點。
rank0每10個批次顯示損失和訓(xùn)練時間。
結(jié)束訓(xùn)練時打印訓(xùn)練模型所花費的總時間也是在rank0上。
代碼測試
在使用1個節(jié)點1/2/3/4個gpu, 2個節(jié)點6/8個gpu,每個節(jié)點3/4個gpu上進行了訓(xùn)練Cifar10上的Resnet50的測試如下圖所示,每次測試的批處理大小保持不變。完成每項測試所花費的時間以秒為單位記錄。隨著使用的gpu數(shù)量的增加,完成測試所需的時間會減少。當(dāng)使用8個gpu時,需要320秒才能完成,這是記錄中最快的時間。這是肯定的,但是我們可以看到訓(xùn)練的速度并沒有像GPU數(shù)量增長呈現(xiàn)線性的增長,這可能是因為Resnet50算是一個比較小的模型了,并不需要進行并行化訓(xùn)練。
在多個gpu上使用數(shù)據(jù)并行可以顯著減少在給定數(shù)據(jù)集上訓(xùn)練深度神經(jīng)網(wǎng)絡(luò)(DNN)所需的時間。隨著gpu數(shù)量的增加,完成訓(xùn)練過程所需的時間減少,這表明DNN可以更有效地并行訓(xùn)練。
這種方法在處理大型數(shù)據(jù)集或復(fù)雜的DNN架構(gòu)時特別有用。通過利用多個gpu,可以加快訓(xùn)練過程,實現(xiàn)更快的模型迭代和實驗。但是需要注意的是,通過Data Parallelism實現(xiàn)的性能提升可能會受到通信開銷和GPU內(nèi)存限制等因素的限制,需要仔細(xì)調(diào)優(yōu)才能獲得最佳結(jié)果。
以上就是PyTorch并行訓(xùn)練DistributedDataParallel完整demo的詳細(xì)內(nèi)容,更多關(guān)于PyTorch并行訓(xùn)練的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Python異步發(fā)送日志到遠(yuǎn)程服務(wù)器詳情
這篇文章主要介紹了Python異步發(fā)送日志到遠(yuǎn)程服務(wù)器詳情,文章通過簡單輸出到cmd和文件中的代碼展開詳情,需要的朋友可以參考一下2022-07-07python詞云庫wordCloud使用方法詳解(解決中文亂碼)
這篇文章主要介紹了python詞云庫wordCloud使用方法詳解(解決中文亂碼),需要的朋友可以參考下2020-02-02