Building a MobileNetV3-SSD network in PyTorch for training and prediction
This article documents the vehicle-detection algorithm I used while interning on a vehicle-information-recognition project.
Because our company targets edge computing, where the compute power of edge boxes is limited, we try to use lightweight algorithms wherever possible. For object detection we chose MobileNetV3-SSD, an algorithm that reaches high accuracy with a very small weight file, which I like a lot.
Step 1: Build the MobileNetV3-SSD network
The idea behind the network is simple: take the classic SSD and swap its VGG backbone for MobileNetV3; everything else stays the same.
Functions and classes to prepare beforehand
Before writing the actual network, we need to prepare the activation functions and convolution blocks the network will call.
First come the two activation functions MobileNetV3 needs, the attention module SeModule, and the convolution block Block.
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.nn.init as init


class hswish(nn.Module):
    def forward(self, x):
        return x * F.relu6(x + 3.0, inplace=True) / 6.0


class hsigmoid(nn.Module):
    def forward(self, x):
        return F.relu6(x + 3.0, inplace=True) / 6.0


class SeModule(nn.Module):
    """Squeeze-and-Excitation attention block."""
    def __init__(self, in_size, reduction=4):
        super(SeModule, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.se = nn.Sequential(
            nn.Conv2d(in_size, in_size // reduction, kernel_size=1, stride=1, padding=0, bias=False),
            nn.BatchNorm2d(in_size // reduction),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_size // reduction, in_size, kernel_size=1, stride=1, padding=0, bias=False),
            nn.BatchNorm2d(in_size),
            hsigmoid()
        )

    def forward(self, x):
        return x * self.se(x)


class Block(nn.Module):
    """MobileNetV3 inverted-residual block: 1x1 expand -> depthwise -> 1x1 project."""
    def __init__(self, kernel_size, in_size, expand_size, out_size, nolinear, semodule, stride):
        super(Block, self).__init__()
        self.stride = stride
        self.se = semodule
        # The block whose expanded feature map doubles as the SSD "conv4_3"
        # source is identified by this signature.
        self.output_status = False
        if kernel_size == 5 and in_size == 160 and expand_size == 672:
            self.output_status = True
        self.conv1 = nn.Conv2d(in_size, expand_size, kernel_size=1, stride=1, padding=0, bias=False)
        self.bn1 = nn.BatchNorm2d(expand_size)
        self.nolinear1 = nolinear
        self.conv2 = nn.Conv2d(expand_size, expand_size, kernel_size=kernel_size, stride=stride,
                               padding=kernel_size // 2, groups=expand_size, bias=False)
        self.bn2 = nn.BatchNorm2d(expand_size)
        self.nolinear2 = nolinear
        self.conv3 = nn.Conv2d(expand_size, out_size, kernel_size=1, stride=1, padding=0, bias=False)
        self.bn3 = nn.BatchNorm2d(out_size)

        self.shortcut = nn.Sequential()
        if stride == 1 and in_size != out_size:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_size, out_size, kernel_size=1, stride=1, padding=0, bias=False),
                nn.BatchNorm2d(out_size),
            )

    def forward(self, x):
        out = self.nolinear1(self.bn1(self.conv1(x)))
        if self.output_status:
            expand = out
        out = self.nolinear2(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))
        if self.se is not None:
            out = self.se(out)
        out = out + self.shortcut(x) if self.stride == 1 else out
        if self.output_status:
            return (expand, out)
        return out
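As a quick sanity check (my own snippet, not from the original project), this hand-rolled h-swish should agree with PyTorch's built-in nn.Hardswish (available since PyTorch 1.6), which implements the same x * relu6(x + 3) / 6 formula:

# Sanity check (not part of the original code): compare against nn.Hardswish.
import torch
import torch.nn as nn

x = torch.randn(4, 8)
assert torch.allclose(hswish()(x), nn.Hardswish()(x), atol=1e-6)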
Next come the convolution blocks the SSD part of the network needs.
def conv_bn(inp, oup, stride, groups=1, activation=nn.ReLU6):
    return nn.Sequential(
        nn.Conv2d(inp, oup, 3, stride, 1, bias=False, groups=groups),
        nn.BatchNorm2d(oup),
        activation(inplace=True)
    )


def conv_1x1_bn(inp, oup, groups=1, activation=nn.ReLU6):
    return nn.Sequential(
        nn.Conv2d(inp, oup, 1, 1, 0, bias=False, groups=groups),
        nn.BatchNorm2d(oup),
        activation(inplace=True)
    )


class AuxiliaryConvolutions(nn.Module):
    """Auxiliary convolution layers that shrink conv7 into the extra SSD feature maps."""

    def __init__(self):
        super(AuxiliaryConvolutions, self).__init__()
        self.extra_convs = []
        self.extra_convs.append(conv_1x1_bn(960, 256))
        self.extra_convs.append(conv_bn(256, 256, 2, groups=256))
        self.extra_convs.append(conv_1x1_bn(256, 512, groups=1))
        self.extra_convs.append(conv_1x1_bn(512, 128))
        self.extra_convs.append(conv_bn(128, 128, 2, groups=128))
        self.extra_convs.append(conv_1x1_bn(128, 256))
        self.extra_convs.append(conv_1x1_bn(256, 128))
        self.extra_convs.append(conv_bn(128, 128, 2, groups=128))
        self.extra_convs.append(conv_1x1_bn(128, 256))
        self.extra_convs.append(conv_1x1_bn(256, 64))
        self.extra_convs.append(conv_bn(64, 64, 2, groups=64))
        self.extra_convs.append(conv_1x1_bn(64, 128))
        self.extra_convs = nn.Sequential(*self.extra_convs)
        self.init_conv2d()

    def init_conv2d(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                init.kaiming_normal_(m.weight, mode='fan_out')
                if m.bias is not None:
                    init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                init.constant_(m.weight, 1)
                init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                init.normal_(m.weight, std=0.001)
                if m.bias is not None:
                    init.constant_(m.bias, 0)

    def forward(self, conv7_feats):
        """
        Forward propagation.
        :param conv7_feats: lower-level conv7 feature map
        :return: higher-level feature maps conv8_2, conv9_2, conv10_2, and conv11_2
        """
        outs = []
        out = conv7_feats
        for i, conv in enumerate(self.extra_convs):
            out = conv(out)
            if i % 3 == 2:  # every third layer closes one pyramid stage
                outs.append(out)
        conv8_2_feats = outs[0]
        conv9_2_feats = outs[1]
        conv10_2_feats = outs[2]
        conv11_2_feats = outs[3]
        return conv8_2_feats, conv9_2_feats, conv10_2_feats, conv11_2_feats


class PredictionConvolutions(nn.Module):
    """Prediction convolution layers: per-feature-map box offsets and class scores."""

    def __init__(self, n_classes):
        super(PredictionConvolutions, self).__init__()
        self.n_classes = n_classes
        # Number of prior boxes per feature-map cell
        n_boxes = {'conv4_3': 4, 'conv7': 6, 'conv8_2': 6, 'conv9_2': 6, 'conv10_2': 6, 'conv11_2': 6}
        input_channels = [672, 960, 512, 256, 256, 128]
        # Localization heads (4 offsets per box)
        self.loc_conv4_3 = nn.Conv2d(input_channels[0], n_boxes['conv4_3'] * 4, kernel_size=3, padding=1)
        self.loc_conv7 = nn.Conv2d(input_channels[1], n_boxes['conv7'] * 4, kernel_size=3, padding=1)
        self.loc_conv8_2 = nn.Conv2d(input_channels[2], n_boxes['conv8_2'] * 4, kernel_size=3, padding=1)
        self.loc_conv9_2 = nn.Conv2d(input_channels[3], n_boxes['conv9_2'] * 4, kernel_size=3, padding=1)
        self.loc_conv10_2 = nn.Conv2d(input_channels[4], n_boxes['conv10_2'] * 4, kernel_size=3, padding=1)
        self.loc_conv11_2 = nn.Conv2d(input_channels[5], n_boxes['conv11_2'] * 4, kernel_size=3, padding=1)
        # Classification heads (n_classes scores per box)
        self.cl_conv4_3 = nn.Conv2d(input_channels[0], n_boxes['conv4_3'] * n_classes, kernel_size=3, padding=1)
        self.cl_conv7 = nn.Conv2d(input_channels[1], n_boxes['conv7'] * n_classes, kernel_size=3, padding=1)
        self.cl_conv8_2 = nn.Conv2d(input_channels[2], n_boxes['conv8_2'] * n_classes, kernel_size=3, padding=1)
        self.cl_conv9_2 = nn.Conv2d(input_channels[3], n_boxes['conv9_2'] * n_classes, kernel_size=3, padding=1)
        self.cl_conv10_2 = nn.Conv2d(input_channels[4], n_boxes['conv10_2'] * n_classes, kernel_size=3, padding=1)
        self.cl_conv11_2 = nn.Conv2d(input_channels[5], n_boxes['conv11_2'] * n_classes, kernel_size=3, padding=1)
        self.init_conv2d()

    def init_conv2d(self):
        """Initialize convolution parameters."""
        for c in self.children():
            if isinstance(c, nn.Conv2d):
                nn.init.xavier_uniform_(c.weight)
                nn.init.constant_(c.bias, 0.)

    def forward(self, conv4_3_feats, conv7_feats, conv8_2_feats, conv9_2_feats, conv10_2_feats, conv11_2_feats):
        batch_size = conv4_3_feats.size(0)
        # Localization: (N, C, H, W) -> (N, H*W*n_boxes, 4)
        l_conv4_3 = self.loc_conv4_3(conv4_3_feats)
        l_conv4_3 = l_conv4_3.permute(0, 2, 3, 1).contiguous()
        l_conv4_3 = l_conv4_3.view(batch_size, -1, 4)
        l_conv7 = self.loc_conv7(conv7_feats)
        l_conv7 = l_conv7.permute(0, 2, 3, 1).contiguous()
        l_conv7 = l_conv7.view(batch_size, -1, 4)
        l_conv8_2 = self.loc_conv8_2(conv8_2_feats)
        l_conv8_2 = l_conv8_2.permute(0, 2, 3, 1).contiguous()
        l_conv8_2 = l_conv8_2.view(batch_size, -1, 4)
        l_conv9_2 = self.loc_conv9_2(conv9_2_feats)
        l_conv9_2 = l_conv9_2.permute(0, 2, 3, 1).contiguous()
        l_conv9_2 = l_conv9_2.view(batch_size, -1, 4)
        l_conv10_2 = self.loc_conv10_2(conv10_2_feats)
        l_conv10_2 = l_conv10_2.permute(0, 2, 3, 1).contiguous()
        l_conv10_2 = l_conv10_2.view(batch_size, -1, 4)
        l_conv11_2 = self.loc_conv11_2(conv11_2_feats)
        l_conv11_2 = l_conv11_2.permute(0, 2, 3, 1).contiguous()
        l_conv11_2 = l_conv11_2.view(batch_size, -1, 4)
        # Classification: (N, C, H, W) -> (N, H*W*n_boxes, n_classes)
        c_conv4_3 = self.cl_conv4_3(conv4_3_feats)
        c_conv4_3 = c_conv4_3.permute(0, 2, 3, 1).contiguous()
        c_conv4_3 = c_conv4_3.view(batch_size, -1, self.n_classes)
        c_conv7 = self.cl_conv7(conv7_feats)
        c_conv7 = c_conv7.permute(0, 2, 3, 1).contiguous()
        c_conv7 = c_conv7.view(batch_size, -1, self.n_classes)
        c_conv8_2 = self.cl_conv8_2(conv8_2_feats)
        c_conv8_2 = c_conv8_2.permute(0, 2, 3, 1).contiguous()
        c_conv8_2 = c_conv8_2.view(batch_size, -1, self.n_classes)
        c_conv9_2 = self.cl_conv9_2(conv9_2_feats)
        c_conv9_2 = c_conv9_2.permute(0, 2, 3, 1).contiguous()
        c_conv9_2 = c_conv9_2.view(batch_size, -1, self.n_classes)
        c_conv10_2 = self.cl_conv10_2(conv10_2_feats)
        c_conv10_2 = c_conv10_2.permute(0, 2, 3, 1).contiguous()
        c_conv10_2 = c_conv10_2.view(batch_size, -1, self.n_classes)
        c_conv11_2 = self.cl_conv11_2(conv11_2_feats)
        c_conv11_2 = c_conv11_2.permute(0, 2, 3, 1).contiguous()
        c_conv11_2 = c_conv11_2.view(batch_size, -1, self.n_classes)
        # Concatenate in the same order as the prior boxes
        locs = torch.cat([l_conv4_3, l_conv7, l_conv8_2, l_conv9_2, l_conv10_2, l_conv11_2], dim=1)
        classes_scores = torch.cat([c_conv4_3, c_conv7, c_conv8_2, c_conv9_2, c_conv10_2, c_conv11_2], dim=1)
        return locs, classes_scores
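A shape check I find helpful (my own snippet, not from the original post; the (1, 960, 10, 10) input mimics the conv7 feature map of a 300x300 image):

# Sanity check (my own): feed a fake conv7 feature map through the
# auxiliary convolutions and confirm the pyramid shapes.
import torch

aux = AuxiliaryConvolutions().eval()
conv7 = torch.randn(1, 960, 10, 10)  # conv7 feature map for a 300x300 input
with torch.no_grad():
    f8, f9, f10, f11 = aux(conv7)
print(f8.shape, f9.shape, f10.shape, f11.shape)
# Expected: (1, 512, 5, 5), (1, 256, 3, 3), (1, 256, 2, 2), (1, 128, 1, 1)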
MobileNetV3_Large
class MobileNetV3_Large(nn.Module):
    def __init__(self, num_classes=1000):
        super(MobileNetV3_Large, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, stride=2, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(16)
        self.hs1 = hswish()
        self.bneck = nn.Sequential(
            Block(3, 16, 16, 16, nn.ReLU(inplace=True), None, 1),
            Block(3, 16, 64, 24, nn.ReLU(inplace=True), None, 2),
            Block(3, 24, 72, 24, nn.ReLU(inplace=True), None, 1),
            Block(5, 24, 72, 40, nn.ReLU(inplace=True), SeModule(40), 2),
            Block(5, 40, 120, 40, nn.ReLU(inplace=True), SeModule(40), 1),
            Block(5, 40, 120, 40, nn.ReLU(inplace=True), SeModule(40), 1),
            Block(3, 40, 240, 80, hswish(), None, 2),
            Block(3, 80, 200, 80, hswish(), None, 1),
            Block(3, 80, 184, 80, hswish(), None, 1),
            Block(3, 80, 184, 80, hswish(), None, 1),
            Block(3, 80, 480, 112, hswish(), SeModule(112), 1),
            Block(3, 112, 672, 112, hswish(), SeModule(112), 1),
            Block(5, 112, 672, 160, hswish(), SeModule(160), 1),
            Block(5, 160, 672, 160, hswish(), SeModule(160), 2),  # emits the conv4_3 feature map
            Block(5, 160, 960, 160, hswish(), SeModule(160), 1),
        )
        self.conv2 = nn.Conv2d(160, 960, kernel_size=1, stride=1, padding=0, bias=False)
        self.bn2 = nn.BatchNorm2d(960)
        self.hs2 = hswish()
        self.linear3 = nn.Linear(960, 1280)
        self.bn3 = nn.BatchNorm1d(1280)
        self.hs3 = hswish()
        self.linear4 = nn.Linear(1280, 1000)
        self.init_weights()  # load pretrained weights or initialize from scratch

    # Alternative loader kept from the original code for reference:
    # def load_pretrained_layers(self, pretrained):
    #     pretrained_state_dict = torch.load(pretrained)
    #     self.load_state_dict(pretrained_state_dict)
    #     for param in self.parameters():
    #         param.requires_grad = False
    #     print("\nLoaded base model.\n")

    def init_weights(self, pretrained=None):  # pass pretrained=None to skip pretrained weights
        if isinstance(pretrained, str):
            checkpoint = torch.load(pretrained, map_location='cpu')["state_dict"]
            self.load_state_dict(checkpoint, strict=False)
            for param in self.parameters():
                param.requires_grad = True  # to be or not to be
            # The checkpoint may also wrap 'state_dict', possibly with a
            # 'module.' prefix from DataParallel:
            # if isinstance(checkpoint, OrderedDict):
            #     state_dict = checkpoint
            # elif isinstance(checkpoint, dict) and 'state_dict' in checkpoint:
            #     state_dict = checkpoint['state_dict']
            # else:
            #     print("No state_dict found in checkpoint file")
            # if list(state_dict.keys())[0].startswith('module.'):
            #     state_dict = {k[7:]: v for k, v in checkpoint['state_dict'].items()}
            # if hasattr(self, 'module'):
            #     self.module.load_state_dict(state_dict, strict=False)
            # else:
            #     self.load_state_dict(state_dict, strict=False)
            print("\nLoaded base model.\n")
        elif pretrained is None:
            print("\nNo loaded base model.\n")
            for m in self.modules():  # self.modules() iterates over every submodule of the net
                if isinstance(m, nn.Conv2d):
                    init.kaiming_normal_(m.weight, mode='fan_out')  # Kaiming normal initialization
                    if m.bias is not None:
                        init.constant_(m.bias, 0)
                elif isinstance(m, nn.BatchNorm2d):
                    init.constant_(m.weight, 1)
                    init.constant_(m.bias, 0)
                elif isinstance(m, nn.Linear):
                    init.normal_(m.weight, std=0.001)
                    if m.bias is not None:
                        init.constant_(m.bias, 0)

    def forward(self, x):
        out = self.hs1(self.bn1(self.conv1(x)))
        for i, block in enumerate(self.bneck):
            out = block(out)
            if isinstance(out, tuple):  # the flagged block returns (expand, out)
                conv4_3_feats = out[0]
                out = out[1]
        out = self.hs2(self.bn2(self.conv2(out)))
        conv7_feats = out
        return conv4_3_feats, conv7_feats
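With a 300x300 input, the backbone should emit the two feature maps the SSD head expects (my own check, not from the original post):

# Sanity check (my own): the backbone returns the 19x19 "conv4_3" map
# (672 channels, taken from inside the 14th bneck block) and the 10x10
# "conv7" map (960 channels).
import torch

net = MobileNetV3_Large().eval()
with torch.no_grad():
    conv4_3, conv7 = net(torch.randn(1, 3, 300, 300))
print(conv4_3.shape)  # torch.Size([1, 672, 19, 19])
print(conv7.shape)    # torch.Size([1, 960, 10, 10])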
The SSD network built on MobileNetV3
from math import sqrt  # used by create_prior_boxes; cxcy_to_xy, gcxgcy_to_cxcy and
                       # find_jaccard_overlap come from the project's utils module

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


class SSD300(nn.Module):
    """
    The SSD300 network - encapsulates the base MobileNet network,
    auxiliary, and prediction convolutions.
    """

    def __init__(self, n_classes):
        super(SSD300, self).__init__()
        self.n_classes = n_classes
        self.base = MobileNetV3_Large(num_classes=self.n_classes)
        self.aux_convs = AuxiliaryConvolutions()
        self.pred_convs = PredictionConvolutions(n_classes)
        # Learnable rescale factor for the L2-normalized conv4_3 feature map
        self.rescale_factors = nn.Parameter(torch.FloatTensor(1, 672, 1, 1))
        nn.init.constant_(self.rescale_factors, 20)
        self.priors_cxcy = self.create_prior_boxes()  # initialize the prior (anchor) boxes

    def forward(self, image):
        conv4_3_feats, conv7_feats = self.base(image)
        # L2-normalize conv4_3 and rescale it, as in the original SSD
        norm = conv4_3_feats.pow(2).sum(dim=1, keepdim=True).sqrt() + 1e-10
        conv4_3_feats = conv4_3_feats / norm
        conv4_3_feats = conv4_3_feats * self.rescale_factors
        conv8_2_feats, conv9_2_feats, conv10_2_feats, conv11_2_feats = self.aux_convs(conv7_feats)
        locs, classes_scores = self.pred_convs(conv4_3_feats, conv7_feats, conv8_2_feats,
                                               conv9_2_feats, conv10_2_feats, conv11_2_feats)
        return locs, classes_scores

    def create_prior_boxes(self):
        fmap_dims = {'conv4_3': 19, 'conv7': 10, 'conv8_2': 5, 'conv9_2': 3, 'conv10_2': 2, 'conv11_2': 1}
        obj_scales = {'conv4_3': 0.1, 'conv7': 0.2, 'conv8_2': 0.375, 'conv9_2': 0.55,
                      'conv10_2': 0.725, 'conv11_2': 0.9}
        aspect_ratios = {'conv4_3': [1., 2., 0.5],
                         'conv7': [1., 2., 3., 0.5, .333],
                         'conv8_2': [1., 2., 3., 0.5, .333],
                         'conv9_2': [1., 2., 3., 0.5, .333],
                         'conv10_2': [1., 2., 3., 0.5, .333],
                         'conv11_2': [1., 2., 3., 0.5, .333]}
        fmaps = list(fmap_dims.keys())
        prior_boxes = []
        for k, fmap in enumerate(fmaps):
            for i in range(fmap_dims[fmap]):
                for j in range(fmap_dims[fmap]):
                    cx = (j + 0.5) / fmap_dims[fmap]
                    cy = (i + 0.5) / fmap_dims[fmap]
                    for ratio in aspect_ratios[fmap]:
                        prior_boxes.append([cx, cy, obj_scales[fmap] * sqrt(ratio),
                                            obj_scales[fmap] / sqrt(ratio)])
                        # For ratio 1, add an extra prior at the geometric mean of this
                        # scale and the next feature map's scale
                        if ratio == 1.:
                            try:
                                additional_scale = sqrt(obj_scales[fmap] * obj_scales[fmaps[k + 1]])
                            except IndexError:  # last feature map
                                additional_scale = 1.
                            prior_boxes.append([cx, cy, additional_scale, additional_scale])
        prior_boxes = torch.FloatTensor(prior_boxes).to(device)
        prior_boxes.clamp_(0, 1)
        return prior_boxes

    def detect_objects(self, predicted_locs, predicted_scores, min_score, max_overlap, top_k):
        """
        For each class, perform Non-Maximum Suppression (NMS) on boxes that are above a minimum threshold.
        :param min_score: minimum threshold for a box to be considered a match for a certain class
        :param max_overlap: maximum overlap two boxes can have so that the one with the lower score is not suppressed via NMS
        :param top_k: if there are a lot of resulting detections across all classes, keep only the top 'k'
        :return: detections (boxes, labels, and scores), lists of length batch_size
        """
        batch_size = predicted_locs.size(0)
        n_priors = self.priors_cxcy.size(0)
        predicted_scores = F.softmax(predicted_scores, dim=2)
        all_images_boxes = list()
        all_images_labels = list()
        all_images_scores = list()
        assert n_priors == predicted_locs.size(1) == predicted_scores.size(1)
        for i in range(batch_size):
            # Decode offsets w.r.t. the priors into (x_min, y_min, x_max, y_max) boxes
            decoded_locs = cxcy_to_xy(gcxgcy_to_cxcy(predicted_locs[i], self.priors_cxcy))
            image_boxes = list()
            image_labels = list()
            image_scores = list()
            max_scores, best_label = predicted_scores[i].max(dim=1)
            for c in range(1, self.n_classes):
                class_scores = predicted_scores[i][:, c]
                score_above_min_score = class_scores > min_score
                n_above_min_score = score_above_min_score.sum().item()
                if n_above_min_score == 0:
                    continue
                class_scores = class_scores[score_above_min_score]
                class_decoded_locs = decoded_locs[score_above_min_score]
                class_scores, sort_ind = class_scores.sort(dim=0, descending=True)
                class_decoded_locs = class_decoded_locs[sort_ind]
                overlap = find_jaccard_overlap(class_decoded_locs, class_decoded_locs)
                # Greedy NMS: suppress any box that overlaps a higher-scoring kept box
                suppress = torch.zeros((n_above_min_score), dtype=torch.bool).to(device)
                for box in range(class_decoded_locs.size(0)):
                    if suppress[box] == 1:
                        continue
                    suppress = torch.max(suppress, overlap[box] > max_overlap)
                    suppress[box] = 0
                image_boxes.append(class_decoded_locs[~suppress])
                image_labels.append(torch.LongTensor((~suppress).sum().item() * [c]).to(device))
                image_scores.append(class_scores[~suppress])
            if len(image_boxes) == 0:
                image_boxes.append(torch.FloatTensor([[0., 0., 1., 1.]]).to(device))
                image_labels.append(torch.LongTensor([0]).to(device))
                image_scores.append(torch.FloatTensor([0.]).to(device))
            image_boxes = torch.cat(image_boxes, dim=0)
            image_labels = torch.cat(image_labels, dim=0)
            image_scores = torch.cat(image_scores, dim=0)
            n_objects = image_scores.size(0)
            if n_objects > top_k:
                image_scores, sort_ind = image_scores.sort(dim=0, descending=True)
                image_scores = image_scores[:top_k]
                image_boxes = image_boxes[sort_ind][:top_k]
                image_labels = image_labels[sort_ind][:top_k]
            all_images_boxes.append(image_boxes)
            all_images_labels.append(image_labels)
            all_images_scores.append(image_scores)
        return all_images_boxes, all_images_labels, all_images_scores
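End to end, a dummy forward pass shows the prior-box bookkeeping (my own snippet; n_classes=10 is an arbitrary example value). With the feature-map sizes above, there are 19²·4 + 10²·6 + 5²·6 + 3²·6 + 2²·6 + 1²·6 = 2278 priors in total:

# Sanity check (my own): one forward pass on a dummy 300x300 image.
# 1444 + 600 + 150 + 54 + 24 + 6 = 2278 priors in total.
import torch

model = SSD300(n_classes=10).eval()
with torch.no_grad():
    locs, scores = model(torch.randn(1, 3, 300, 300))
print(locs.shape)               # torch.Size([1, 2278, 4])
print(scores.shape)             # torch.Size([1, 2278, 10])
print(model.priors_cxcy.shape)  # torch.Size([2278, 4])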
Step 2: Training
Training is the key part; here we use PyTorch's standard training idioms.
Preprocessing the training data (BDD data in VOC format)
I originally planned to describe training on VOC2007 here, but we should keep making progress rather than always training on the official VOC data, so this section explains clearly how to convert BDD data into VOC format and train on it.
First, download the BDD data from the official website.
Then use the following script to convert the JSON annotation files into XML annotation files.
import os
from json import loads
from dicttoxml import dicttoxml
from xml.dom.minidom import parseString


def jsonToXml(json_path, xml_path):
    # Convert a single JSON annotation file to XML.
    # json_path: complete path of the json file
    # xml_path: complete path of the xml file
    with open(json_path, 'r', encoding='UTF-8') as json_file:
        load_dict = loads(json_file.read())
    my_item_func = lambda x: 'Annotation'
    xml = dicttoxml(load_dict, custom_root='Annotations', item_func=my_item_func, attr_type=False)
    dom = parseString(xml)
    with open(xml_path, 'w', encoding='UTF-8') as xml_file:
        xml_file.write(dom.toprettyxml())


def json_to_xml(json_dir, xml_dir):
    # Convert every JSON file in json_dir to an XML file in xml_dir.
    if not os.path.exists(xml_dir):  # create the output folder if it does not exist
        os.makedirs(xml_dir)
    dir = os.listdir(json_dir)
    i = 0
    for file in dir:
        file_list = file.split(".")
        if file_list[-1] == 'json':
            jsonToXml(os.path.join(json_dir, file), os.path.join(xml_dir, file_list[0] + '.xml'))
            i = i + 1
            print('Processed file number:', i)


if __name__ == '__main__':
    # Convert multiple files
    j_dir = "train"      # folder containing the JSON files
    x_dir = "train_xml"  # output folder for the XML files (may start empty)
    json_to_xml(j_dir, x_dir)
Then use the following script to generate the train.txt (and related split files) under ImageSets/Main.
import os
import random

trainval_percent = 0.7  # adjust as needed
train_percent = 0.8     # adjust as needed
xmlfilepath = "Annotations"    # set to your own path
txtsavepath = "ImageSets/Main"
total_xml = os.listdir(xmlfilepath)

num = len(total_xml)
indices = range(num)  # renamed from `list` so the builtin is not shadowed
tv = int(num * trainval_percent)
tr = int(tv * train_percent)
trainval = random.sample(indices, tv)
train = random.sample(trainval, tr)

ftrainval = open(txtsavepath + '/trainval.txt', 'w')
ftest = open(txtsavepath + '/test.txt', 'w')
ftrain = open(txtsavepath + '/train.txt', 'w')
fval = open(txtsavepath + '/val.txt', 'w')

for i in indices:
    name = total_xml[i][:-4] + '\n'
    if i in trainval:
        ftrainval.write(name)
        if i in train:
            ftrain.write(name)
        else:
            fval.write(name)
    else:
        ftest.write(name)

ftrainval.close()
ftrain.close()
fval.close()
ftest.close()
print('Well finished')
And with that you have BDD training data in standard VOC format. Simple, isn't it?
Checking the data
Do not skip this step, or training will very likely go wrong. Some bounding boxes in the BDD data are badly annotated, collapsed into a straight line, which makes the training loss blow up to inf; you need to find those badly annotated images and delete them.
My check script is below. Note that after finding and deleting the bad samples, you must regenerate the train.txt under ImageSets/Main.
import json

with open('processed_data/TRAIN_objects.json', 'r') as obj:
    a = json.load(obj)
with open('processed_data/TRAIN_images.json', 'r') as obj:
    b = json.load(obj)

for i in range(0, len(a), 1):
    boxes = a[i]['boxes']
    for box in boxes:
        # A box whose width or height is zero was annotated as a line
        if box[0] == box[2]:
            print(b[i])
        if box[1] == box[3]:
            print(b[i])
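If there are many bad samples, printing them and deleting by hand gets tedious. Below is a sketch of my own (assuming the same JSON layout as above) that collects all offending image paths in one pass so you can remove them and their XML together:

# Sketch (my own, not from the original article): collect images whose
# boxes are degenerate (zero width or height) so they can be removed.
import json

with open('processed_data/TRAIN_objects.json', 'r') as f:
    objects = json.load(f)
with open('processed_data/TRAIN_images.json', 'r') as f:
    images = json.load(f)

bad = set()
for img_path, obj in zip(images, objects):
    for x1, y1, x2, y2 in obj['boxes']:
        if x1 == x2 or y1 == y2:
            bad.add(img_path)

print('%d bad images' % len(bad))
# Delete these images (and their Annotations/*.xml), then regenerate
# ImageSets/Main/train.txt and rerun the data conversion.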
Writing the training script
import time
import torch.backends.cudnn as cudnn
import torch.optim
import torch.utils.data
from model import SSD300, MultiBoxLoss
from datasets import PascalVOCDataset
from utils import *
from torch.optim.lr_scheduler import ReduceLROnPlateau

# Data parameters
data_folder = 'processed_data'  # folder holding the training-data path files
keep_difficult = True  # VOC annotations carry a `difficult` flag; this decides whether to use such objects

# Model parameters
# Not too many here since the SSD300 has a very specific structure
n_classes = len(label_map)  # number of classes; label_map is imported from utils
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Learning parameters
# checkpoint = None
checkpoint = 'weights/MobilenetV3_Large-ssd300.pth.tar'  # pretrained weights to resume from
batch_size = 16  # batch size
# iterations = 120000  # number of iterations to train
workers = 8  # number of dataloader worker processes; more workers load data faster
print_freq = 10  # print training status every print_freq batches
lr = 1e-3  # learning rate
# decay_lr_to = 0.1  # decay learning rate to this fraction of the existing learning rate
momentum = 0.9  # momentum
weight_decay = 5e-4  # weight decay: regularization that also speeds up convergence
grad_clip = None  # whether to clip gradients (a remedy for exploding gradients)

cudnn.benchmark = True  # lets cuDNN pick the fastest conv algorithms; usually worth enabling


def main():
    """
    Training.
    """
    global start_epoch, label_map, epoch, checkpoint, decay_lr_at

    # Initialize the model or load a checkpoint
    if checkpoint is None:  # no pretrained weights: initialize the model
        print("checkpoint none")
        start_epoch = 0
        model = SSD300(n_classes=n_classes)  # build the model here
        # Initialize the optimizer, with twice the default learning rate for biases,
        # as in the original Caffe repo
        biases = list()
        not_biases = list()
        for param_name, param in model.named_parameters():  # iterator over (name, parameter) pairs
            if param.requires_grad:  # only parameters that need gradients
                if param_name.endswith('.bias'):
                    biases.append(param)
                else:
                    not_biases.append(param)
        # Alternative optimizer with doubled bias learning rate:
        # optimizer = torch.optim.SGD(params=[{'params': biases, 'lr': 2 * lr}, {'params': not_biases}],
        #                             lr=lr, momentum=momentum, weight_decay=weight_decay)
        optimizer = torch.optim.SGD(params=[{'params': biases, 'lr': lr}, {'params': not_biases}],
                                    lr=lr, momentum=momentum, weight_decay=weight_decay)
    else:
        print("checkpoint load")
        checkpoint = torch.load(checkpoint, map_location='cuda:0')
        start_epoch = checkpoint['epoch'] + 1  # how many epochs the checkpoint has already trained
        print('\nLoaded checkpoint from epoch %d.\n' % start_epoch)
        model = checkpoint['model']
        optimizer = checkpoint['optimizer']

    # Move to default device
    model = model.to(device)
    # Initialize the loss with the prior boxes; model.priors_cxcy is the set of priors built at init
    criterion = MultiBoxLoss(priors_cxcy=model.priors_cxcy).to(device)

    # Custom dataloaders
    train_dataset = PascalVOCDataset(data_folder, split='train',
                                     keep_difficult=keep_difficult)  # returns image, boxes, labels, difficulties
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True,
                                               collate_fn=train_dataset.collate_fn,
                                               num_workers=workers, pin_memory=True)  # batches the data into tensors

    # The VGG-SSD paper trains for 120,000 iterations with a batch size of 32 and decays
    # after 80,000 and 100,000 iterations; here, with MobileNetV3, we simply train by epochs.
    epochs = 800
    # decay_lr_at = [154, 193]
    # print("decay_lr_at:", decay_lr_at)
    print("epochs:", epochs)

    for param_group in optimizer.param_groups:  # reset the optimizer learning rate
        optimizer.param_groups[1]['lr'] = lr
    print("learning rate. The new LR is %f\n" % (optimizer.param_groups[1]['lr'],))

    # I tried several learning-rate schedulers; another option:
    # scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=(epochs // 7) + 1)
    # ReduceLROnPlateau adjusts the learning rate dynamically based on the epoch loss
    scheduler = ReduceLROnPlateau(optimizer, mode="min", factor=0.1, patience=15, verbose=True,
                                  threshold=0.00001, threshold_mode='rel', cooldown=0, min_lr=0, eps=1e-08)

    for epoch in range(start_epoch, epochs):  # the training loop
        # Decay learning rate at particular epochs
        # if epoch in decay_lr_at:
        #     adjust_learning_rate_epoch(optimizer, epoch)

        # One epoch's training
        train(train_loader=train_loader, model=model, criterion=criterion, optimizer=optimizer, epoch=epoch)
        print("epoch loss:", train_loss)
        scheduler.step(train_loss)  # let the scheduler adjust the learning rate

        # Save checkpoint
        save_checkpoint(epoch, model, optimizer)


def train(train_loader, model, criterion, optimizer, epoch):
    model.train()  # enable BatchNormalization and Dropout
    batch_time = AverageMeter()  # AverageMeter tracks latest/average/sum/count via reset() and update()
    data_time = AverageMeter()
    losses = AverageMeter()
    start = time.time()
    global train_loss

    # Batches
    for i, (images, boxes, labels, _) in enumerate(train_loader):
        data_time.update(time.time() - start)
        # if i % 200 == 0:
        #     adjust_learning_rate_iter(optimizer, epoch)

        # Move to default device
        images = images.to(device)  # (batch_size (N), 3, 300, 300)
        boxes = [b.to(device) for b in boxes]
        labels = [l.to(device) for l in labels]

        # Forward prop.
        predicted_locs, predicted_scores = model(images)  # (N, n_priors, 4), (N, n_priors, n_classes)

        # Loss
        loss = criterion(predicted_locs, predicted_scores, boxes, labels)  # scalar
        train_loss = loss

        # Backward prop.
        optimizer.zero_grad()  # clear old gradients
        loss.backward()  # compute gradients of the loss w.r.t. the weights

        # Clip gradients, if necessary
        if grad_clip is not None:  # guards against exploding gradients
            clip_gradient(optimizer, grad_clip)

        # Update model
        optimizer.step()  # apply the gradient step

        losses.update(loss.item(), images.size(0))
        batch_time.update(time.time() - start)
        start = time.time()

        # Print status
        if i % print_freq == 0:
            print('Epoch: [{0}][{1}/{2}][{3}]\t'
                  'Batch Time {batch_time.val:.3f} ({batch_time.avg:.3f})\t'
                  'Data Time {data_time.val:.3f} ({data_time.avg:.3f})\t'
                  'Loss {loss.val:.4f} ({loss.avg:.4f})\t'.format(epoch, i, len(train_loader),
                                                                  optimizer.param_groups[1]['lr'],
                                                                  batch_time=batch_time,
                                                                  data_time=data_time, loss=losses))
    del predicted_locs, predicted_scores, images, boxes, labels  # free some memory since their histories may be stored


def adjust_learning_rate_epoch(optimizer, cur_epoch):
    """
    Scale the learning rate by a fixed factor.
    :param optimizer: optimizer whose learning rate must be shrunk
    """
    for param_group in optimizer.param_groups:
        param_group['lr'] = param_group['lr'] * 0.1
    print("DECAYING learning rate. The new LR is %f\n" % (optimizer.param_groups[1]['lr'],))


def adjust_learning_rate_iter(optimizer, cur_epoch):
    # Warmup: gently raise the learning rate during the first epochs
    if cur_epoch == 0 or cur_epoch == 1:
        for param_group in optimizer.param_groups:
            param_group['lr'] = param_group['lr'] + 0.0001
        print("DECAYING learning rate iter. The new LR is %f\n" % (optimizer.param_groups[1]['lr'],))


if __name__ == '__main__':
    main()
This training script reads the training images and labels from JSON files, so before training you also need to convert the data format with the code below.
# Note: change voc_labels to the labels of your own training data before use.
import os
import xml.etree.ElementTree as ET
import json

# Label map
# voc_labels = ('aeroplane', 'bicycle', 'bird', 'boat', 'bottle', 'bus', 'car', 'cat', 'chair', 'cow',
#               'diningtable', 'dog', 'horse', 'motorbike', 'person', 'pottedplant', 'sheep', 'sofa',
#               'train', 'tvmonitor')
# voc_labels = ('bus', 'car')
voc_labels = ('bus', 'traffic light', 'traffic sign', 'person', 'bike', 'truck', 'motor', 'car', 'train', 'rider')
label_map = {k: v + 1 for v, k in enumerate(voc_labels)}
label_map['background'] = 0
rev_label_map = {v: k for k, v in label_map.items()}  # inverse mapping


def parse_annotation(annotation_path):
    tree = ET.parse(annotation_path)
    root = tree.getroot()
    boxes = list()
    labels = list()
    difficulties = list()
    for category in root.iter('category'):
        difficult = 0  # BDD has no difficult flag, so mark everything as not difficult
        label = category.text.lower().strip()
        if label not in label_map:
            continue
        labels.append(label_map[label])
        difficulties.append(difficult)
    for box2d in root.iter('box2d'):
        x1 = int(float(box2d.find('x1').text))
        y1 = int(float(box2d.find('y1').text))
        x2 = int(float(box2d.find('x2').text))
        y2 = int(float(box2d.find('y2').text))
        boxes.append([x1, y1, x2, y2])
    return {'boxes': boxes, 'labels': labels, 'difficulties': difficulties}


def create_data_lists(voc07_path, output_folder):
    """
    Create lists of images, the bounding boxes and labels of the objects in these images,
    and save these to file.
    :param voc07_path: path to the 'VOC2007'-style folder
    :param output_folder: folder where the JSONs must be saved
    """
    voc07_path = os.path.abspath(voc07_path)
    train_images = list()
    train_objects = list()
    n_objects = 0

    # Training data
    path = voc07_path
    # Find IDs of images in training data
    print(path)
    with open(os.path.join(path, 'ImageSets/Main/trainval.txt')) as f:
        ids = f.read().splitlines()
    for id in ids:
        # Parse annotation's XML file
        objects = parse_annotation(os.path.join(path, 'Annotations', id + '.xml'))
        if len(objects['boxes']) == 0:  # skip images without any objects
            continue
        n_objects += len(objects['boxes'])
        train_objects.append(objects)
        train_images.append(os.path.join(path, 'JPEGImages', id + '.jpg'))
    assert len(train_objects) == len(train_images)

    # Save to file
    with open(os.path.join(output_folder, 'TRAIN_images.json'), 'w') as j:  # training image paths
        json.dump(train_images, j)
    with open(os.path.join(output_folder, 'TRAIN_objects.json'), 'w') as j:  # training annotations
        json.dump(train_objects, j)
    with open(os.path.join(output_folder, 'label_map.json'), 'w') as j:  # label classes
        json.dump(label_map, j)  # save label map too
    print('\nThere are %d training images containing a total of %d objects. Files have been saved to %s.' % (
        len(train_images), n_objects, os.path.abspath(output_folder)))

    # Test data (note: this reuses trainval.txt; point it at test.txt for a real held-out split)
    test_images = list()
    test_objects = list()
    n_objects = 0

    # Find IDs of images in the test data
    with open(os.path.join(voc07_path, 'ImageSets/Main/trainval.txt')) as f:
        ids = f.read().splitlines()
    for id in ids:
        # Parse annotation's XML file
        objects = parse_annotation(os.path.join(voc07_path, 'Annotations', id + '.xml'))
        if len(objects['boxes']) == 0:
            continue
        test_objects.append(objects)
        n_objects += len(objects['boxes'])
        test_images.append(os.path.join(voc07_path, 'JPEGImages', id + '.jpg'))
    assert len(test_objects) == len(test_images)

    # Save to file
    with open(os.path.join(output_folder, 'TEST_images.json'), 'w') as j:
        json.dump(test_images, j)
    with open(os.path.join(output_folder, 'TEST_objects.json'), 'w') as j:
        json.dump(test_objects, j)
    print('\nThere are %d test images containing a total of %d objects. Files have been saved to %s.' % (
        len(test_images), n_objects, os.path.abspath(output_folder)))


if __name__ == '__main__':
    create_data_lists(voc07_path='D:/study/internship/work_file/Dataset/bdd100k/bdd1k',
                      output_folder='processed_data')
The training process looks like the figure below. (figure not reproduced here)
Step 3: Prediction
At last, prediction: time to enjoy the fruits of our labor.
The code is below. Note that even though the script never explicitly imports the model-definition file, that file must still be present on the import path: the checkpoint pickles the whole model object, so loading it needs to look up the model's class from its module.
from torchvision import transforms
from utils import *
from PIL import Image, ImageDraw, ImageFont
import time

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load model checkpoint
checkpoint = 'checkpoint_ssd300.pth.tar'
checkpoint = torch.load(checkpoint, map_location='cuda:0')
print(checkpoint)
start_epoch = checkpoint['epoch'] + 1
print('\nLoaded checkpoint from epoch %d.\n' % start_epoch)
model = checkpoint['model']
model = model.to(device)
model.eval()  # evaluation mode for inference; use model.train() when training


def detect(original_image, min_score, max_overlap, top_k, suppress=None):
    """
    Detect objects in an image with a trained SSD300, and visualize the results.
    :param original_image: image, a PIL Image
    :param min_score: minimum threshold for a detected box to be considered a match for a certain class
    :param max_overlap: maximum overlap two boxes can have so that the one with the lower score is not suppressed via Non-Maximum Suppression (NMS)
    :param top_k: if there are a lot of resulting detections across all classes, keep only the top 'k'
    :param suppress: classes that you know for sure cannot be in the image or you do not want in the image, a list
    :return: annotated image, a PIL Image
    """
    # Transform
    resize = transforms.Resize((300, 300))
    to_tensor = transforms.ToTensor()
    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    image = normalize(to_tensor(resize(original_image)))

    # Move to default device
    image = image.to(device)  # move the image tensor to the GPU

    # Forward prop.
    predicted_locs, predicted_scores = model(image.unsqueeze(0))  # unsqueeze adds the batch dimension

    # Everything below is decoding and drawing.
    # Detect objects in SSD output
    det_boxes, det_labels, det_scores = model.detect_objects(predicted_locs, predicted_scores,
                                                             min_score=min_score, max_overlap=max_overlap,
                                                             top_k=top_k)  # decode the predictions

    # Move detections to the CPU
    det_boxes = det_boxes[0].to('cpu')

    # Transform to original image dimensions
    original_dims = torch.FloatTensor(
        [original_image.width, original_image.height, original_image.width, original_image.height]).unsqueeze(0)
    det_boxes = det_boxes * original_dims

    # Decode class integer labels
    det_labels = [rev_label_map[l] for l in det_labels[0].to('cpu').tolist()]
    print(det_labels)

    # If no objects found, the detected labels will be set to ['0.'],
    # i.e. ['background'] in SSD300.detect_objects() in model.py
    if det_labels == ['background']:
        # Just return original image
        return original_image

    # Annotate
    annotated_image = original_image
    draw = ImageDraw.Draw(annotated_image)
    font = ImageFont.truetype("simhei.ttf", 15)

    # Suppress specific classes, if needed
    for i in range(det_boxes.size(0)):
        if suppress is not None:
            if det_labels[i] in suppress:
                continue

        # Boxes
        box_location = det_boxes[i].tolist()
        draw.rectangle(xy=box_location, outline=label_color_map[det_labels[i]])
        draw.rectangle(xy=[l + 1. for l in box_location],
                       outline=label_color_map[det_labels[i]])  # a second rectangle at an offset of 1 pixel to increase line thickness
        # draw.rectangle(xy=[l + 2. for l in box_location], outline=label_color_map[det_labels[i]])
        # draw.rectangle(xy=[l + 3. for l in box_location], outline=label_color_map[det_labels[i]])

        # Text
        text_size = font.getsize(det_labels[i].upper())
        text_location = [box_location[0] + 2., box_location[1] - text_size[1]]
        textbox_location = [box_location[0], box_location[1] - text_size[1],
                            box_location[0] + text_size[0] + 4., box_location[1]]
        draw.rectangle(xy=textbox_location, fill=label_color_map[det_labels[i]])
        draw.text(xy=text_location, text=det_labels[i].upper(), fill='white', font=font)
    del draw
    return annotated_image


if __name__ == '__main__':
    img_path = 'feiji1.jpg'
    original_image = Image.open(img_path, mode='r')
    original_image = original_image.convert('RGB')
    detect(original_image, min_score=0.2, max_overlap=0.5, top_k=200).show()
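If you would rather not depend on the model source being importable at load time, a common alternative (a sketch of standard PyTorch practice, not what this article's save_checkpoint does; the file name ssd300_state.pth.tar is just an example) is to checkpoint only the state_dicts and rebuild the network class yourself:

# Sketch (standard PyTorch practice, not this article's save_checkpoint):
# store only state_dicts, then rebuild the network class before loading,
# so the checkpoint no longer pickles the model's class itself.
import torch

# Saving (inside the training loop, where model/optimizer/epoch exist):
torch.save({'epoch': epoch,
            'model_state': model.state_dict(),
            'optimizer_state': optimizer.state_dict()},
           'ssd300_state.pth.tar')

# Loading (anywhere): construct the model first, then restore the weights
model = SSD300(n_classes=n_classes)
ckpt = torch.load('ssd300_state.pth.tar', map_location='cpu')
model.load_state_dict(ckpt['model_state'])
model.eval()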
Summary
The above is my personal experience; I hope it gives everyone a useful reference.