PyTorch并行訓(xùn)練DistributedDataParallel完整demo
大型數(shù)據(jù)集訓(xùn)練
使用大型數(shù)據(jù)集訓(xùn)練大型深度神經(jīng)網(wǎng)絡(luò) (DNN) 的問題是深度學(xué)習(xí)領(lǐng)域的主要挑戰(zhàn)。 隨著 DNN 和數(shù)據(jù)集規(guī)模的增加,訓(xùn)練這些模型的計(jì)算和內(nèi)存需求也會(huì)增加。 這使得在計(jì)算資源有限的單臺機(jī)器上訓(xùn)練這些模型變得困難甚至不可能。
使用大型數(shù)據(jù)集訓(xùn)練大型 DNN 的一些主要挑戰(zhàn)包括:
- 訓(xùn)練時(shí)間長:訓(xùn)練過程可能需要數(shù)周甚至數(shù)月才能完成,具體取決于模型的復(fù)雜性和數(shù)據(jù)集的大小。
- 內(nèi)存限制:大型 DNN 可能需要大量內(nèi)存來存儲訓(xùn)練期間的所有模型參數(shù)、梯度和中間激活。 這可能會(huì)導(dǎo)致內(nèi)存不足錯(cuò)誤并限制可在單臺機(jī)器上訓(xùn)練的模型的大小。
為了應(yīng)對這些挑戰(zhàn),已經(jīng)開發(fā)了各種技術(shù)來擴(kuò)大具有大型數(shù)據(jù)集的大型 DNN 的訓(xùn)練,包括模型并行性、數(shù)據(jù)并行性和混合并行性,以及硬件、軟件和算法的優(yōu)化。
PyTorch 的數(shù)據(jù)并行性和模型并行性
在本文中我們將演示使用 PyTorch 的數(shù)據(jù)并行性和模型并行性。

