
For business reasons, I was recently given the task of implementing model parallelism and distributed training with PyTorch. After searching through a lot of material online and reading the official PyTorch tutorials, I could not find a usable reference case: most of what is written covers data parallelism, and there is far less on model parallelism.
While browsing the official PyTorch documentation, I found an example that does perform model parallelism:
Official tutorial: DISTRIBUTED PIPELINE PARALLELISM USING RPC
Official example: Distributed Pipeline Parallel Example
Reading the code shows that it uses the ResNet-50 model as an example, splits the model into two parts, and assigns each part to a different worker. However, the code runs on a single machine and spawns multiple processes to execute the split model. For a detailed walkthrough of this code, search for "pytorch RPC 的分布式管道并行"; I will not repeat it here.
Running the code locally confirmed that it does not satisfy the requirement of running across multiple machines. What follows is the thought process we went through.
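
For context, the launch logic of the official main.py looks roughly like the sketch below. This is a simplified reconstruction based on our reading of that file, not a verbatim copy (run_master, the timing loop, and most details are omitted). Everything happens inside a single mp.spawn call on one machine, which is the root of the problem discussed in the steps below.

# Simplified sketch of the official example's launcher (not a full copy of main.py):
# one process spawns world_size subprocesses on the SAME machine, and each subprocess
# decides by rank whether it plays the "master" or a "workerN" role.
import os
import torch.multiprocessing as mp
import torch.distributed.rpc as rpc


def run_worker(rank, world_size, num_split):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=128, rpc_timeout=300)
    if rank == 0:
        rpc.init_rpc("master", rank=rank, world_size=world_size, rpc_backend_options=options)
        # rank 0 builds DistResNet50(split_size, ["worker1", "worker2"]) and drives training
    else:
        rpc.init_rpc(f"worker{rank}", rank=rank, world_size=world_size, rpc_backend_options=options)
        # worker ranks only serve incoming RPC requests
    rpc.shutdown()


if __name__ == "__main__":
    world_size = 3  # 1 master + 2 workers, all local processes
    mp.spawn(run_worker, args=(world_size, 1), nprocs=world_size, join=True)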

  1. First, we noticed that when launching the program with python main.py there is no way to specify a rank, so when running across machines, how would it know which machine is worker1 and which is worker2? Our first suspicion was that we needed to modify the worker names and hard-code each worker's IP address, e.g. by changing line 191 of main.py:
    Before:
    model = DistResNet50(split_size, ["worker1", "worker2"])
    After:
    model = DistResNet50(split_size, ["worker1@xxx.xxx.xxx.xxx", "worker2@xxx.xxx.xxx.xxx"])
    This, unsurprisingly, raised an error right away: worker names in this form are not recognized and addresses cannot be specified directly, so this path was a dead end.
  2. We then went back to the code and, near the end at line 251, found
    mp.spawn(run_worker, args=(world_size, num_split), nprocs=world_size, join=True)
    The mp.spawn call in this line is what raised our suspicion: this is multiprocessing, so in essence the program still runs as multiple processes on a single machine and cannot span machines, which does not meet our needs.
  3. Finally, we re-read the PyTorch RPC documentation and wrote a simple test program that let two machines talk to each other: one machine issues a computation request and sends the raw data, the other receives the data and performs the computation (a minimal sketch of such a test follows below). The test ran successfully on two physical machines, which told us that implementing communication with RPC is not complicated. Combining this with the example code above, we decided to write separate scripts for worker1 and worker2, launch them separately, and specify each worker's rank in the code. In theory, that turns the original code into multi-machine RPC communication.
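
The cross-machine test mentioned in step 3 only takes a few lines. The sketch below is a minimal reconstruction under our assumptions (placeholder IP and port, the names "master" and "worker", and torch.add as the remote computation), not the exact script we ran: rank 0 sends two tensors to rank 1, which executes the addition and returns the result.

# Minimal two-machine RPC connectivity test (a sketch; IP, port and names are placeholders).
import os
import torch
import torch.distributed.rpc as rpc

os.environ['MASTER_ADDR'] = 'XXX.XXX.XXX.XXX'  # master IP, identical on both machines
os.environ['MASTER_PORT'] = '7856'

# --- run on machine A (the caller, rank 0) ---
rpc.init_rpc("master", rank=0, world_size=2)
# ship two tensors to "worker"; torch.add executes on the remote machine
result = rpc.rpc_sync("worker", torch.add, args=(torch.ones(2), torch.ones(2)))
print(result)  # expected: tensor([2., 2.])
rpc.shutdown()

# --- run on machine B (the callee, rank 1) ---
# rpc.init_rpc("worker", rank=1, world_size=2)
# rpc.shutdown()  # blocks and serves incoming RPCs until the master shuts down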

That was our thought process. Without further ado, here is the code.
Experimental setup: two machines, both CPU-only, with identical conda-installed environments.
Code on the master machine:


# https://github.com/pytorch/examples/blob/main/distributed/rpc/pipeline/main.py
import os
import threading
import time

import torch
import torch.nn as nn
import torch.distributed.autograd as dist_autograd
import torch.distributed.rpc as rpc
import torch.optim as optim
from torch.distributed.optim import DistributedOptimizer
from torch.distributed.rpc import RRef

from torchvision.models.resnet import Bottleneck

os.environ['MASTER_ADDR'] = 'XXX.XXX.XXX.XXX'  # set the master IP address
os.environ['MASTER_PORT'] = '7856'             # set the master port

#########################################################
#           Define Model Parallel ResNet50              #
#########################################################

# In order to split the ResNet50 and place it on two different workers, we
# implement it in two model shards. The ResNetBase class defines common
# attributes and methods shared by two shards. ResNetShard1 and ResNetShard2
# contain two partitions of the model layers respectively.

num_classes = 1000


def conv1x1(in_planes, out_planes, stride=1):
    """1x1 convolution"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)


class ResNetBase(nn.Module):
    def __init__(self, block, inplanes, num_classes=1000,
                 groups=1, width_per_group=64, norm_layer=None):
        super(ResNetBase, self).__init__()

        self._lock = threading.Lock()
        self._block = block
        self._norm_layer = nn.BatchNorm2d
        self.inplanes = inplanes
        self.dilation = 1
        self.groups = groups
        self.base_width = width_per_group

    def _make_layer(self, planes, blocks, stride=1):
        norm_layer = self._norm_layer
        downsample = None
        previous_dilation = self.dilation
        if stride != 1 or self.inplanes != planes * self._block.expansion:
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * self._block.expansion, stride),
                norm_layer(planes * self._block.expansion),
            )

        layers = []
        layers.append(self._block(self.inplanes, planes, stride, downsample, self.groups,
                                  self.base_width, previous_dilation, norm_layer))
        self.inplanes = planes * self._block.expansion
        for _ in range(1, blocks):
            layers.append(self._block(self.inplanes, planes, groups=self.groups,
                                      base_width=self.base_width, dilation=self.dilation,
                                      norm_layer=norm_layer))

        return nn.Sequential(*layers)

    def parameter_rrefs(self):
        r"""
        Create one RRef for each parameter in the given local module, and return a
        list of RRefs.
        """
        return [RRef(p) for p in self.parameters()]


class ResNetShard1(ResNetBase):
    """
    The first part of ResNet.
    """
    def __init__(self, device, *args, **kwargs):
        super(ResNetShard1, self).__init__(
            Bottleneck, 64, num_classes=num_classes, *args, **kwargs)

        self.device = device
        self.seq = nn.Sequential(
            nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False),
            self._norm_layer(self.inplanes),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
            self._make_layer(64, 3),
            self._make_layer(128, 4, stride=2)
        ).to(self.device)

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.ones_(m.weight)
                nn.init.zeros_(m.bias)

    def forward(self, x_rref):
        x = x_rref.to_here().to(self.device)
        with self._lock:
            out = self.seq(x)
        return out.cpu()


class ResNetShard2(ResNetBase):
    """
    The second part of ResNet.
    """
    def __init__(self, device, *args, **kwargs):
        super(ResNetShard2, self).__init__(
            Bottleneck, 512, num_classes=num_classes, *args, **kwargs)

        self.device = device
        self.seq = nn.Sequential(
            self._make_layer(256, 6, stride=2),
            self._make_layer(512, 3, stride=2),
            nn.AdaptiveAvgPool2d((1, 1)),
        ).to(self.device)

        self.fc = nn.Linear(512 * self._block.expansion, num_classes).to(self.device)

    def forward(self, x_rref):
        x = x_rref.to_here().to(self.device)
        with self._lock:
            out = self.fc(torch.flatten(self.seq(x), 1))
        return out.cpu()


class DistResNet50(nn.Module):
    """
    Assemble two parts as an nn.Module and define pipelining logic
    """
    def __init__(self, split_size, workers, *args, **kwargs):
        super(DistResNet50, self).__init__()

        self.split_size = split_size

        # Put the first part of the ResNet50 on workers[0]
        self.p1_rref = rpc.remote(
            workers[0],
            ResNetShard1,
            args=("cuda:0",) + args,
            kwargs=kwargs
        )

        # Put the second part of the ResNet50 on workers[1]
        self.p2_rref = rpc.remote(
            workers[1],
            ResNetShard2,
            args=("cpu",) + args,
            kwargs=kwargs
        )

    def forward(self, xs):
        # Split the input batch xs into micro-batches, and collect async RPC
        # futures into a list
        out_futures = []
        for x in iter(xs.split(self.split_size, dim=0)):
            x_rref = RRef(x)
            y_rref = self.p1_rref.remote().forward(x_rref)
            print(y_rref)
            z_fut = self.p2_rref.rpc_async().forward(y_rref)
            print(z_fut)
            out_futures.append(z_fut)

        # collect and cat all output tensors into one tensor.
        return torch.cat(torch.futures.wait_all(out_futures))

    def parameter_rrefs(self):
        remote_params = []
        remote_params.extend(self.p1_rref.remote().parameter_rrefs().to_here())
        remote_params.extend(self.p2_rref.remote().parameter_rrefs().to_here())
        return remote_params


#########################################################
#                   Run RPC Processes                   #
#########################################################

num_batches = 3
batch_size = 8
image_w = 128
image_h = 128


if __name__ == "__main__":
    options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=256, rpc_timeout=300)
    # initialize the RPC connection on the master node
    rpc.init_rpc("master", rank=0, world_size=2, rpc_backend_options=options)

    for num_split in [1, 2]:
        tik = time.time()
        model = DistResNet50(num_split, ["master", "worker"])
        loss_fn = nn.MSELoss()
        opt = DistributedOptimizer(
            optim.SGD,
            model.parameter_rrefs(),
            lr=0.05,
        )

        one_hot_indices = torch.LongTensor(batch_size) \
                               .random_(0, num_classes) \
                               .view(batch_size, 1)

        for i in range(num_batches):
            print(f"Processing batch {i}")
            # generate random inputs and labels
            inputs = torch.randn(batch_size, 3, image_w, image_h)
            labels = torch.zeros(batch_size, num_classes) \
                          .scatter_(1, one_hot_indices, 1)

            with dist_autograd.context() as context_id:
                outputs = model(inputs)
                dist_autograd.backward(context_id, [loss_fn(outputs, labels)])
                opt.step(context_id)

        tok = time.time()
        print(f"number of splits = {num_split}, execution time = {tok - tik}")

    # shut down the RPC connection
    rpc.shutdown()
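
One detail worth flagging: the master script above keeps the original example's placement of the first shard on "cuda:0", while our two machines are CPU-only. If the master really has no GPU, the device for ResNetShard1 has to be changed as well. A minimal adjustment (our assumption, not part of the original example) is to pick the device at runtime:

import torch

# Our addition (not in the original example): fall back to CPU when the master has no GPU.
def shard1_device():
    return "cuda:0" if torch.cuda.is_available() else "cpu"

# In DistResNet50.__init__ the hard-coded "cuda:0" would then become:
#   self.p1_rref = rpc.remote(workers[0], ResNetShard1,
#                             args=(shard1_device(),) + args, kwargs=kwargs)
print(shard1_device())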

Code on the worker machine:


# https://github.com/pytorch/examples/blob/main/distributed/rpc/pipeline/main.py
import os
import threading
import time
from functools import wraps

import torch
import torch.nn as nn
import torch.distributed.rpc as rpc
from torch.distributed.rpc import RRef

from torchvision.models.resnet import Bottleneck

os.environ['MASTER_ADDR'] = 'XXX.XXX.XXX.XXX'  # set the master IP address (same value as on the master)
os.environ['MASTER_PORT'] = '7856'             # set the master port (same value as on the master)

#########################################################
#           Define Model Parallel ResNet50              #
#########################################################

# In order to split the ResNet50 and place it on two different workers, we
# implement it in two model shards. The ResNetBase class defines common
# attributes and methods shared by two shards. ResNetShard1 and ResNetShard2
# contain two partitions of the model layers respectively.

num_classes = 1000


def conv1x1(in_planes, out_planes, stride=1):
    """1x1 convolution"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)


class ResNetBase(nn.Module):
    def __init__(self, block, inplanes, num_classes=1000,
                 groups=1, width_per_group=64, norm_layer=None):
        super(ResNetBase, self).__init__()

        self._lock = threading.Lock()
        self._block = block
        self._norm_layer = nn.BatchNorm2d
        self.inplanes = inplanes
        self.dilation = 1
        self.groups = groups
        self.base_width = width_per_group

    def _make_layer(self, planes, blocks, stride=1):
        norm_layer = self._norm_layer
        downsample = None
        previous_dilation = self.dilation
        if stride != 1 or self.inplanes != planes * self._block.expansion:
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * self._block.expansion, stride),
                norm_layer(planes * self._block.expansion),
            )

        layers = []
        layers.append(self._block(self.inplanes, planes, stride, downsample, self.groups,
                                  self.base_width, previous_dilation, norm_layer))
        self.inplanes = planes * self._block.expansion
        for _ in range(1, blocks):
            layers.append(self._block(self.inplanes, planes, groups=self.groups,
                                      base_width=self.base_width, dilation=self.dilation,
                                      norm_layer=norm_layer))

        return nn.Sequential(*layers)

    def parameter_rrefs(self):
        r"""
        Create one RRef for each parameter in the given local module, and return a
        list of RRefs.
        """
        return [RRef(p) for p in self.parameters()]


class ResNetShard2(ResNetBase):
    """
    The second part of ResNet.
    """
    def __init__(self, device, *args, **kwargs):
        super(ResNetShard2, self).__init__(
            Bottleneck, 512, num_classes=num_classes, *args, **kwargs)

        self.device = device
        self.seq = nn.Sequential(
            self._make_layer(256, 6, stride=2),
            self._make_layer(512, 3, stride=2),
            nn.AdaptiveAvgPool2d((1, 1)),
        ).to(self.device)

        self.fc = nn.Linear(512 * self._block.expansion, num_classes).to(self.device)

    def forward(self, x_rref):
        x = x_rref.to_here().to(self.device)
        print(x)
        with self._lock:
            out = self.fc(torch.flatten(self.seq(x), 1))
        return out.cpu()


#########################################################
#                   Run RPC Processes                   #
#########################################################

if __name__ == "__main__":
    options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=256, rpc_timeout=300)
    # initialize the RPC connection on the worker node
    rpc.init_rpc("worker", rank=1, world_size=2, rpc_backend_options=options)
    # wait for calls from the master node
    rpc.shutdown()

MASTER_ADDR and MASTER_PORT must be set to the same values in both scripts. Run master.py on the master machine and worker.py on the worker machine, and the ResNet-50 model then runs model-parallel across the two physical machines.
Notes

  • Make sure the two physical machines can ping each other, and turn off their firewalls
  • Both machines should run Linux; in our experiments, PyTorch's distributed RPC did not run in a Windows environment
  • The Python environments on the two machines must be kept identical