Pytorch實現(xiàn)圖片異常檢測功能
圖片異常檢測
異常檢測指的是在正常的圖片中找到異常的數(shù)據(jù),由于無法通過規(guī)則進行識別判斷,這樣的應用場景通常都是需要人工進行識別,比如殘次品的識別,圖片異常識別模型的目標是可以代替或者輔助人工進行識別異常圖片。
AnoGAN 模型
由于正常圖片的數(shù)據(jù)量遠大于異常圖片,可能只有 1/100 的圖片是異常圖片,甚至更小。通過圖片分類模型很難實現(xiàn)異常圖片的識別,因為無法找到足夠的異常數(shù)據(jù)進行訓練。因此,只能通過正常圖片去構建異常檢測模型。如何通過正常的圖片實現(xiàn)檢測異常圖片的模型,可以使用之前用的對抗網(wǎng)絡,通過識別網(wǎng)絡進行檢測,圖片是正常數(shù)據(jù)還是偽造數(shù)據(jù)。AnoGAN 模型是用于識別異常圖片的模型,如果只用GAN 模型中的識別網(wǎng)絡進行判別,效果并不好,所以 AnoGAN 網(wǎng)絡不光依靠識別網(wǎng)絡,生成網(wǎng)絡在其中也發(fā)揮重要的作用。
對于AnoGAN,對于輸入的數(shù)據(jù),AnoGAN 網(wǎng)絡首先會對圖片生成噪聲 Z。通過噪聲 Z 輸入生成網(wǎng)絡生成可以被識別的圖片,如果訓練集中不存在這樣的圖片,例如異常圖片,那么生成網(wǎng)絡是無法生成的,這類圖片就是異常圖片。
噪聲 Z 的生成方式,初始狀態(tài)噪聲是隨機生成的,隨后噪聲通過網(wǎng)絡生成圖片,把生成的圖片訓練集作比較,比較的方式是通過像素差值的絕對值求和,最后算出損失值,最后通過網(wǎng)絡進行訓練以減少損失值。
上述的這種損失值在AutoGen 中被稱為 Residual Loss,如果只有 Residual Loss,模型效果有限。所以,AnoGAN 這里也利用了判別網(wǎng)絡,將測試圖像和生成圖像輸入到判別網(wǎng)絡,并對判別網(wǎng)絡的輸出特征進行差值計算,這個差值稱為 Discrimination loss。最后通過 Discrimination Loss 和 Residual Loss 合并組成損失函數(shù)。
數(shù)據(jù)準備
import os import urllib.request import zipfile import tarfile import matplotlib.pyplot as plt %matplotlib inline from PIL import Image import numpy as np #不存在“data”文件夾時創(chuàng)建 data_dir = "./data/" if not os.path.exists(data_dir): os.mkdir(data_dir) import sklearn # 下載并讀取MNIST的手寫數(shù)字圖像。 from sklearn.datasets import fetch_openml mnist = fetch_openml('mnist_784', version=1, data_home="./data/") #data_home指定保存地址。 # 數(shù)據(jù)的提取 X = mnist.data y = mnist.target # 將MNIST的第一個數(shù)據(jù)可視化 plt.imshow(np.array(X.iloc[0]).reshape(28, 28), cmap='gray') print("這個圖像數(shù)據(jù)的標簽是{}".format(y[0])) #在文件夾“data”下創(chuàng)建文件夾“img_78” data_dir_path = "./data/img_78/" if not os.path.exists(data_dir_path): os.mkdir(data_dir_path) #從MNIST將數(shù)字7、8的圖像作為圖像保存到“img_78”文件夾中 count7=0 count8=0 max_num=200 # 每制作200張圖片 for i in range(len(X)): # 圖像7的制作 if (y[i] == "7") and (count7<max_num): file_path="./data/img_78/img_7_"+str(count7)+".jpg" im_f=(np.array(X.iloc[i]).reshape(28, 28)) # 將圖像變形為28×28 pil_img_f = Image.fromarray(im_f.astype(np.uint8)) # 把圖像變成PIL pil_img_f = pil_img_f.resize((64, 64), Image.BICUBIC) # 擴大到64×64 pil_img_f.save(file_path) # 保存 count7+=1 #圖像8的制作 if (y[i] == "8") and (count8<max_num): file_path="./data/img_78/img_8_"+str(count8)+".jpg" im_f=(np.array(X.iloc[i]).reshape(28, 28)) # 將圖像變形為28*28 pil_img_f = Image.fromarray(im_f.astype(np.uint8)) # 把圖像變成PIL pil_img_f = pil_img_f.resize((64, 64), Image.BICUBIC) # 擴大到64×64 pil_img_f.save(file_path) # 保存 count8+=1 # 制作200張7和8之后,break if (count7>=max_num) and (count8>=max_num): break # 在文件夾“data”下面創(chuàng)建文件夾“test” data_dir_path = "./data/test/" if not os.path.exists(data_dir_path): os.mkdir(data_dir_path) # 在上述制作7,8圖像時使用的index的最終值 i_start = i+1 print(i_start) # 從MNIST將數(shù)字7、8的圖像作為圖像保存到“img_78”文件夾中 count2=0 count7=0 count8=0 max_num=5 #每制作五張圖片 for i in range(i_start,len(X)): # 從i_start開始 #圖像2的制作 if (y[i] == "2") and (count2<max_num): file_path="./data/test/img_2_"+str(count2)+".jpg" im_f=(np.array(X.iloc[i]).reshape(28, 28)) # 將圖像變形為28×28 pil_img_f = Image.fromarray(im_f.astype(np.uint8)) # 把圖像變成PIL pil_img_f = pil_img_f.resize((64, 64), Image.BICUBIC) # 擴大到64×64 pil_img_f.save(file_path) # 保存 count2+=1 # 圖像7的制作 if (y[i] == "7") and (count7<max_num): file_path="./data/test/img_7_"+str(count7)+".jpg" im_f=(np.array(X.iloc[i]).reshape(28, 28)) #將圖像變形為28×28 pil_img_f = Image.fromarray(im_f.astype(np.uint8)) # 把圖像變成PIL pil_img_f = pil_img_f.resize((64, 64), Image.BICUBIC) # 6擴大到64×64 pil_img_f.save(file_path) # 保存 count7+=1 # 圖像8的制作 if (y[i] == "8") and (count8<max_num): file_path="./data/test/img_8_"+str(count8)+".jpg" im_f=(np.array(X.iloc[i]).reshape(28, 28)) # 將圖像變形為28*28 pil_img_f = Image.fromarray(im_f.astype(np.uint8)) # 把圖像變成PIL pil_img_f = pil_img_f.resize((64, 64), Image.BICUBIC) # 擴大到64×64 pil_img_f.save(file_path) # 保存 count8+=1 # 在文件夾“data”下創(chuàng)建文件夾“img_78_28size” data_dir_path = "./data/img_78_28size/" if not os.path.exists(data_dir_path): os.mkdir(data_dir_path) # 從MNIST將數(shù)字7、8的圖像作為圖像保存到“img_78_28size”文件夾中 count7=0 count8=0 max_num=200 # 每制作200張圖片 for i in range(len(X)): # 圖像7的制作 if (y[i] == "7") and (count7<max_num): file_path="./data/img_78_28size/img_7_"+str(count7)+".jpg" im_f=(np.array(X.iloc[i]).reshape(28, 28)) # 將圖像變形為28×28 pil_img_f = Image.fromarray(im_f.astype(np.uint8)) # 把圖像變成PIL pil_img_f.save(file_path) # 保存 count7+=1 # 圖像8的制作 if (y[i] == "8") and (count8<max_num): file_path="./data/img_78_28size/img_8_"+str(count8)+".jpg" im_f=(np.array(X.iloc[i]).reshape(28, 28)) # 將圖像變形為28*28 pil_img_f = Image.fromarray(im_f.astype(np.uint8)) # 畫像變成PIL pil_img_f.save(file_path) # 保存 count8+=1 if (count7>=max_num) and (count8>=max_num): break # 在文件夾“data”下面創(chuàng)建文件夾“test” data_dir_path = "./data/test_28size/" if not os.path.exists(data_dir_path): os.mkdir(data_dir_path) # 在上述制作7,8圖像時使用的index的最終值 i_start = i+1 print(i_start) # 從MNIST將數(shù)字7、8的圖像作為圖像保存到“img_78”文件夾中 count2=0 count7=0 count8=0 max_num=5 # 每制作五張圖片 for i in range(i_start,len(X)): #從i_start開始 # 圖像2的制作 if (y[i] == "2") and (count2<max_num): file_path="./data/test_28size/img_2_"+str(count2)+".jpg" im_f=(np.array(X.iloc[i]).reshape(28, 28)) # 將圖像變形為28×28 pil_img_f = Image.fromarray(im_f.astype(np.uint8)) # 把圖像變成PIL pil_img_f.save(file_path) # 保存 count2+=1 # 畫像7的制作 if (y[i] == "7") and (count7<max_num): file_path="./data/test_28size/img_7_"+str(count7)+".jpg" im_f=(np.array(X.iloc[i]).reshape(28, 28)) # 將圖像變形為28×28 pil_img_f = Image.fromarray(im_f.astype(np.uint8)) # 把圖像變成PIL pil_img_f.save(file_path) # 保存 count7+=1 # 圖像8的制作 if (y[i] == "8") and (count8<max_num): file_path="./data/test_28size/img_8_"+str(count8)+".jpg" im_f=(np.array(X.iloc[i]).reshape(28, 28)) # 將圖像變形為28*28 pil_img_f = Image.fromarray(im_f.astype(np.uint8)) # 把圖像變成PIL pil_img_f.save(file_path) # 保存 count8+=1
AnoGAN 實現(xiàn)
AnoGAN 網(wǎng)絡實現(xiàn)以及訓練、驗證
# 導入軟件包 import random import math import time import pandas as pd import numpy as np from PIL import Image import torch import torch.utils.data as data import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchvision import transforms # Setup seeds torch.manual_seed(1234) np.random.seed(1234) random.seed(1234) class Generator(nn.Module): def __init__(self, z_dim=20, image_size=64): super(Generator, self).__init__() self.layer1 = nn.Sequential( nn.ConvTranspose2d(z_dim, image_size * 8, kernel_size=4, stride=1), nn.BatchNorm2d(image_size * 8), nn.ReLU(inplace=True)) self.layer2 = nn.Sequential( nn.ConvTranspose2d(image_size * 8, image_size * 4, kernel_size=4, stride=2, padding=1), nn.BatchNorm2d(image_size * 4), nn.ReLU(inplace=True)) self.layer3 = nn.Sequential( nn.ConvTranspose2d(image_size * 4, image_size * 2, kernel_size=4, stride=2, padding=1), nn.BatchNorm2d(image_size * 2), nn.ReLU(inplace=True)) self.layer4 = nn.Sequential( nn.ConvTranspose2d(image_size * 2, image_size, kernel_size=4, stride=2, padding=1), nn.BatchNorm2d(image_size), nn.ReLU(inplace=True)) self.last = nn.Sequential( nn.ConvTranspose2d(image_size, 1, kernel_size=4, stride=2, padding=1), nn.Tanh()) #注意 :由于是黑白圖像,因此輸出通道數(shù)量為1 def forward(self, z): out = self.layer1(z) out = self.layer2(out) out = self.layer3(out) out = self.layer4(out) out = self.last(out) return out # 動作確認 import matplotlib.pyplot as plt %matplotlib inline G = Generator(z_dim=20, image_size=64) # 輸入的隨機數(shù) input_z = torch.randn(1, 20) # 將張量尺寸變形為(1,20,1,1) input_z = input_z.view(input_z.size(0), input_z.size(1), 1, 1) # 輸出假圖像 fake_images = G(input_z) img_transformed = fake_images[0][0].detach().numpy() plt.imshow(img_transformed, 'gray') plt.show() class Discriminator(nn.Module): def __init__(self, z_dim=20, image_size=64): super(Discriminator, self).__init__() self.layer1 = nn.Sequential( nn.Conv2d(1, image_size, kernel_size=4, stride=2, padding=1), nn.LeakyReLU(0.1, inplace=True)) #注意 :由于是黑白圖像,因此輸出通道數(shù)量為1 self.layer2 = nn.Sequential( nn.Conv2d(image_size, image_size*2, kernel_size=4, stride=2, padding=1), nn.LeakyReLU(0.1, inplace=True)) self.layer3 = nn.Sequential( nn.Conv2d(image_size*2, image_size*4, kernel_size=4, stride=2, padding=1), nn.LeakyReLU(0.1, inplace=True)) self.layer4 = nn.Sequential( nn.Conv2d(image_size*4, image_size*8, kernel_size=4, stride=2, padding=1), nn.LeakyReLU(0.1, inplace=True)) self.last = nn.Conv2d(image_size*8, 1, kernel_size=4, stride=1) def forward(self, x): out = self.layer1(x) out = self.layer2(out) out = self.layer3(out) out = self.layer4(out) feature = out #最后將通道集中到一個特征量中 feature = feature.view(feature.size()[0], -1) #轉(zhuǎn)換為二維 out = self.last(out) return out, feature # 確認程序執(zhí) D = Discriminator(z_dim=20, image_size=64) #生成偽造圖像 input_z = torch.randn(1, 20) input_z = input_z.view(input_z.size(0), input_z.size(1), 1, 1) fake_images = G(input_z) #將偽造的圖像輸入判別器D中 d_out = D(fake_images) # 將輸出值d_out乘以Sigmoid函數(shù),將其轉(zhuǎn)換成0~1的值 print(nn.Sigmoid()(d_out[0])) # feature print(d_out[1].shape) def make_datapath_list(): """制作用于學習、驗證的圖像數(shù)據(jù)和標注數(shù)據(jù)的文件路徑表。 """ train_img_list = list() # 保存圖像文件路徑 for img_idx in range(200): img_path = "./data/img_78/img_7_" + str(img_idx)+'.jpg' train_img_list.append(img_path) img_path = "./data/img_78/img_8_" + str(img_idx)+'.jpg' train_img_list.append(img_path) return train_img_list class ImageTransform(): """圖像的預處理類""" def __init__(self, mean, std): self.data_transform = transforms.Compose([ transforms.ToTensor(), transforms.Normalize(mean, std) ]) def __call__(self, img): return self.data_transform(img) class GAN_Img_Dataset(data.Dataset): """圖像的Dataset類。繼承PyTorch的Dataset類""" def __init__(self, file_list, transform): self.file_list = file_list self.transform = transform def __len__(self): '''返回圖像的張數(shù)''' return len(self.file_list) def __getitem__(self, index): '''獲取經(jīng)過預處理后的圖像的張量格式的數(shù)據(jù)''' img_path = self.file_list[index] img = Image.open(img_path) # [高][寬]黑白 #圖像的預處理 img_transformed = self.transform(img) return img_transformed #創(chuàng)建DataLoader并確認操作 #創(chuàng)建文件列表 train_img_list=make_datapath_list() # 創(chuàng)建Dataset mean = (0.5,) std = (0.5,) train_dataset = GAN_Img_Dataset( file_list=train_img_list, transform=ImageTransform(mean, std)) # 創(chuàng)建DataLoader batch_size = 64 train_dataloader = torch.utils.data.DataLoader( train_dataset, batch_size=batch_size, shuffle=True) #確認執(zhí)行結果 batch_iterator = iter(train_dataloader) # 轉(zhuǎn)換成迭代器 imges = next(batch_iterator) #取出位于第一位的元素 print(imges.size()) # torch.Size([64, 1, 64, 64]) #網(wǎng)絡的初始化 def weights_init(m): classname = m.__class__.__name__ if classname.find('Conv') != -1: #Conv2d和ConvTranspose2d的初始化 nn.init.normal_(m.weight.data, 0.0, 0.02) nn.init.constant_(m.bias.data, 0) elif classname.find('BatchNorm') != -1: # BatchNorm2d的初始化 nn.init.normal_(m.weight.data, 1.0, 0.02) nn.init.constant_(m.bias.data, 0) # 初始化的實施 G.apply(weights_init) D.apply(weights_init) print("網(wǎng)絡已經(jīng)成功地完成了初始化") # 創(chuàng)建一個函數(shù)來學習模型 def train_model(G, D, dataloader, num_epochs): #確認是否可以使用GPU device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") print("使用設備:", device) # 優(yōu)化方法的設定 g_lr, d_lr = 0.0001, 0.0004 beta1, beta2 = 0.0, 0.9 g_optimizer = torch.optim.Adam(G.parameters(), g_lr, [beta1, beta2]) d_optimizer = torch.optim.Adam(D.parameters(), d_lr, [beta1, beta2]) # 定義誤差函數(shù) criterion = nn.BCEWithLogitsLoss(reduction='mean') # 使用硬編碼的參數(shù) z_dim = 20 mini_batch_size = 64 #將網(wǎng)絡變成GPU G.to(device) D.to(device) G.train() # 將模型轉(zhuǎn)換為訓練模式 D.train() # 將模型轉(zhuǎn)換為訓練模式 #如果網(wǎng)絡相對固定,則開啟加速 torch.backends.cudnn.benchmark = True # 圖像的張數(shù) num_train_imgs = len(dataloader.dataset) batch_size = dataloader.batch_size # 設置了迭代計數(shù)器 iteration = 1 logs = [] #epoch循環(huán) for epoch in range(num_epochs): # 保存開始時間 t_epoch_start = time.time() epoch_g_loss = 0.0 # epoch損失總和 epoch_d_loss = 0.0 # epoch損失總和 print('-------------') print('Epoch {}/{}'.format(epoch, num_epochs)) print('-------------') print('(train)') # 以minibatch為單位從數(shù)據(jù)加載器中讀取數(shù)據(jù)的循環(huán) for imges in dataloader: # -------------------- # 1. 判別器D的學習 # -------------------- # 如果小批次的尺寸設置為1,會導致批次歸一化處理產(chǎn)生錯誤,因此需要避免 if imges.size()[0] == 1: continue #如果能使用GPU,則將數(shù)據(jù)送入GPU中 imges = imges.to(device) #創(chuàng)建正確答案標簽和偽造數(shù)據(jù)標簽 #在epoch最后的迭代中,小批次的數(shù)量會減少 mini_batch_size = imges.size()[0] label_real = torch.full((mini_batch_size,), 1).to(device) label_fake = torch.full((mini_batch_size,), 0).to(device) #對真正的圖像進行判定 d_out_real, _ = D(imges) #生成偽造圖像并進行判定 input_z = torch.randn(mini_batch_size, z_dim).to(device) input_z = input_z.view(input_z.size(0), input_z.size(1), 1, 1) fake_images = G(input_z) d_out_fake, _ = D(fake_images) #計算誤差 d_loss_real = criterion(d_out_real.view(-1), label_real.float()) d_loss_fake = criterion(d_out_fake.view(-1), label_fake.float()) d_loss = d_loss_real + d_loss_fake #反向傳播處理 g_optimizer.zero_grad() d_optimizer.zero_grad() d_loss.backward() d_optimizer.step() # -------------------- # 2.生成器G的學習 # -------------------- #生成偽造圖像并進行判定 input_z = torch.randn(mini_batch_size, z_dim).to(device) input_z = input_z.view(input_z.size(0), input_z.size(1), 1, 1) fake_images = G(input_z) d_out_fake, _ = D(fake_images) #計算誤差 g_loss = criterion(d_out_fake.view(-1), label_real.float()) #反向傳播處理 g_optimizer.zero_grad() d_optimizer.zero_grad() g_loss.backward() g_optimizer.step() # -------------------- # 3. 記錄結果 # -------------------- epoch_d_loss += d_loss.item() epoch_g_loss += g_loss.item() iteration += 1 #epoch的每個phase的loss和準確率 t_epoch_finish = time.time() print('-------------') print('epoch {} || Epoch_D_Loss:{:.4f} ||Epoch_G_Loss:{:.4f}'.format( epoch, epoch_d_loss/batch_size, epoch_g_loss/batch_size)) print('timer: {:.4f} sec.'.format(t_epoch_finish - t_epoch_start)) t_epoch_start = time.time() print("總迭代次數(shù):", iteration) return G, D # 進行訓練和驗證 num_epochs = 300 G_update, D_update = train_model( G, D, dataloader=train_dataloader, num_epochs=num_epochs) # 將生成圖像和訓練數(shù)據(jù)可視化 device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") # 輸入的隨機數(shù)生成 batch_size = 8 z_dim = 20 fixed_z = torch.randn(batch_size, z_dim) fixed_z = fixed_z.view(fixed_z.size(0), fixed_z.size(1), 1, 1) fake_images = G_update(fixed_z.to(device)) # 訓練數(shù)據(jù) batch_iterator = iter(train_dataloader) #轉(zhuǎn)換成迭代器 imges = next(batch_iterator) #提取第1個要素 #圖像的可視化處理 fig = plt.figure(figsize=(15, 6)) for i in range(0, 5): #上層顯示測試圖像 plt.subplot(2, 5, i+1) plt.imshow(imges[i][0].cpu().detach().numpy(), 'gray') #下層顯示生成圖像 plt.subplot(2, 5, 5+i+1) plt.imshow(fake_images[i][0].cpu().detach().numpy(), 'gray') def Anomaly_score(x, fake_img, D, Lambda=0.1): #求測試圖像x和生成圖像fake_img的像素級差的絕對值,并對每個迷你批求和 residual_loss = torch.abs(x-fake_img) residual_loss = residual_loss.view(residual_loss.size()[0], -1) residual_loss = torch.sum(residual_loss, dim=1) # 將測試圖像x和生成圖像fake_img輸入到識別器D,取出特征量 _, x_feature = D(x) _, G_feature = D(fake_img) # 求測試圖像x和生成圖像fake_img的特征量之差的絕對值,對每個迷你批次求和 discrimination_loss = torch.abs(x_feature-G_feature) discrimination_loss = discrimination_loss.view( discrimination_loss.size()[0], -1) discrimination_loss = torch.sum(discrimination_loss, dim=1) # 將兩種損失對每個迷你批進行加法運算 loss_each = (1-Lambda)*residual_loss + Lambda*discrimination_loss #求迷你批的全部損失 total_loss = torch.sum(loss_each) return total_loss, loss_each, residual_loss # 創(chuàng)建測試用的DataLoader def make_test_datapath_list(): """制作用于學習、驗證的圖像數(shù)據(jù)和標注數(shù)據(jù)的文件路徑表。 """ train_img_list = list() # 保存圖像文件路徑 for img_idx in range(5): img_path = "./data/test/img_7_" + str(img_idx)+'.jpg' train_img_list.append(img_path) img_path = "./data/test/img_8_" + str(img_idx)+'.jpg' train_img_list.append(img_path) img_path = "./data/test/img_2_" + str(img_idx)+'.jpg' train_img_list.append(img_path) return train_img_list # 制作文件列表 test_img_list = make_test_datapath_list() # 制作Dataset mean = (0.5,) std = (0.5,) test_dataset = GAN_Img_Dataset( file_list=test_img_list, transform=ImageTransform(mean, std)) # 制作DataLoader batch_size = 5 test_dataloader = torch.utils.data.DataLoader( test_dataset, batch_size=batch_size, shuffle=False) # 測試數(shù)據(jù)的確認 batch_iterator = iter(test_dataloader) # 轉(zhuǎn)換成迭代器 imges = next(batch_iterator) # 取出第一個迷你批次 fig = plt.figure(figsize=(15, 6)) for i in range(0, 5): plt.subplot(2, 5, i+1) plt.imshow(imges[i][0].cpu().detach().numpy(), 'gray') # 想檢測異常的圖像 x = imges[0:5] x = x.to(device) # 用于生成想要異常檢測的圖像的初始隨機數(shù) z = torch.randn(5, 20).to(device) z = z.view(z.size(0), z.size(1), 1, 1) # 変將requires_grad設為True,使得變量z可以求導數(shù) z.requires_grad = True #求z的優(yōu)化函數(shù),以便能夠更新變量z z_optimizer = torch.optim.Adam([z], lr=1e-3) #求z for epoch in range(5000+1): fake_img = G_update(z) loss, _, _ = Anomaly_score(x, fake_img, D_update, Lambda=0.1) z_optimizer.zero_grad() loss.backward() z_optimizer.step() if epoch % 1000 == 0: print('epoch {} || loss_total:{:.0f} '.format(epoch, loss.item())) # 生成圖像 fake_img = G_update(z) # 要求損失 loss, loss_each, residual_loss_each = Anomaly_score( x, fake_img, D_update, Lambda=0.1) #損失的計算總損失 loss_each = loss_each.cpu().detach().numpy() print("total loss:", np.round(loss_each, 0)) # 圖像可視化 fig = plt.figure(figsize=(15, 6)) for i in range(0, 5): # 把測試數(shù)據(jù)放在上層 plt.subplot(2, 5, i+1) plt.imshow(imges[i][0].cpu().detach().numpy(), 'gray') # 在下層顯示生成數(shù)據(jù) plt.subplot(2, 5, 5+i+1) plt.imshow(fake_img[i][0].cpu().detach().numpy(), 'gray')
可以看 2 的損失值最高,由此可判斷 2 為異常圖片。
Efficient GAN
AnoGAN 模型中,最重要的是 z 的取值,對z 的取值也有新的方法,其中一種就是 Efficient GAN,它優(yōu)化了z 值的更新和學習時間。Efficient GAN是通過編碼器的方式來對 z 值進行計算,Encoder通過 BiGAN 機制將圖像于其關聯(lián)在一起。
Efficient GAN 實現(xiàn)
通過 Efficient GAN 實現(xiàn)網(wǎng)絡,并進行訓練和驗證。
# 導入軟件包 import random import math import time import pandas as pd import numpy as np from PIL import Image import torch import torch.utils.data as data import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchvision import transforms # Setup seeds torch.manual_seed(1234) torch.cuda.manual_seed(1234) np.random.seed(1234) random.seed(1234) class Generator(nn.Module): def __init__(self, z_dim=20): super(Generator, self).__init__() self.layer1 = nn.Sequential( nn.Linear(z_dim, 1024), nn.BatchNorm1d(1024), nn.ReLU(inplace=True)) self.layer2 = nn.Sequential( nn.Linear(1024, 7*7*128), nn.BatchNorm1d(7*7*128), nn.ReLU(inplace=True)) self.layer3 = nn.Sequential( nn.ConvTranspose2d(in_channels=128, out_channels=64, kernel_size=4, stride=2, padding=1), nn.BatchNorm2d(64), nn.ReLU(inplace=True)) self.last = nn.Sequential( nn.ConvTranspose2d(in_channels=64, out_channels=1, kernel_size=4, stride=2, padding=1), nn.Tanh()) #注意 :由于是黑白圖像,因此輸出通道數(shù)量為 1 def forward(self, z): out = self.layer1(z) out = self.layer2(out) #為了能置入卷積層中,需要對張量進行變形 out = out.view(z.shape[0], 128, 7, 7) out = self.layer3(out) out = self.last(out) return out #確認執(zhí)行結果 import matplotlib.pyplot as plt %matplotlib inline G = Generator(z_dim=20) G.train() #輸入的隨機數(shù) #由于要進行批次歸一化處理,因此將小批次數(shù)設置為 2 以上 input_z = torch.randn(2, 20) #輸出偽造圖像 fake_images = G(input_z) # torch.Size([2, 1, 28, 28]) img_transformed = fake_images[0][0].detach().numpy() plt.imshow(img_transformed, 'gray') plt.show() class Discriminator(nn.Module): def __init__(self, z_dim=20): super(Discriminator, self).__init__() #圖像這邊的輸入處理 self.x_layer1 = nn.Sequential( nn.Conv2d(1, 64, kernel_size=4, stride=2, padding=1), nn.LeakyReLU(0.1, inplace=True)) #注意 :由于是黑白圖像,因此輸入通道數(shù)量為 1 self.x_layer2 = nn.Sequential( nn.Conv2d(64, 64, kernel_size=4, stride=2, padding=1), nn.BatchNorm2d(64), nn.LeakyReLU(0.1, inplace=True)) #隨機數(shù)這邊的輸入處理 self.z_layer1 = nn.Linear(z_dim, 512) #最終的判定 self.last1 = nn.Sequential( nn.Linear(3648, 1024), nn.LeakyReLU(0.1, inplace=True)) self.last2 = nn.Linear(1024, 1) def forward(self, x, z): #圖像這邊的輸入處理 x_out = self.x_layer1(x) x_out = self.x_layer2(x_out) #隨機數(shù)這邊的輸入處理 z = z.view(z.shape[0], -1) z_out = self.z_layer1(z) #將x_out與z_out連接在一起,交給全連接層進行判定 x_out = x_out.view(-1, 64 * 7 * 7) out = torch.cat([x_out, z_out], dim=1) out = self.last1(out) feature = out #最后將通道集中到一個特征量中 feature = feature.view(feature.size()[0], -1) #轉(zhuǎn)換為二維 out = self.last2(out) return out, feature #確認執(zhí)行結果 D = Discriminator(z_dim=20) #生成偽造圖像 input_z = torch.randn(2, 20) fake_images = G(input_z) #將偽造圖像輸入判定器D中 d_out, _ = D(fake_images, input_z) #將輸出結果d_out乘以Sigmoid,以將其轉(zhuǎn)換為0~1的值 print(nn.Sigmoid()(d_out)) class Encoder(nn.Module): def __init__(self, z_dim=20): super(Encoder, self).__init__() self.layer1 = nn.Sequential( nn.Conv2d(1, 32, kernel_size=3, stride=1), nn.LeakyReLU(0.1, inplace=True)) #把圖像轉(zhuǎn)換成z self.layer2 = nn.Sequential( nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1), nn.BatchNorm2d(64), nn.LeakyReLU(0.1, inplace=True)) self.layer3 = nn.Sequential( nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1), nn.BatchNorm2d(128), nn.LeakyReLU(0.1, inplace=True)) #到這里為止,圖像的尺寸為7像素×7像素 self.last = nn.Linear(128 * 7 * 7, z_dim) def forward(self, x): out = self.layer1(x) out = self.layer2(out) out = self.layer3(out) #為了能放入FC中,對張量進行變形 out = out.view(-1, 128 * 7 * 7) out = self.last(out) return out #確認執(zhí)行結果 E = Encoder(z_dim=20) #輸入的圖像數(shù)據(jù) x = fake_images #fake_images是由上面的生成器G生成的 #將圖像編碼為z z = E(x) print(z.shape) print(z) def make_datapath_list(): """制作用于學習、驗證的圖像數(shù)據(jù)和標注數(shù)據(jù)的文件路徑表。 """ train_img_list = list() # 保存圖像文件路徑 for img_idx in range(200): img_path = "./data/img_78_28size/img_7_" + str(img_idx)+'.jpg' train_img_list.append(img_path) img_path = "./data/img_78_28size/img_8_" + str(img_idx)+'.jpg' train_img_list.append(img_path) return train_img_list class ImageTransform(): """圖像的預處理類""" def __init__(self, mean, std): self.data_transform = transforms.Compose([ transforms.ToTensor(), transforms.Normalize(mean, std) ]) def __call__(self, img): return self.data_transform(img) class GAN_Img_Dataset(data.Dataset): """圖像的Dataset類。繼承PyTorch的Dataset類""" def __init__(self, file_list, transform): self.file_list = file_list self.transform = transform def __len__(self): '''返回圖像的張數(shù)''' return len(self.file_list) def __getitem__(self, index): '''獲取預處理圖像的Tensor格式數(shù)據(jù)''' img_path = self.file_list[index] img = Image.open(img_path) # [高][寬]黑白 # 圖像的預處理 img_transformed = self.transform(img) return img_transformed # 創(chuàng)建DataLoader并確認操作 #制作文件列表 train_img_list=make_datapath_list() # Datasetを作成 mean = (0.5,) std = (0.5,) train_dataset = GAN_Img_Dataset( file_list=train_img_list, transform=ImageTransform(mean, std)) # 制作DataLoader batch_size = 64 train_dataloader = torch.utils.data.DataLoader( train_dataset, batch_size=batch_size, shuffle=True) # 動作的確認 batch_iterator = iter(train_dataloader) # 轉(zhuǎn)換成迭代器 imges = next(batch_iterator) # 找出第一個要素 print(imges.size()) # torch.Size([64, 1, 64, 64]) #創(chuàng)建用于訓練模型的函數(shù) def train_model(G, D, E, dataloader, num_epochs): #確認是否可以使用GPU加速 device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") print("使用設備:", device) #設置最優(yōu)化算法 lr_ge = 0.0001 lr_d = 0.0001/4 beta1, beta2 = 0.5, 0.999 g_optimizer = torch.optim.Adam(G.parameters(), lr_ge, [beta1, beta2]) e_optimizer = torch.optim.Adam(E.parameters(), lr_ge, [beta1, beta2]) d_optimizer = torch.optim.Adam(D.parameters(), lr_d, [beta1, beta2]) #定義誤差函數(shù) #BCEWithLogitsLoss是先將輸入數(shù)據(jù)乘以Logistic, # 再計算二進制交叉熵 criterion = nn.BCEWithLogitsLoss(reduction='mean') #對參數(shù)進行硬編碼 z_dim = 20 mini_batch_size = 64 #將網(wǎng)絡載入GPU中 G.to(device) E.to(device) D.to(device) G.train() #將模型設置為訓練模式 E.train() #將模型設置為訓練模式 D.train() #將模型設置為訓練模式 #如果網(wǎng)絡相對固定,則開啟加速 torch.backends.cudnn.benchmark = True #圖像的張數(shù) num_train_imgs = len(dataloader.dataset) batch_size = dataloader.batch_size #設置迭代計數(shù)器 iteration = 1 logs = [] # epoch循環(huán) for epoch in range(num_epochs): #保存開始時間 t_epoch_start = time.time() epoch_g_loss = 0.0 #epoch的損失總和 epoch_e_loss = 0.0 #epoch的損失總和 epoch_d_loss = 0.0 #epoch的損失總和 print('-------------') print('Epoch {}/{}'.format(epoch, num_epochs)) print('-------------') print('(train)') #以minibatch為單位從數(shù)據(jù)加載器中讀取數(shù)據(jù)的循環(huán) for imges in dataloader: #如果小批次的尺寸設置為1,會導致批次歸一化處理產(chǎn)生錯誤,因此需要避免 if imges.size()[0] == 1: continue #創(chuàng)建用于表示小批次尺寸為1和0的標簽 #創(chuàng)建正確答案標簽和偽造數(shù)據(jù)標簽 #在epoch最后的迭代中,小批次的數(shù)量會減少 mini_batch_size = imges.size()[0] label_real = torch.full((mini_batch_size,), 1).to(device) label_fake = torch.full((mini_batch_size,), 0).to(device) #如果能使用GPU,則將數(shù)據(jù)送入GPU中 imges = imges.to(device) # -------------------- # 1. 判別器D的學習 # -------------------- # 對真實的圖像進行判定 z_out_real = E(imges) d_out_real, _ = D(imges, z_out_real) # 生成偽造圖像并進行判定 input_z = torch.randn(mini_batch_size, z_dim).to(device) fake_images = G(input_z) d_out_fake, _ = D(fake_images, input_z) #計算誤差 d_loss_real = criterion(d_out_real.view(-1), label_real.float()) d_loss_fake = criterion(d_out_fake.view(-1), label_fake.float()) d_loss = d_loss_real + d_loss_fake #反向傳播 d_optimizer.zero_grad() d_loss.backward() d_optimizer.step() # -------------------- # 2. 生成器G的學習 # -------------------- #生成偽造圖像并進行判定 input_z = torch.randn(mini_batch_size, z_dim).to(device) fake_images = G(input_z) d_out_fake, _ = D(fake_images, input_z) #計算誤差 g_loss = criterion(d_out_fake.view(-1), label_real.float()) #反向傳播 g_optimizer.zero_grad() g_loss.backward() g_optimizer.step() # -------------------- # 3. 編碼器E的學習 # -------------------- #對真實圖像的z進行推定 z_out_real = E(imges) d_out_real, _ = D(imges, z_out_real) #計算誤差 e_loss = criterion(d_out_real.view(-1), label_fake.float()) #反向傳播 e_optimizer.zero_grad() e_loss.backward() e_optimizer.step() # -------------------- #4.記錄 # -------------------- epoch_d_loss += d_loss.item() epoch_g_loss += g_loss.item() epoch_e_loss += e_loss.item() iteration += 1 #epoch的每個phase的loss和準確率 t_epoch_finish = time.time() print('-------------') print('epoch {} || Epoch_D_Loss:{:.4f} ||Epoch_G_Loss:{:.4f} ||Epoch_E_Loss:{:.4f}'.format( epoch, epoch_d_loss/batch_size, epoch_g_loss/batch_size, epoch_e_loss/batch_size)) print('timer: {:.4f} sec.'.format(t_epoch_finish - t_epoch_start)) t_epoch_start = time.time() print("總迭代次數(shù):", iteration) return G, D, E #網(wǎng)絡的初始化 def weights_init(m): classname = m.__class__.__name__ if classname.find('Conv') != -1: #Conv2d和ConvTranspose2d的初始化 nn.init.normal_(m.weight.data, 0.0, 0.02) nn.init.constant_(m.bias.data, 0) elif classname.find('BatchNorm') != -1: # BatchNorm2d的初始化 nn.init.normal_(m.weight.data, 0.0, 0.02) nn.init.constant_(m.bias.data, 0) elif classname.find('Linear') != -1: #全連接層Linear的初始化 m.bias.data.fill_(0) #開始初始化 G.apply(weights_init) E.apply(weights_init) D.apply(weights_init) print("網(wǎng)絡已經(jīng)成功地完成了初始化") # 進行訓練和驗證 num_epochs = 1500 G_update, D_update, E_update = train_model( G, D, E, dataloader=train_dataloader, num_epochs=num_epochs) #對生成圖像與訓練數(shù)據(jù)的可視化處理 device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") #生成輸入的隨機數(shù) batch_size = 8 z_dim = 20 fixed_z = torch.randn(batch_size, z_dim) fake_images = G_update(fixed_z.to(device)) #訓練數(shù)據(jù) batch_iterator = iter(train_dataloader) #轉(zhuǎn)換成迭代器 imges = next(batch_iterator) #取出最開頭的元素 #輸出 fig = plt.figure(figsize=(15, 6)) for i in range(0, 5): #在上層中顯示訓練數(shù)據(jù) plt.subplot(2, 5, i+1) plt.imshow(imges[i][0].cpu().detach().numpy(), 'gray') #在下層中顯示生成數(shù)據(jù) plt.subplot(2, 5, 5+i+1) plt.imshow(fake_images[i][0].cpu().detach().numpy(), 'gray') # ·制作測試用的Dataloader def make_test_datapath_list(): """制作用于學習、驗證的圖像數(shù)據(jù)和標注數(shù)據(jù)的文件路徑表。 """ train_img_list = list() # ·保存圖像文件路徑 for img_idx in range(5): img_path = "./data/test_28size/img_7_" + str(img_idx)+'.jpg' train_img_list.append(img_path) img_path = "./data/test_28size/img_8_" + str(img_idx)+'.jpg' train_img_list.append(img_path) img_path = "./data/test_28size/img_2_" + str(img_idx)+'.jpg' train_img_list.append(img_path) return train_img_list #制作文件列表 test_img_list = make_test_datapath_list() # 創(chuàng)建Dataset mean = (0.5,) std = (0.5,) test_dataset = GAN_Img_Dataset( file_list=test_img_list, transform=ImageTransform(mean, std)) # 制作DataLoader batch_size = 5 test_dataloader = torch.utils.data.DataLoader( test_dataset, batch_size=batch_size, shuffle=False) #訓練數(shù)據(jù) batch_iterator = iter(test_dataloader) #轉(zhuǎn)換成迭代器 imges = next(batch_iterator) #取出最開頭的元素 fig = plt.figure(figsize=(15, 6)) for i in range(0, 5): #在下層中顯示生成數(shù)據(jù) plt.subplot(2, 5, i+1) plt.imshow(imges[i][0].cpu().detach().numpy(), 'gray') def Anomaly_score(x, fake_img, z_out_real, D, Lambda=0.1): #計算測試圖像x與生成圖像fake_img在像素層次上的差值的絕對值,并以小批次為單位進行求和計算 residual_loss = torch.abs(x-fake_img) residual_loss = residual_loss.view(residual_loss.size()[0], -1) residual_loss = torch.sum(residual_loss, dim=1) # 將測試圖像x和生成圖像fake_img輸入判別器D中,并取出特征量圖 _, x_feature = D(x, z_out_real) _, G_feature = D(fake_img, z_out_real) # 計算測試圖像x與生成圖像fake_img的特征量的差的絕對值,并以小批次為單位進行求和計算 discrimination_loss = torch.abs(x_feature-G_feature) discrimination_loss = discrimination_loss.view( discrimination_loss.size()[0], -1) discrimination_loss = torch.sum(discrimination_loss, dim=1) #將每個小批次中的兩種損失相加 loss_each = (1-Lambda)*residual_loss + Lambda*discrimination_loss #對所有批次中的損失進行計算 total_loss = torch.sum(loss_each) return total_loss, loss_each, residual_loss #需要檢測異常的圖像 x = imges[0:5] x = x.to(device) #對監(jiān)督數(shù)據(jù)的圖像進行編碼,轉(zhuǎn)換成z,再用生成器G生成圖像 z_out_real = E_update(imges.to(device)) imges_reconstract = G_update(z_out_real) #計算損失值 loss, loss_each, residual_loss_each = Anomaly_score( x, imges_reconstract, z_out_real, D_update, Lambda=0.1) #計算損失值,損失總和 loss_each = loss_each.cpu().detach().numpy() print("total loss:", np.round(loss_each, 0)) #圖像的可視化 fig = plt.figure(figsize=(15, 6)) for i in range(0, 5): #在上層中顯示訓練數(shù)據(jù) plt.subplot(2, 5, i+1) plt.imshow(imges[i][0].cpu().detach().numpy(), 'gray') #在下層中顯示生成數(shù)據(jù) plt.subplot(2, 5, 5+i+1) plt.imshow(imges_reconstract[i][0].cpu().detach().numpy(), 'gray')
AnoGAN 模型可以進行異常圖片的識別,這個例子比較簡單,由于是單通道訓練,隨意模型訓練比較快。如果是彩色 圖片,訓練改時間會更久,在業(yè)務場景中可以調(diào)整閾值,例如Loss高于 250 為異常圖片。
以上就是Pytorch實現(xiàn)圖片異常檢測功能的詳細內(nèi)容,更多關于Pytorch圖片異常檢測的資料請關注腳本之家其它相關文章!
相關文章
淺析Python 簡單工廠模式和工廠方法模式的優(yōu)缺點
這篇文章主要介紹了Python 工廠模式的相關資料,文中示例代碼非常詳細,幫助大家更好的理解和學習,感興趣的朋友可以了解下2020-07-07Tensorflow訓練MNIST手寫數(shù)字識別模型
這篇文章主要為大家詳細介紹了Tensorflow訓練MNIST手寫數(shù)字識別模型,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2020-02-02django數(shù)據(jù)庫報錯解決匯總:django.db.utils.OperationalError?1045,1049,
這篇文章主要給大家介紹了關于django數(shù)據(jù)庫報錯解決:django.db.utils.OperationalError?1045,1049,2003的相關資料,文中將解決的辦法介紹的非常詳細,需要的朋友可以參考下2023-02-02TensorFlow實現(xiàn)卷積神經(jīng)網(wǎng)絡
這篇文章主要為大家詳細介紹了TensorFlow實現(xiàn)卷積神經(jīng)網(wǎng)絡,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-05-05