PyTorch并行訓練DistributedDataParallel完整demo
大型數(shù)據(jù)集訓練
使用大型數(shù)據(jù)集訓練大型深度神經(jīng)網(wǎng)絡(luò) (DNN) 的問題是深度學習領(lǐng)域的主要挑戰(zhàn)。 隨著 DNN 和數(shù)據(jù)集規(guī)模的增加,訓練這些模型的計算和內(nèi)存需求也會增加。 這使得在計算資源有限的單臺機器上訓練這些模型變得困難甚至不可能。
使用大型數(shù)據(jù)集訓練大型 DNN 的一些主要挑戰(zhàn)包括:
- 訓練時間長:訓練過程可能需要數(shù)周甚至數(shù)月才能完成,具體取決于模型的復雜性和數(shù)據(jù)集的大小。
- 內(nèi)存限制:大型 DNN 可能需要大量內(nèi)存來存儲訓練期間的所有模型參數(shù)、梯度和中間激活。 這可能會導致內(nèi)存不足錯誤并限制可在單臺機器上訓練的模型的大小。
為了應(yīng)對這些挑戰(zhàn),已經(jīng)開發(fā)了各種技術(shù)來擴大具有大型數(shù)據(jù)集的大型 DNN 的訓練,包括模型并行性、數(shù)據(jù)并行性和混合并行性,以及硬件、軟件和算法的優(yōu)化。
PyTorch 的數(shù)據(jù)并行性和模型并行性
在本文中我們將演示使用 PyTorch 的數(shù)據(jù)并行性和模型并行性。

