A detailed walkthrough of using PyTorch RPC to run model parallelism across physical machines
Because of a business need, I was recently given the task of figuring out how to implement model parallelism and distributed training with PyTorch. We combed through a lot of material online and read the official PyTorch tutorials, but found no example we could use directly. Data parallelism is covered extensively; there is far less on model parallelism.
Browsing the official PyTorch site, we found an example that does perform model parallelism:
Official tutorial: DISTRIBUTED PIPELINE PARALLELISM USING RPC
Official example: Distributed Pipeline Parallel Example
Reading the code shows that it takes the ResNet50 model as an example, splits the model directly into two parts, and assigns the two parts to different workers; however, everything runs on a single machine, which spawns multiple processes to run the split model. For a detailed walkthrough of that code, search for the tutorial "Distributed Pipeline Parallelism Using RPC"; it is not repeated here.
Running the code locally confirmed that it does not meet the requirement of running across multiple machines. What follows is the line of thinking that led to a working setup.
1. First, the code shows that when the program is started with python main.py there is no way to specify a rank, so when running across machines, how would the program know which machine is worker1 and which is worker2? Our first suspicion was that the worker names had to be modified, hard-coding the workers' IP addresses in the code, e.g. changing line 191 of main.py:
Before: model = DistResNet50(split_size, ["worker1", "worker2"])
After: model = DistResNet50(split_size, ["worker1@xxx.xxx.xxx.xxx", "worker2@xxx.xxx.xxx.xxx"])
This, naturally, failed immediately: worker names of this form cannot be resolved, embedding an address in the name is not supported, and that approach was a dead end.
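In hindsight, the reason this fails is that RPC worker names are plain labels; the network address is supplied separately through the rendezvous settings (the MASTER_ADDR/MASTER_PORT environment variables, or the init_method option of TensorPipeRpcBackendOptions). A minimal sketch, with a placeholder IP and port, of how the address is actually meant to be passed:
import torch.distributed.rpc as rpc

options = rpc.TensorPipeRpcBackendOptions(
    num_worker_threads=16,
    # the rendezvous address lives here (or in MASTER_ADDR/MASTER_PORT), never inside the worker name
    init_method="tcp://xxx.xxx.xxx.xxx:29500",
)
# on the machine that should act as "worker1" (the rank value here is just for the sketch)
rpc.init_rpc("worker1", rank=0, world_size=2, rpc_backend_options=options)
# the model still refers to workers by their plain names:
# model = DistResNet50(split_size, ["worker1", "worker2"])
rpc.shutdown()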
2. So we went back to reading the code. Near the end, at line 251, we found mp.spawn(run_worker, args=(world_size, num_split), nprocs=world_size, join=True)
The call to mp.spawn was the giveaway: this is multiprocessing. The example fundamentally runs everything as several processes on one machine, so it cannot span machines and does not meet our requirement.
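For context, the run_worker function that mp.spawn launches looks roughly like the sketch below (paraphrased from the official example; details may differ): every rank, master and workers alike, is a process on the same machine, with MASTER_ADDR hard-coded to localhost, which is why the example cannot span machines as written.
# paraphrased excerpt from the official example's main.py
import os
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp

def run_worker(rank, world_size, num_split):
    # every rank rendezvous on the *local* machine
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=256)
    if rank == 0:
        rpc.init_rpc("master", rank=rank, world_size=world_size, rpc_backend_options=options)
        run_master(num_split)  # defined elsewhere in main.py: builds DistResNet50 and trains
    else:
        rpc.init_rpc(f"worker{rank}", rank=rank, world_size=world_size, rpc_backend_options=options)
    rpc.shutdown()  # block until all RPCs finish

# mp.spawn then starts world_size of these processes on this one machine:
# mp.spawn(run_worker, args=(world_size, num_split), nprocs=world_size, join=True)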
3. Finally, we went back to the PyTorch RPC documentation and wrote a small test program that makes two machines talk to each other: one machine issues a computation request and ships the raw data, the other receives the data and performs the computation. That test ran successfully on two physical machines, which showed that getting RPC to communicate across machines is not complicated. Combining this with the example code above, we decided to write separate scripts for the two workers, run them separately, and assign each worker its rank explicitly in the code. In theory, this turns the original single-machine code into cross-machine RPC.
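A minimal version of that connectivity test could look like the sketch below (IP, port, names, and tensor shapes are placeholders): the caller ships two tensors to the remote worker, which runs torch.add and returns the result.
# caller.py -- run on machine A (rank 0)
import os
import torch
import torch.distributed.rpc as rpc

os.environ['MASTER_ADDR'] = 'xxx.xxx.xxx.xxx'  # IP of machine A
os.environ['MASTER_PORT'] = '29500'

if __name__ == "__main__":
    rpc.init_rpc("caller", rank=0, world_size=2)
    # torch.add runs on "callee"; the input tensors are shipped over RPC
    result = rpc.rpc_sync("callee", torch.add, args=(torch.ones(2), torch.ones(2)))
    print(result)  # tensor([2., 2.])
    rpc.shutdown()

# callee.py -- run on machine B (rank 1), with the same MASTER_ADDR/MASTER_PORT:
#     rpc.init_rpc("callee", rank=1, world_size=2)
#     rpc.shutdown()  # blocks until the caller is done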
That was the thought process; without further ado, let's show the code.
Experimental setup: two machines, both CPU-only, with conda environments kept identical.
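To confirm the two environments really match, a quick check worth running on both machines (a trivial sketch) is to print the library versions:
import torch, torchvision
print(torch.__version__, torchvision.__version__)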
Master machine code:
# https://github.com/pytorch/examples/blob/main/distributed/rpc/pipeline/main.py
import os
import threading
import time
import torch
import torch.nn as nn
import torch.distributed.autograd as dist_autograd
import torch.distributed.rpc as rpc
import torch.optim as optim
from torch.distributed.optim import DistributedOptimizer
from torch.distributed.rpc import RRef
from torchvision.models.resnet import Bottleneck

os.environ['MASTER_ADDR'] = 'XXX.XXX.XXX.XXX' # specify the master IP address
os.environ['MASTER_PORT'] = '7856' # specify the master port

#########################################################
#           Define Model Parallel ResNet50              #
#########################################################

# In order to split the ResNet50 and place it on two different workers, we
# implement it in two model shards. The ResNetBase class defines common
# attributes and methods shared by two shards. ResNetShard1 and ResNetShard2
# contain two partitions of the model layers respectively.

num_classes = 1000

def conv1x1(in_planes, out_planes, stride=1):
    """1x1 convolution"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)

class ResNetBase(nn.Module):
    def __init__(self, block, inplanes, num_classes=1000,
                 groups=1, width_per_group=64, norm_layer=None):
        super(ResNetBase, self).__init__()
        self._lock = threading.Lock()
        self._block = block
        self._norm_layer = nn.BatchNorm2d
        self.inplanes = inplanes
        self.dilation = 1
        self.groups = groups
        self.base_width = width_per_group

    def _make_layer(self, planes, blocks, stride=1):
        norm_layer = self._norm_layer
        downsample = None
        previous_dilation = self.dilation
        if stride != 1 or self.inplanes != planes * self._block.expansion:
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * self._block.expansion, stride),
                norm_layer(planes * self._block.expansion),
            )
        layers = []
        layers.append(self._block(self.inplanes, planes, stride, downsample, self.groups,
                                  self.base_width, previous_dilation, norm_layer))
        self.inplanes = planes * self._block.expansion
        for _ in range(1, blocks):
            layers.append(self._block(self.inplanes, planes, groups=self.groups,
                                      base_width=self.base_width, dilation=self.dilation,
                                      norm_layer=norm_layer))
        return nn.Sequential(*layers)

    def parameter_rrefs(self):
        r"""
        Create one RRef for each parameter in the given local module, and return a
        list of RRefs.
        """
        return [RRef(p) for p in self.parameters()]

class ResNetShard1(ResNetBase):
    """
    The first part of ResNet.
    """
    def __init__(self, device, *args, **kwargs):
        super(ResNetShard1, self).__init__(
            Bottleneck, 64, num_classes=num_classes, *args, **kwargs)
        self.device = device
        self.seq = nn.Sequential(
            nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False),
            self._norm_layer(self.inplanes),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
            self._make_layer(64, 3),
            self._make_layer(128, 4, stride=2)
        ).to(self.device)
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.ones_(m.weight)
                nn.init.zeros_(m.bias)

    def forward(self, x_rref):
        x = x_rref.to_here().to(self.device)
        with self._lock:
            out = self.seq(x)
        return out.cpu()

class ResNetShard2(ResNetBase):
    """
    The second part of ResNet.
    """
    def __init__(self, device, *args, **kwargs):
        super(ResNetShard2, self).__init__(
            Bottleneck, 512, num_classes=num_classes, *args, **kwargs)
        self.device = device
        self.seq = nn.Sequential(
            self._make_layer(256, 6, stride=2),
            self._make_layer(512, 3, stride=2),
            nn.AdaptiveAvgPool2d((1, 1)),
        ).to(self.device)
        self.fc = nn.Linear(512 * self._block.expansion, num_classes).to(self.device)

    def forward(self, x_rref):
        x = x_rref.to_here().to(self.device)
        with self._lock:
            out = self.fc(torch.flatten(self.seq(x), 1))
        return out.cpu()

class DistResNet50(nn.Module):
    """
    Assemble two parts as an nn.Module and define pipelining logic
    """
    def __init__(self, split_size, workers, *args, **kwargs):
        super(DistResNet50, self).__init__()
        self.split_size = split_size
        # Put the first part of the ResNet50 on workers[0]
        self.p1_rref = rpc.remote(
            workers[0],
            ResNetShard1,
            args = ("cpu",) + args,  # both machines are CPU-only in this setup; use "cuda:0" if a GPU is available
            kwargs = kwargs
        )
        # Put the second part of the ResNet50 on workers[1]
        self.p2_rref = rpc.remote(
            workers[1],
            ResNetShard2,
            args = ("cpu",) + args,
            kwargs = kwargs
        )

    def forward(self, xs):
        # Split the input batch xs into micro-batches, and collect async RPC
        # futures into a list
        out_futures = []
        for x in iter(xs.split(self.split_size, dim=0)):
            x_rref = RRef(x)
            y_rref = self.p1_rref.remote().forward(x_rref)
            print(y_rref)
            z_fut = self.p2_rref.rpc_async().forward(y_rref)
            print(z_fut)
            out_futures.append(z_fut)
        # collect and cat all output tensors into one tensor.
        return torch.cat(torch.futures.wait_all(out_futures))

    def parameter_rrefs(self):
        remote_params = []
        remote_params.extend(self.p1_rref.remote().parameter_rrefs().to_here())
        remote_params.extend(self.p2_rref.remote().parameter_rrefs().to_here())
        return remote_params

#########################################################
#                   Run RPC Processes                   #
#########################################################

num_batches = 3
batch_size = 8
image_w = 128
image_h = 128

if __name__=="__main__":
    options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=256, rpc_timeout=300)
    # initialize the RPC connection on the master node
    rpc.init_rpc("master", rank=0, world_size=2, rpc_backend_options=options)
    for num_split in [1,2]:
        tik = time.time()
        model = DistResNet50(num_split, ["master", "worker"])
        loss_fn = nn.MSELoss()
        opt = DistributedOptimizer(
            optim.SGD,
            model.parameter_rrefs(),
            lr=0.05,
        )
        one_hot_indices = torch.LongTensor(batch_size) \
                              .random_(0, num_classes) \
                              .view(batch_size, 1)
        for i in range(num_batches):
            print(f"Processing batch {i}")
            # generate random inputs and labels
            inputs = torch.randn(batch_size, 3, image_w, image_h)
            labels = torch.zeros(batch_size, num_classes) \
                          .scatter_(1, one_hot_indices, 1)
            with dist_autograd.context() as context_id:
                outputs = model(inputs)
                dist_autograd.backward(context_id, [loss_fn(outputs, labels)])
                opt.step(context_id)
        tok = time.time()
        print(f"number of splits = {num_split}, execution time = {tok - tik}")
    # shut down the RPC connection
    rpc.shutdown()
Worker machine code:
# https://github.com/pytorch/examples/blob/main/distributed/rpc/pipeline/main.py
import os
import threading
import time
from functools import wraps
import torch
import torch.nn as nn
import torch.distributed.rpc as rpc
from torch.distributed.rpc import RRef
from torchvision.models.resnet import Bottleneck

os.environ['MASTER_ADDR'] = 'XXX.XXX.XXX.XXX' # specify the master IP address (must match the master script)
os.environ['MASTER_PORT'] = '7856' # specify the master port

#########################################################
#           Define Model Parallel ResNet50              #
#########################################################

# In order to split the ResNet50 and place it on two different workers, we
# implement it in two model shards. The ResNetBase class defines common
# attributes and methods shared by two shards. ResNetShard1 and ResNetShard2
# contain two partitions of the model layers respectively.

num_classes = 1000

def conv1x1(in_planes, out_planes, stride=1):
    """1x1 convolution"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)

class ResNetBase(nn.Module):
    def __init__(self, block, inplanes, num_classes=1000,
                 groups=1, width_per_group=64, norm_layer=None):
        super(ResNetBase, self).__init__()
        self._lock = threading.Lock()
        self._block = block
        self._norm_layer = nn.BatchNorm2d
        self.inplanes = inplanes
        self.dilation = 1
        self.groups = groups
        self.base_width = width_per_group

    def _make_layer(self, planes, blocks, stride=1):
        norm_layer = self._norm_layer
        downsample = None
        previous_dilation = self.dilation
        if stride != 1 or self.inplanes != planes * self._block.expansion:
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * self._block.expansion, stride),
                norm_layer(planes * self._block.expansion),
            )
        layers = []
        layers.append(self._block(self.inplanes, planes, stride, downsample, self.groups,
                                  self.base_width, previous_dilation, norm_layer))
        self.inplanes = planes * self._block.expansion
        for _ in range(1, blocks):
            layers.append(self._block(self.inplanes, planes, groups=self.groups,
                                      base_width=self.base_width, dilation=self.dilation,
                                      norm_layer=norm_layer))
        return nn.Sequential(*layers)

    def parameter_rrefs(self):
        r"""
        Create one RRef for each parameter in the given local module, and return a
        list of RRefs.
        """
        return [RRef(p) for p in self.parameters()]

# The worker must define the same ResNetShard2 class as the master script, so that
# the class reference sent over RPC by the master can be resolved here.
class ResNetShard2(ResNetBase):
    """
    The second part of ResNet.
    """
    def __init__(self, device, *args, **kwargs):
        super(ResNetShard2, self).__init__(
            Bottleneck, 512, num_classes=num_classes, *args, **kwargs)
        self.device = device
        self.seq = nn.Sequential(
            self._make_layer(256, 6, stride=2),
            self._make_layer(512, 3, stride=2),
            nn.AdaptiveAvgPool2d((1, 1)),
        ).to(self.device)
        self.fc = nn.Linear(512 * self._block.expansion, num_classes).to(self.device)

    def forward(self, x_rref):
        x = x_rref.to_here().to(self.device)
        print(x)
        with self._lock:
            out = self.fc(torch.flatten(self.seq(x), 1))
        return out.cpu()

#########################################################
#                   Run RPC Processes                   #
#########################################################

if __name__=="__main__":
    options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=256, rpc_timeout=300)
    # initialize the RPC connection on the worker node
    rpc.init_rpc("worker", rank=1, world_size=2, rpc_backend_options=options)
    # wait for calls from the master; rpc.shutdown() blocks until all RPC work is finished
    rpc.shutdown()
The MASTER_ADDR and MASTER_PORT in the two scripts must be identical. Run master.py on the master machine and worker.py on the worker machine, and the ResNet50 model runs model-parallel across the two physical machines.
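One small convenience (our own suggestion, not something the original scripts do) is to keep the shared rendezvous settings in a single module, say rpc_config.py, imported by both master.py and worker.py, so the address, port, and world size can never drift apart:
# rpc_config.py -- hypothetical helper shared by master.py and worker.py
import os

MASTER_ADDR = "XXX.XXX.XXX.XXX"  # same placeholder IP as in the scripts above
MASTER_PORT = "7856"
WORLD_SIZE = 2

def setup_env():
    """Set the rendezvous environment variables expected by rpc.init_rpc."""
    os.environ["MASTER_ADDR"] = MASTER_ADDR
    os.environ["MASTER_PORT"] = MASTER_PORT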
Notes
- Make sure the two physical machines can ping each other, and turn off their firewalls (a quick reachability check is sketched after this list)
- Both machines should preferably run Linux; in our experiments, PyTorch's RPC-based distributed training did not run on Windows
- The Python environments on the two machines must be kept identical
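As a quick way to verify connectivity and firewall settings, the following sketch (placeholder IP and port, matching the ones used in the scripts) tries to open a TCP connection from the worker machine to the master's RPC port while master.py is already running:
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(3)
try:
    sock.connect(("XXX.XXX.XXX.XXX", 7856))  # MASTER_ADDR, MASTER_PORT from the scripts
    print("master port is reachable")
except OSError as e:
    print("cannot reach the master port:", e)
finally:
    sock.close()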
That wraps up this walkthrough of how PyTorch RPC can run model parallelism across separate physical machines.