PyTorch?Distributed?Data?Parallel使用詳解
DDP
Distributed Data Parallel 簡稱 DDP
,是 PyTorch 框架下一種適用于單機多卡、多機多卡任務的數據并行方式。由于其良好的執(zhí)行效率及廣泛的顯卡支持,熟練掌握 DDP
已經成為深度學習從業(yè)者所必備的技能之一。本文結合具體代碼,詳細地說明了 DDP
在項目中的使用方式。讀者按照本文所給的范例,只需稍經調試,即可實現 DDP
的整套流程。
概念辨析
具體講解 DDP
之前,我們先了解了解它和 Data Parallel (DP
) 之間的區(qū)別。DP
同樣是 PyTorch 常見的多 GPU 并行方式之一,且它的實現非常簡潔:
# 函數定義 torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0) ''' module : 模型 device_ids : 參與訓練的 GPU 列表 output_device : 指定輸出的 GPU, 通常省略, 即默認使用索引為 0 的顯卡 ''' # 程序模板 device_ids = [0, 1] net = torch.nn.DataParallel(net, device_ids=device_ids)
基本原理及固有缺陷:在 Data Parallel 模式下,數據會被自動切分,加載到 GPU。同時,模型也將拷貝至各個 GPU 進行正向傳播。在多個進程之間,會有一個進程充當 master 節(jié)點,負責收集各張顯卡積累的梯度,并據此更新參數,再統(tǒng)一發(fā)送至其他顯卡。因此整體而言,master 節(jié)點承擔了更多的計算與通信任務,容易造成網絡堵塞,影響訓練速度。
常見問題及解決方案:Data Parallel 要求模型必須在 device_ids[0] 擁有參數及緩沖區(qū),因此當卡 0 被占用時,可以在 nn.DataParallel
之前添加如下代碼:
# 按照 PIC_BUS_ID 順序自 0 開始排列 GPU 設備 os.environ['CUDA_DEVICE_ORDER'] = 'PIC_BUS_ID' # 設置當前使用的 GPU 為 2、3 號設備 os.environ['CUDA_VISIBLE_DEVICES'] = '2, 3'
如此,device_ids[0] 將被默認為 2 號卡,device_ids[1] 則對應 3 號卡
相較于 DP
, Distributed Data Parallel 的實現要復雜得多,但是它的優(yōu)勢也非常明顯:
DDP
速度更快,可以達到略低于顯卡數量的加速比;DDP
可以實現負載的均勻分配,克服了DP
需要一個進程充當 master 節(jié)點的固有缺陷;- 采用
DDP
通??梢灾С指蟮?batch size,不會像DP
那樣出現其他顯卡尚有余力,而卡 0 直接 out of memory 的情況; - 另外,在
DDP
模式下,輸入到 data loader 的 bacth size 不再代表總數,而是每塊 GPU 各自負責的 sample 數量。比方說,batch_size = 30,有兩塊 GPU。在DP
模式下,每塊 GPU 會負責 15 個樣本。而在DDP
模式下,每塊 GPU 會各自負責 30 個樣本; DDP
基本原理:倘若我們擁有 N 張顯卡,則在 Distributed Data Parallel 模式下,就會啟動 N 個進程。每個進程在各自的卡上加載模型,且模型的參數完全相同。訓練過程中,各個進程通過一種名為 Ring-Reduce 的方式與其他進程通信,交換彼此的梯度,從而獲得所有的梯度信息。隨后,各個進程利用梯度的平均值更新參數。由于初始值和更新量完全相同,所以各個進程更新后的參數仍保持一致。
常用術語
- rank
- 進程號
- 多進程上下文中,通常假定 rank = 0 為主進程或第一個進程
- node
- 物理節(jié)點,表示一個容器或一臺機器
- 節(jié)點內部可以包含多個 GPU
- local_rank
- 一個 node 中,進程的相對序號
- local_rank 在 node 之間獨立
- world_size
- 全局進程數
- 一個分布式任務中 rank 的數量
- group
- 進程組
- 一個分布式任務就對應一個進程組
- 只有當用戶創(chuàng)立多個進程組時,才會用到
代碼實現
Distributed Data Parallel 可以通過 Python 的 torch.distributed.launch
啟動器,在命令行分布式地執(zhí)行 Python 文件。執(zhí)行過程中,啟動器會將當前進程(其實就是 GPU)的 index 通過參數傳遞給 Python,而我們可以利用如下方式獲取這個 index:
import argparse parser = argparse.ArgumentParser() parser.add_argument('--local_rank', default=-1, type=int, metavar='N', help='Local process rank.') args = parser.parse_args() # print(args.local_rank) # local_rank 表示本地進程序號
隨后,初始化進程組。對于在 GPU 執(zhí)行的任務,建議選擇 nccl
(由 NVIDIA 推出) 作為通信后端。對于在 CPU 執(zhí)行的任務,建議選擇 gloo
(由 Facebook 推出) 作為通信后端。倘若不傳入 init_method
,則默認為 env://
,表示自環(huán)境變量讀取分布式信息
dist.init_process_group(backend='nccl', init_method='env://') # 初始化進程組之后, 通常會執(zhí)行這兩行代碼 torch.cuda.set_device(args.local_rank) device = torch.device('cuda', args.local_rank) # 后續(xù)的 model = model.to(device), tensor.cuda(device) # 對應的都是這里由 args.local_rank 初始化得到的 device
數據部分,使用 Distributed Sampler 劃分數據集,并將 sampler 傳入 data loader。需要注意的是,此時在 data loader 中不能指定 shuffle 為 True,否則會報錯 (sampler 已具備隨機打亂功能)
dev_sampler = data.DistributedSampler(dev_data_set) train_sampler = data.DistributedSampler(train_data_set) dev_loader = data.DataLoader(dev_data_set, batch_size=dev_batch_size, shuffle=False, sampler=dev_sampler) train_loader = data.DataLoader(train_data_set, batch_size=train_batch_size, shuffle=False, sampler=train_sampler)
模型部分,首先將將模型送至 device,即對應的 GPU 上,再使用 Distributed Data Parallel 包裝模型(順序顛倒會報錯)
model = model.to(device) model = nn.parallel.DistributedDataParallel( model, device_ids=[args.local_rank], output_device=args.local_rank )
Distributed Data Parallel 模式下,保存模型應使用 net.module.state_dict()
,而非 net.state_dict()
。且無論是保存模型,還是 LOGGER 打印,只對 local_rank 為 0 的進程操作即可,因此代碼中會有很多 args.local_rank == 0
的判斷
if args.local_rank == 0: LOGGER.info(f'saving latest model: {output_path}') torch.save({'model': model.module.state_dict(), 'optimizer': None, 'epoch': epoch, 'best-f1': best_f1}, open(os.path.join(output_path, 'latest_model_{}.pth'.format(fold)), 'wb'))
利用 torch.load
加載模型時,設置 map_location=device
,否則卡 0 會承擔更多的開銷
load_model = torch.load(best_path, map_location=device) model.load_state_dict(load_model['model'])
dist.barrier()
可用于同步多個進程,建議只在必要的位置使用,如初始化DDP
模型之前、權重更新之后、開啟新一輪 epoch 之前- 計算 accuracy 時,可以使用
dist.all_reduce(score, op=dist.ReduceOp.SUM)
,將各個進程計算的準確率求平均 - 計算 f1-score 時,可以使用
dist.all_gather(all_prediction_list, prediction_list)
,將各個進程獲得的預測值和真實值匯總到 all_list,再統(tǒng)一代入公式
啟動方式
torch.distributed.launch
# 此處 --nproc_per_node 4 的含義是 server 有 4 張顯卡 python torch.distributed.launch --nproc_per_node 4 train.py # 倘若使用 nohup, 則注意輸入命令后 exit 當前終端 python torch.distributed.launch --nproc_per_node 4 train.py
torchrun
,推薦使用這種方式,因為torch.distributed.launch
即將棄用
代碼中,只需將 Argument Parser 相關的部分替換為
local_rank = int(os.environ['LOCAL_RANK'])
然后將 args.local_rank
全部改為 local_rank
即可
啟動命令
# 單機多卡訓練時, 可以不指定 nnodes torchrun --nnodes=1 --nproc_per_node=4 train.py # 倘若使用 nohup, 則注意輸入命令后 exit 當前終端 nohup torchrun --nnodes=1 --nproc_per_node=4 train.py > nohup.out &
以上就是PyTorch Distributed Data Parallel使用詳解的詳細內容,更多關于PyTorch Distributed Data Parallel的資料請關注腳本之家其它相關文章!
相關文章
pytest使用parametrize將參數化變量傳遞到fixture
這篇文章主要為大家介紹了pytest使用parametrize將參數化變量傳遞到fixture的使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-05-05