我們所說的并行性一般是指在多個gpu,或多臺機器上訓練深度神經(jīng)網(wǎng)絡(luò)(dnn),以實現(xiàn)更少的訓練時間。數(shù)據(jù)并行背后的基本思想是將訓練數(shù)據(jù)分成更小的塊,讓每個GPU或機器處理一個單獨的數(shù)據(jù)塊。然后將每個節(jié)點的結(jié)果組合起來,用于更新模型參數(shù)。在數(shù)據(jù)并行中,模型體系結(jié)構(gòu)在每個節(jié)點上是相同的,但模型參數(shù)在節(jié)點之間進行了分區(qū)。每個節(jié)點使用分配的數(shù)據(jù)塊訓練自己的本地模型,在每次訓練迭代結(jié)束時,模型參數(shù)在所有節(jié)點之間同步。這個過程不斷重復,直到模型收斂到一個令人滿意的結(jié)果。
下面我們用用ResNet50和CIFAR10數(shù)據(jù)集來進行完整的代碼示例:
在數(shù)據(jù)并行中,模型架構(gòu)在每個節(jié)點上保持相同,但模型參數(shù)在節(jié)點之間進行了分區(qū),每個節(jié)點使用分配的數(shù)據(jù)塊訓練自己的本地模型。
PyTorch的DistributedDataParallel 庫可以進行跨節(jié)點的梯度和模型參數(shù)的高效通信和同步,實現(xiàn)分布式訓練。本文提供了如何使用ResNet50和CIFAR10數(shù)據(jù)集使用PyTorch實現(xiàn)數(shù)據(jù)并行的示例,其中代碼在多個gpu或機器上運行,每臺機器處理訓練數(shù)據(jù)的一個子集。訓練過程使用PyTorch的DistributedDataParallel 庫進行并行化。
導入必須要的庫
importos fromdatetimeimportdatetime fromtimeimporttime importargparse importtorchvision importtorchvision.transformsastransforms importtorch importtorch.nnasnn importtorch.distributedasdist fromtorch.nn.parallelimportDistributedDataParallel
檢查GPU
importsubprocess result=subprocess.run(['nvidia-smi'], stdout=subprocess.PIPE) print(result.stdout.decode())
因為我們需要在多個服務(wù)器上運行,所以手動一個一個執(zhí)行并不現(xiàn)實,所以需要有一個調(diào)度程序。這里我們使用SLURM文件來運行代碼(slurm面向Linux和Unix類似內(nèi)核的免費和開源工作調(diào)度程序),
defmain():
# get distributed configuration from Slurm environment
parser=argparse.ArgumentParser()
parser.add_argument('-b', '--batch-size', default=128, type=int,
help='batch size. it will be divided in mini-batch for each worker')
parser.add_argument('-e','--epochs', default=2, type=int, metavar='N',
help='number of total epochs to run')
parser.add_argument('-c','--checkpoint', default=None, type=str,
help='path to checkpoint to load')
args=parser.parse_args()
rank=int(os.environ['SLURM_PROCID'])
local_rank=int(os.environ['SLURM_LOCALID'])
size=int(os.environ['SLURM_NTASKS'])
master_addr=os.environ["SLURM_SRUN_COMM_HOST"]
port="29500"
node_id=os.environ['SLURM_NODEID']
ddp_arg= [rank, local_rank, size, master_addr, port, node_id]
train(args, ddp_arg)然后我們使用DistributedDataParallel 庫來執(zhí)行分布式訓練。
deftrain(args, ddp_arg):
rank, local_rank, size, MASTER_ADDR, port, NODE_ID=ddp_arg
# display info
ifrank==0:
#print(">>> Training on ", len(hostnames), " nodes and ", size, " processes, master node is ", MASTER_ADDR)
print(">>> Training on ", size, " GPUs, master node is ", MASTER_ADDR)
#print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID))
print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID))
# configure distribution method: define address and port of the master node and initialise communication backend (NCCL)
#dist.init_process_group(backend='nccl', init_method='env://', world_size=size, rank=rank)
dist.init_process_group(
backend='nccl',
init_method='tcp://{}:{}'.format(MASTER_ADDR, port),
world_size=size,
rank=rank
)
# distribute model
torch.cuda.set_device(local_rank)
gpu=torch.device("cuda")
#model = ResNet18(classes=10).to(gpu)
model=torchvision.models.resnet50(pretrained=False).to(gpu)
ddp_model=DistributedDataParallel(model, device_ids=[local_rank])
ifargs.checkpointisnotNone:
map_location= {'cuda:%d'%0: 'cuda:%d'%local_rank}
ddp_model.load_state_dict(torch.load(args.checkpoint, map_location=map_location))
# distribute batch size (mini-batch)
batch_size=args.batch_size
batch_size_per_gpu=batch_size//size
# define loss function (criterion) and optimizer
criterion=nn.CrossEntropyLoss()
optimizer=torch.optim.SGD(ddp_model.parameters(), 1e-4)
transform_train=transforms.Compose([
transforms.RandomCrop(32, padding=4),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
# load data with distributed sampler
#train_dataset = torchvision.datasets.CIFAR10(root='./data',
# train=True,
# transform=transform_train,
# download=False)
# load data with distributed sampler
train_dataset=torchvision.datasets.CIFAR10(root='./data',
train=True,
transform=transform_train,
download=False)
train_sampler=torch.utils.data.distributed.DistributedSampler(train_dataset,
num_replicas=size,
rank=rank)
train_loader=torch.utils.data.DataLoader(dataset=train_dataset,
batch_size=batch_size_per_gpu,
shuffle=False,
num_workers=0,
pin_memory=True,
sampler=train_sampler)
# training (timers and display handled by process 0)
ifrank==0: start=datetime.now()
total_step=len(train_loader)
forepochinrange(args.epochs):
ifrank==0: start_dataload=time()
fori, (images, labels) inenumerate(train_loader):
# distribution of images and labels to all GPUs
images=images.to(gpu, non_blocking=True)
labels=labels.to(gpu, non_blocking=True)
ifrank==0: stop_dataload=time()
ifrank==0: start_training=time()
# forward pass
outputs=ddp_model(images)
loss=criterion(outputs, labels)
# backward and optimize
optimizer.zero_grad()
loss.backward()
optimizer.step()
ifrank==0: stop_training=time()
if (i+1) %10==0andrank==0:
print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Time data load: {:.3f}ms, Time training: {:.3f}ms'.format(epoch+1, args.epochs,
i+1, total_step, loss.item(), (stop_dataload-start_dataload)*1000,
(stop_training-start_training)*1000))
ifrank==0: start_dataload=time()
#Save checkpoint at every end of epoch
ifrank==0:
torch.save(ddp_model.state_dict(), './checkpoint/{}GPU_{}epoch.checkpoint'.format(size, epoch+1))
ifrank==0:
print(">>> Training complete in: "+str(datetime.now() -start))
if__name__=='__main__':
main()代碼將數(shù)據(jù)和模型分割到多個gpu上,并以分布式的方式更新模型。
代碼解釋
train(args, ddp_arg)有兩個參數(shù),args和ddp_arg,其中args是傳遞給腳本的命令行參數(shù),ddp_arg包含分布式訓練相關(guān)參數(shù)。
rank, local_rank, size, MASTER_ADDR, port, NODE_ID = ddp_arg:解包ddp_arg中分布式訓練相關(guān)參數(shù)。
如果rank為0,則打印當前使用的gpu數(shù)量和主節(jié)點IP地址信息。
dist.init_process_group(backend='nccl', init_method='tcp://{}:{}'.format(MASTER_ADDR, port), world_size=size, rank=rank) :使用NCCL后端初始化分布式進程組。
torch.cuda.set_device(local_rank):為這個進程選擇指定的GPU。
model = torchvision.models. ResNet50 (pretrained=False).to(gpu):從torchvision模型中加載ResNet50模型,并將其移動到指定的gpu。
ddp_model = DistributedDataParallel(model, device_ids=[local_rank]):將模型包裝在DistributedDataParallel模塊中,也就是說這樣我們就可以進行分布式訓練了
加載CIFAR-10數(shù)據(jù)集并應(yīng)用數(shù)據(jù)增強轉(zhuǎn)換。
train_sampler=torch.utils.data.distributed.DistributedSampler(train_dataset,num_replicas=size,rank=rank):創(chuàng)建一個DistributedSampler對象,將數(shù)據(jù)集分割到多個gpu上。
train_loader =torch.utils.data.DataLoader(dataset=train_dataset,batch_size=batch_size_per_gpu,shuffle=False,num_workers=0,pin_memory=True,sampler=train_sampler):創(chuàng)建一個DataLoader對象,數(shù)據(jù)將批量加載到模型中,這與我們平常訓練的步驟是一致的只不過是增加了一個分布式的數(shù)據(jù)采樣DistributedSampler
為指定的epoch數(shù)訓練模型,以分布式的方式使用optimizer.step()更新權(quán)重。
rank0在每個輪次結(jié)束時保存一個檢查點。
rank0每10個批次顯示損失和訓練時間。
結(jié)束訓練時打印訓練模型所花費的總時間也是在rank0上。
代碼測試
在使用1個節(jié)點1/2/3/4個gpu, 2個節(jié)點6/8個gpu,每個節(jié)點3/4個gpu上進行了訓練Cifar10上的Resnet50的測試如下圖所示,每次測試的批處理大小保持不變。完成每項測試所花費的時間以秒為單位記錄。隨著使用的gpu數(shù)量的增加,完成測試所需的時間會減少。當使用8個gpu時,需要320秒才能完成,這是記錄中最快的時間。這是肯定的,但是我們可以看到訓練的速度并沒有像GPU數(shù)量增長呈現(xiàn)線性的增長,這可能是因為Resnet50算是一個比較小的模型了,并不需要進行并行化訓練。

在多個gpu上使用數(shù)據(jù)并行可以顯著減少在給定數(shù)據(jù)集上訓練深度神經(jīng)網(wǎng)絡(luò)(DNN)所需的時間。隨著gpu數(shù)量的增加,完成訓練過程所需的時間減少,這表明DNN可以更有效地并行訓練。
這種方法在處理大型數(shù)據(jù)集或復雜的DNN架構(gòu)時特別有用。通過利用多個gpu,可以加快訓練過程,實現(xiàn)更快的模型迭代和實驗。但是需要注意的是,通過Data Parallelism實現(xiàn)的性能提升可能會受到通信開銷和GPU內(nèi)存限制等因素的限制,需要仔細調(diào)優(yōu)才能獲得最佳結(jié)果。
以上就是PyTorch并行訓練DistributedDataParallel完整demo的詳細內(nèi)容,更多關(guān)于PyTorch并行訓練的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
python詞云庫wordCloud使用方法詳解(解決中文亂碼)
這篇文章主要介紹了python詞云庫wordCloud使用方法詳解(解決中文亂碼),需要的朋友可以參考下2020-02-02