我們所說的并行性一般是指在多個(gè)gpu,或多臺機(jī)器上訓(xùn)練深度神經(jīng)網(wǎng)絡(luò)(dnn),以實(shí)現(xiàn)更少的訓(xùn)練時(shí)間。數(shù)據(jù)并行背后的基本思想是將訓(xùn)練數(shù)據(jù)分成更小的塊,讓每個(gè)GPU或機(jī)器處理一個(gè)單獨(dú)的數(shù)據(jù)塊。然后將每個(gè)節(jié)點(diǎn)的結(jié)果組合起來,用于更新模型參數(shù)。在數(shù)據(jù)并行中,模型體系結(jié)構(gòu)在每個(gè)節(jié)點(diǎn)上是相同的,但模型參數(shù)在節(jié)點(diǎn)之間進(jìn)行了分區(qū)。每個(gè)節(jié)點(diǎn)使用分配的數(shù)據(jù)塊訓(xùn)練自己的本地模型,在每次訓(xùn)練迭代結(jié)束時(shí),模型參數(shù)在所有節(jié)點(diǎn)之間同步。這個(gè)過程不斷重復(fù),直到模型收斂到一個(gè)令人滿意的結(jié)果。
下面我們用用ResNet50和CIFAR10數(shù)據(jù)集來進(jìn)行完整的代碼示例:
在數(shù)據(jù)并行中,模型架構(gòu)在每個(gè)節(jié)點(diǎn)上保持相同,但模型參數(shù)在節(jié)點(diǎn)之間進(jìn)行了分區(qū),每個(gè)節(jié)點(diǎn)使用分配的數(shù)據(jù)塊訓(xùn)練自己的本地模型。
PyTorch的DistributedDataParallel 庫可以進(jìn)行跨節(jié)點(diǎn)的梯度和模型參數(shù)的高效通信和同步,實(shí)現(xiàn)分布式訓(xùn)練。本文提供了如何使用ResNet50和CIFAR10數(shù)據(jù)集使用PyTorch實(shí)現(xiàn)數(shù)據(jù)并行的示例,其中代碼在多個(gè)gpu或機(jī)器上運(yùn)行,每臺機(jī)器處理訓(xùn)練數(shù)據(jù)的一個(gè)子集。訓(xùn)練過程使用PyTorch的DistributedDataParallel 庫進(jìn)行并行化。
導(dǎo)入必須要的庫
importos fromdatetimeimportdatetime fromtimeimporttime importargparse importtorchvision importtorchvision.transformsastransforms importtorch importtorch.nnasnn importtorch.distributedasdist fromtorch.nn.parallelimportDistributedDataParallel
檢查GPU
importsubprocess result=subprocess.run(['nvidia-smi'], stdout=subprocess.PIPE) print(result.stdout.decode())
因?yàn)槲覀冃枰诙鄠€(gè)服務(wù)器上運(yùn)行,所以手動(dòng)一個(gè)一個(gè)執(zhí)行并不現(xiàn)實(shí),所以需要有一個(gè)調(diào)度程序。這里我們使用SLURM文件來運(yùn)行代碼(slurm面向Linux和Unix類似內(nèi)核的免費(fèi)和開源工作調(diào)度程序),
defmain():
# get distributed configuration from Slurm environment
parser=argparse.ArgumentParser()
parser.add_argument('-b', '--batch-size', default=128, type=int,
help='batch size. it will be divided in mini-batch for each worker')
parser.add_argument('-e','--epochs', default=2, type=int, metavar='N',
help='number of total epochs to run')
parser.add_argument('-c','--checkpoint', default=None, type=str,
help='path to checkpoint to load')
args=parser.parse_args()
rank=int(os.environ['SLURM_PROCID'])
local_rank=int(os.environ['SLURM_LOCALID'])
size=int(os.environ['SLURM_NTASKS'])
master_addr=os.environ["SLURM_SRUN_COMM_HOST"]
port="29500"
node_id=os.environ['SLURM_NODEID']
ddp_arg= [rank, local_rank, size, master_addr, port, node_id]
train(args, ddp_arg)然后我們使用DistributedDataParallel 庫來執(zhí)行分布式訓(xùn)練。
deftrain(args, ddp_arg):
rank, local_rank, size, MASTER_ADDR, port, NODE_ID=ddp_arg
# display info
ifrank==0:
#print(">>> Training on ", len(hostnames), " nodes and ", size, " processes, master node is ", MASTER_ADDR)
print(">>> Training on ", size, " GPUs, master node is ", MASTER_ADDR)
#print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID))
print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID))
# configure distribution method: define address and port of the master node and initialise communication backend (NCCL)
#dist.init_process_group(backend='nccl', init_method='env://', world_size=size, rank=rank)
dist.init_process_group(
backend='nccl',
init_method='tcp://{}:{}'.format(MASTER_ADDR, port),
world_size=size,
rank=rank
)
# distribute model
torch.cuda.set_device(local_rank)
gpu=torch.device("cuda")
#model = ResNet18(classes=10).to(gpu)
model=torchvision.models.resnet50(pretrained=False).to(gpu)
ddp_model=DistributedDataParallel(model, device_ids=[local_rank])
ifargs.checkpointisnotNone:
map_location= {'cuda:%d'%0: 'cuda:%d'%local_rank}
ddp_model.load_state_dict(torch.load(args.checkpoint, map_location=map_location))
# distribute batch size (mini-batch)
batch_size=args.batch_size
batch_size_per_gpu=batch_size//size
# define loss function (criterion) and optimizer
criterion=nn.CrossEntropyLoss()
optimizer=torch.optim.SGD(ddp_model.parameters(), 1e-4)
transform_train=transforms.Compose([
transforms.RandomCrop(32, padding=4),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
# load data with distributed sampler
#train_dataset = torchvision.datasets.CIFAR10(root='./data',
# train=True,
# transform=transform_train,
# download=False)
# load data with distributed sampler
train_dataset=torchvision.datasets.CIFAR10(root='./data',
train=True,
transform=transform_train,
download=False)
train_sampler=torch.utils.data.distributed.DistributedSampler(train_dataset,
num_replicas=size,
rank=rank)
train_loader=torch.utils.data.DataLoader(dataset=train_dataset,
batch_size=batch_size_per_gpu,
shuffle=False,
num_workers=0,
pin_memory=True,
sampler=train_sampler)
# training (timers and display handled by process 0)
ifrank==0: start=datetime.now()
total_step=len(train_loader)
forepochinrange(args.epochs):
ifrank==0: start_dataload=time()
fori, (images, labels) inenumerate(train_loader):
# distribution of images and labels to all GPUs
images=images.to(gpu, non_blocking=True)
labels=labels.to(gpu, non_blocking=True)
ifrank==0: stop_dataload=time()
ifrank==0: start_training=time()
# forward pass
outputs=ddp_model(images)
loss=criterion(outputs, labels)
# backward and optimize
optimizer.zero_grad()
loss.backward()
optimizer.step()
ifrank==0: stop_training=time()
if (i+1) %10==0andrank==0:
print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Time data load: {:.3f}ms, Time training: {:.3f}ms'.format(epoch+1, args.epochs,
i+1, total_step, loss.item(), (stop_dataload-start_dataload)*1000,
(stop_training-start_training)*1000))
ifrank==0: start_dataload=time()
#Save checkpoint at every end of epoch
ifrank==0:
torch.save(ddp_model.state_dict(), './checkpoint/{}GPU_{}epoch.checkpoint'.format(size, epoch+1))
ifrank==0:
print(">>> Training complete in: "+str(datetime.now() -start))
if__name__=='__main__':
main()代碼將數(shù)據(jù)和模型分割到多個(gè)gpu上,并以分布式的方式更新模型。
代碼解釋
train(args, ddp_arg)有兩個(gè)參數(shù),args和ddp_arg,其中args是傳遞給腳本的命令行參數(shù),ddp_arg包含分布式訓(xùn)練相關(guān)參數(shù)。
rank, local_rank, size, MASTER_ADDR, port, NODE_ID = ddp_arg:解包ddp_arg中分布式訓(xùn)練相關(guān)參數(shù)。
如果rank為0,則打印當(dāng)前使用的gpu數(shù)量和主節(jié)點(diǎn)IP地址信息。
dist.init_process_group(backend='nccl', init_method='tcp://{}:{}'.format(MASTER_ADDR, port), world_size=size, rank=rank) :使用NCCL后端初始化分布式進(jìn)程組。
torch.cuda.set_device(local_rank):為這個(gè)進(jìn)程選擇指定的GPU。
model = torchvision.models. ResNet50 (pretrained=False).to(gpu):從torchvision模型中加載ResNet50模型,并將其移動(dòng)到指定的gpu。
ddp_model = DistributedDataParallel(model, device_ids=[local_rank]):將模型包裝在DistributedDataParallel模塊中,也就是說這樣我們就可以進(jìn)行分布式訓(xùn)練了
加載CIFAR-10數(shù)據(jù)集并應(yīng)用數(shù)據(jù)增強(qiáng)轉(zhuǎn)換。
train_sampler=torch.utils.data.distributed.DistributedSampler(train_dataset,num_replicas=size,rank=rank):創(chuàng)建一個(gè)DistributedSampler對象,將數(shù)據(jù)集分割到多個(gè)gpu上。
train_loader =torch.utils.data.DataLoader(dataset=train_dataset,batch_size=batch_size_per_gpu,shuffle=False,num_workers=0,pin_memory=True,sampler=train_sampler):創(chuàng)建一個(gè)DataLoader對象,數(shù)據(jù)將批量加載到模型中,這與我們平常訓(xùn)練的步驟是一致的只不過是增加了一個(gè)分布式的數(shù)據(jù)采樣DistributedSampler
為指定的epoch數(shù)訓(xùn)練模型,以分布式的方式使用optimizer.step()更新權(quán)重。
rank0在每個(gè)輪次結(jié)束時(shí)保存一個(gè)檢查點(diǎn)。
rank0每10個(gè)批次顯示損失和訓(xùn)練時(shí)間。
結(jié)束訓(xùn)練時(shí)打印訓(xùn)練模型所花費(fèi)的總時(shí)間也是在rank0上。
代碼測試
在使用1個(gè)節(jié)點(diǎn)1/2/3/4個(gè)gpu, 2個(gè)節(jié)點(diǎn)6/8個(gè)gpu,每個(gè)節(jié)點(diǎn)3/4個(gè)gpu上進(jìn)行了訓(xùn)練Cifar10上的Resnet50的測試如下圖所示,每次測試的批處理大小保持不變。完成每項(xiàng)測試所花費(fèi)的時(shí)間以秒為單位記錄。隨著使用的gpu數(shù)量的增加,完成測試所需的時(shí)間會(huì)減少。當(dāng)使用8個(gè)gpu時(shí),需要320秒才能完成,這是記錄中最快的時(shí)間。這是肯定的,但是我們可以看到訓(xùn)練的速度并沒有像GPU數(shù)量增長呈現(xiàn)線性的增長,這可能是因?yàn)镽esnet50算是一個(gè)比較小的模型了,并不需要進(jìn)行并行化訓(xùn)練。

在多個(gè)gpu上使用數(shù)據(jù)并行可以顯著減少在給定數(shù)據(jù)集上訓(xùn)練深度神經(jīng)網(wǎng)絡(luò)(DNN)所需的時(shí)間。隨著gpu數(shù)量的增加,完成訓(xùn)練過程所需的時(shí)間減少,這表明DNN可以更有效地并行訓(xùn)練。
這種方法在處理大型數(shù)據(jù)集或復(fù)雜的DNN架構(gòu)時(shí)特別有用。通過利用多個(gè)gpu,可以加快訓(xùn)練過程,實(shí)現(xiàn)更快的模型迭代和實(shí)驗(yàn)。但是需要注意的是,通過Data Parallelism實(shí)現(xiàn)的性能提升可能會(huì)受到通信開銷和GPU內(nèi)存限制等因素的限制,需要仔細(xì)調(diào)優(yōu)才能獲得最佳結(jié)果。
以上就是PyTorch并行訓(xùn)練DistributedDataParallel完整demo的詳細(xì)內(nèi)容,更多關(guān)于PyTorch并行訓(xùn)練的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Python異步發(fā)送日志到遠(yuǎn)程服務(wù)器詳情
這篇文章主要介紹了Python異步發(fā)送日志到遠(yuǎn)程服務(wù)器詳情,文章通過簡單輸出到cmd和文件中的代碼展開詳情,需要的朋友可以參考一下2022-07-07
python詞云庫wordCloud使用方法詳解(解決中文亂碼)
這篇文章主要介紹了python詞云庫wordCloud使用方法詳解(解決中文亂碼),需要的朋友可以參考下2020-02-02

