使用OpenCV實現(xiàn)道路車輛計數(shù)的使用方法
今天,我們將一起探討如何基于計算機視覺實現(xiàn)道路交通計數(shù)。
在本教程中,我們將僅使用Python和OpenCV,并借助背景減除算法非常簡單地進(jìn)行運動檢測。
我們將從以下四個方面進(jìn)行介紹:
1. 用于物體檢測的背景減法算法主要思想。
2. OpenCV圖像過濾器。
3. 利用輪廓檢測物體。
4. 建立進(jìn)一步數(shù)據(jù)處理的結(jié)構(gòu)。
背景扣除算法
有許多不同的背景扣除算法,但是它們的主要思想都很簡單。
假設(shè)有一個房間的視頻,在某些幀上沒有人和寵物,那么此時的視頻基本為靜態(tài)的,我們將其稱為背景(background_layer)。因此要獲取在視頻上移動的對象,我們只需要:用當(dāng)前幀減去背景即可。
由于光照變化,人為移動物體,或者始終存在移動的人和寵物,我們將無法獲得靜態(tài)幀。在這種情況下,我們從視頻中選出一些圖像幀,如果絕大多數(shù)圖像幀中都具有某個相同的像素點,則此將像素作為background_layer中的一部分。
我們將使用MOG算法進(jìn)行背景扣除
原始幀
代碼如下所示:
import os import logging import logging.handlers import random import numpy as np import skvideo.io import cv2 import matplotlib.pyplot as plt import utils # without this some strange errors happen cv2.ocl.setUseOpenCL(False) random.seed(123) # ============================================================================ IMAGE_DIR = "./out" VIDEO_SOURCE = "input.mp4" SHAPE = (720, 1280) # HxW # ============================================================================ def train_bg_subtractor(inst, cap, num=500): ''' BG substractor need process some amount of frames to start giving result ''' print ('Training BG Subtractor...') i = 0 for frame in cap: inst.apply(frame, None, 0.001) i += 1 if i >= num: return cap def main(): log = logging.getLogger("main") # creting MOG bg subtractor with 500 frames in cache # and shadow detction bg_subtractor = cv2.createBackgroundSubtractorMOG2( history=500, detectShadows=True) # Set up image source # You can use also CV2, for some reason it not working for me cap = skvideo.io.vreader(VIDEO_SOURCE) # skipping 500 frames to train bg subtractor train_bg_subtractor(bg_subtractor, cap, num=500) frame_number = -1 for frame in cap: if not frame.any(): log.error("Frame capture failed, stopping...") break frame_number += 1 utils.save_frame(frame, "./out/frame_%04d.png" % frame_number) fg_mask = bg_subtractor.apply(frame, None, 0.001) utils.save_frame(frame, "./out/fg_mask_%04d.png" % frame_number) # ============================================================================ if __name__ == "__main__": log = utils.init_logging() if not os.path.exists(IMAGE_DIR): log.debug("Creating image directory `%s`...", IMAGE_DIR) os.makedirs(IMAGE_DIR) main()
處理后得到下面的前景圖像
去除背景后的前景圖像
我們可以看出前景圖像上有一些噪音,可以通過標(biāo)準(zhǔn)濾波技術(shù)可以將其消除。
濾波
針對我們現(xiàn)在的情況,我們將需要以下濾波函數(shù):Threshold、Erode、Dilate、Opening、Closing。
首先,我們使用“Closing”來移除區(qū)域中的間隙,然后使用“Opening”來移除個別獨立的像素點,然后使用“Dilate”進(jìn)行擴張以使對象變粗。代碼如下:
def filter_mask(img): kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2)) # Fill any small holes closing = cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel) # Remove noise opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel) # Dilate to merge adjacent blobs dilation = cv2.dilate(opening, kernel, iterations=2) # threshold th = dilation[dilation < 240] = 0 return th
處理后的前景如下:
利用輪廓進(jìn)行物體檢測
我們將使用cv2.findContours函數(shù)對輪廓進(jìn)行檢測。我們在使用的時候可以選擇的參數(shù)為:
cv2.CV_RETR_EXTERNAL------僅獲取外部輪廓。
cv2.CV_CHAIN_APPROX_TC89_L1------使用Teh-Chin鏈逼近算法(更快)
代碼如下:
def get_centroid(x, y, w, h): x1 = int(w / 2) y1 = int(h / 2) cx = x + x1 cy = y + y1 return (cx, cy) def detect_vehicles(fg_mask, min_contour_width=35, min_contour_height=35): matches = [] # finding external contours im, contours, hierarchy = cv2.findContours( fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_L1) # filtering by with, height for (i, contour) in enumerate(contours): (x, y, w, h) = cv2.boundingRect(contour) contour_valid = (w >= min_contour_width) and ( h >= min_contour_height) if not contour_valid: continue # getting center of the bounding box centroid = get_centroid(x, y, w, h) matches.append(((x, y, w, h), centroid)) return matches
建立數(shù)據(jù)處理框架
我們都知道在ML和CV中,沒有一個算法可以處理所有問題。即使存在這種算法,我們也不會使用它,因為它很難大規(guī)模有效。例如幾年前Netflix公司用300萬美元的獎金懸賞最佳電影推薦算法。有一個團隊完成這個任務(wù),但是他們的推薦算法無法大規(guī)模運行,因此其實對公司毫無用處。但是,Netflix公司仍獎勵了他們100萬美元。
接下來我們來建立解決當(dāng)前問題的框架,這樣可以使數(shù)據(jù)的處理更加方便
class PipelineRunner(object): ''' Very simple pipline. Just run passed processors in order with passing context from one to another. You can also set log level for processors. ''' def __init__(self, pipeline=None, log_level=logging.DEBUG): self.pipeline = pipeline or [] self.context = {} self.log = logging.getLogger(self.__class__.__name__) self.log.setLevel(log_level) self.log_level = log_level self.set_log_level() def set_context(self, data): self.context = data def add(self, processor): if not isinstance(processor, PipelineProcessor): raise Exception( 'Processor should be an isinstance of PipelineProcessor.') processor.log.setLevel(self.log_level) self.pipeline.append(processor) def remove(self, name): for i, p in enumerate(self.pipeline): if p.__class__.__name__ == name: del self.pipeline[i] return True return False def set_log_level(self): for p in self.pipeline: p.log.setLevel(self.log_level) def run(self): for p in self.pipeline: self.context = p(self.context) self.log.debug("Frame #%d processed.", self.context['frame_number']) return self.context class PipelineProcessor(object): ''' Base class for processors. ''' def __init__(self): self.log = logging.getLogger(self.__class__.__name__)
首先我們獲取一張?zhí)幚砥鬟\行順序的列表,讓每個處理器完成一部分工作,在案順序完成執(zhí)行以獲得最終結(jié)果。
我們首先創(chuàng)建輪廓檢測處理器。輪廓檢測處理器只需將前面的背景扣除,濾波和輪廓檢測部分合并在一起即可,代碼如下所示:
class ContourDetection(PipelineProcessor): ''' Detecting moving objects. Purpose of this processor is to subtrac background, get moving objects and detect them with a cv2.findContours method, and then filter off-by width and height. bg_subtractor - background subtractor isinstance. min_contour_width - min bounding rectangle width. min_contour_height - min bounding rectangle height. save_image - if True will save detected objects mask to file. image_dir - where to save images(must exist). ''' def __init__(self, bg_subtractor, min_contour_width=35, min_contour_height=35, save_image=False, image_dir='images'): super(ContourDetection, self).__init__() self.bg_subtractor = bg_subtractor self.min_contour_width = min_contour_width self.min_contour_height = min_contour_height self.save_image = save_image self.image_dir = image_dir def filter_mask(self, img, a=None): ''' This filters are hand-picked just based on visual tests ''' kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2)) # Fill any small holes closing = cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel) # Remove noise opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel) # Dilate to merge adjacent blobs dilation = cv2.dilate(opening, kernel, iterations=2) return dilation def detect_vehicles(self, fg_mask, context): matches = [] # finding external contours im2, contours, hierarchy = cv2.findContours( fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_L1) for (i, contour) in enumerate(contours): (x, y, w, h) = cv2.boundingRect(contour) contour_valid = (w >= self.min_contour_width) and ( h >= self.min_contour_height) if not contour_valid: continue centroid = utils.get_centroid(x, y, w, h) matches.append(((x, y, w, h), centroid)) return matches def __call__(self, context): frame = context['frame'].copy() frame_number = context['frame_number'] fg_mask = self.bg_subtractor.apply(frame, None, 0.001) # just thresholding values fg_mask[fg_mask < 240] = 0 fg_mask = self.filter_mask(fg_mask, frame_number) if self.save_image: utils.save_frame(fg_mask, self.image_dir + "/mask_%04d.png" % frame_number, flip=False) context['objects'] = self.detect_vehicles(fg_mask, context) context['fg_mask'] = fg_mask return contex
現(xiàn)在,讓我們創(chuàng)建一個處理器,該處理器將找出不同的幀上檢測到的相同對象,創(chuàng)建路徑,并對到達(dá)出口區(qū)域的車輛進(jìn)行計數(shù)。代碼如下所示:
''' Counting vehicles that entered in exit zone. Purpose of this class based on detected object and local cache create objects pathes and count that entered in exit zone defined by exit masks. exit_masks - list of the exit masks. path_size - max number of points in a path. max_dst - max distance between two points. ''' def __init__(self, exit_masks=[], path_size=10, max_dst=30, x_weight=1.0, y_weight=1.0): super(VehicleCounter, self).__init__() self.exit_masks = exit_masks self.vehicle_count = 0 self.path_size = path_size self.pathes = [] self.max_dst = max_dst self.x_weight = x_weight self.y_weight = y_weight def check_exit(self, point): for exit_mask in self.exit_masks: try: if exit_mask[point[1]][point[0]] == 255: return True except: return True return False def __call__(self, context): objects = context['objects'] context['exit_masks'] = self.exit_masks context['pathes'] = self.pathes context['vehicle_count'] = self.vehicle_count if not objects: return context points = np.array(objects)[:, 0:2] points = points.tolist() # add new points if pathes is empty if not self.pathes: for match in points: self.pathes.append([match]) else: # link new points with old pathes based on minimum distance between # points new_pathes = [] for path in self.pathes: _min = 999999 _match = None for p in points: if len(path) == 1: # distance from last point to current d = utils.distance(p[0], path[-1][0]) else: # based on 2 prev points predict next point and calculate # distance from predicted next point to current xn = 2 * path[-1][0][0] - path[-2][0][0] yn = 2 * path[-1][0][1] - path[-2][0][1] d = utils.distance( p[0], (xn, yn), x_weight=self.x_weight, y_weight=self.y_weight ) if d < _min: _min = d _match = p if _match and _min <= self.max_dst: points.remove(_match) path.append(_match) new_pathes.append(path) # do not drop path if current frame has no matches if _match is None: new_pathes.append(path) self.pathes = new_pathes # add new pathes if len(points): for p in points: # do not add points that already should be counted if self.check_exit(p[1]): continue self.pathes.append([p]) # save only last N points in path for i, _ in enumerate(self.pathes): self.pathes[i] = self.pathes[i][self.path_size * -1:] # count vehicles and drop counted pathes: new_pathes = [] for i, path in enumerate(self.pathes): d = path[-2:] if ( # need at list two points to count len(d) >= 2 and # prev point not in exit zone not self.check_exit(d[0][1]) and # current point in exit zone self.check_exit(d[1][1]) and # path len is bigger then min self.path_size <= len(path) ): self.vehicle_count += 1 else: # prevent linking with path that already in exit zone add = True for p in path: if self.check_exit(p[1]): add = False break if add: new_pathes.append(path) self.pathes = new_pathes context['pathes'] = self.pathes context['objects'] = objects context['vehicle_count'] = self.vehicle_count self.log.debug('#VEHICLES FOUND: %s' % self.vehicle_count) return context
上面的代碼有點復(fù)雜,因此讓我們一個部分一個部分的介紹一下。
上面的圖像中綠色的部分是出口區(qū)域。我們在這里對車輛進(jìn)行計數(shù),只有當(dāng)車輛移動的長度超過3個點我們才進(jìn)行計算
我們使用掩碼來解決這個問題,因為它比使用矢量算法有效且簡單得多。只需使用“二進(jìn)制和”即可選出車輛區(qū)域中點。設(shè)置方式如下:
EXIT_PTS = np.array([ [[732, 720], [732, 590], [1280, 500], [1280, 720]], [[0, 400], [645, 400], [645, 0], [0, 0]] ]) base = np.zeros(SHAPE + (3,), dtype='uint8') exit_mask = cv2.fillPoly(base, EXIT_PTS, (255, 255, 255))[:, :, 0]
現(xiàn)在我們將檢測到的點鏈接起來。
對于第一幀圖像,我們將所有點均添加為新路徑。
接下來,如果len(path)== 1,我們在新檢測到的對象中找到與每條路徑最后一點距離最近的對象。
如果len(path)> 1,則使用路徑中的最后兩個點,即在同一條線上預(yù)測新點,并找到該點與當(dāng)前點之間的最小距離。
具有最小距離的點將添加到當(dāng)前路徑的末端并從列表中刪除。如果在此之后還剩下一些點,我們會將其添加為新路徑。這個過程中我們還會限制路徑中的點數(shù)。
new_pathes = [] for path in self.pathes: _min = 999999 _match = None for p in points: if len(path) == 1: # distance from last point to current d = utils.distance(p[0], path[-1][0]) else: # based on 2 prev points predict next point and calculate # distance from predicted next point to current xn = 2 * path[-1][0][0] - path[-2][0][0] yn = 2 * path[-1][0][1] - path[-2][0][1] d = utils.distance( p[0], (xn, yn), x_weight=self.x_weight, y_weight=self.y_weight ) if d < _min: _min = d _match = p if _match and _min <= self.max_dst: points.remove(_match) path.append(_match) new_pathes.append(path) # do not drop path if current frame has no matches if _match is None: new_pathes.append(path) self.pathes = new_pathes # add new pathes if len(points): for p in points: # do not add points that already should be counted if self.check_exit(p[1]): continue self.pathes.append([p]) # save only last N points in path for i, _ in enumerate(self.pathes): self.pathes[i] = self.pathes[i][self.path_size * -1:]
現(xiàn)在,我們將嘗試計算進(jìn)入出口區(qū)域的車輛。為此,我們需獲取路徑中的最后2個點,并檢查len(path)是否應(yīng)大于限制。
# count vehicles and drop counted pathes: new_pathes = [] for i, path in enumerate(self.pathes): d = path[-2:] if ( # need at list two points to count len(d) >= 2 and # prev point not in exit zone not self.check_exit(d[0][1]) and # current point in exit zone self.check_exit(d[1][1]) and # path len is bigger then min self.path_size <= len(path) ): self.vehicle_count += 1 else: # prevent linking with path that already in exit zone add = True for p in path: if self.check_exit(p[1]): add = False break if add: new_pathes.append(path) self.pathes = new_pathes context['pathes'] = self.pathes context['objects'] = objects context['vehicle_count'] = self.vehicle_count self.log.debug('#VEHICLES FOUND: %s' % self.vehicle_count) return context
最后兩個處理器是CSV編寫器,用于創(chuàng)建報告CSV文件,以及用于調(diào)試和精美圖片的可視化。
class CsvWriter(PipelineProcessor): def __init__(self, path, name, start_time=0, fps=15): super(CsvWriter, self).__init__() self.fp = open(os.path.join(path, name), 'w') self.writer = csv.DictWriter(self.fp, fieldnames=['time', 'vehicles']) self.writer.writeheader() self.start_time = start_time self.fps = fps self.path = path self.name = name self.prev = None def __call__(self, context): frame_number = context['frame_number'] count = _count = context['vehicle_count'] if self.prev: _count = count - self.prev time = ((self.start_time + int(frame_number / self.fps)) * 100 + int(100.0 / self.fps) * (frame_number % self.fps)) self.writer.writerow({'time': time, 'vehicles': _count}) self.prev = count return context class Visualizer(PipelineProcessor): def __init__(self, save_image=True, image_dir='images'): super(Visualizer, self).__init__() self.save_image = save_image self.image_dir = image_dir def check_exit(self, point, exit_masks=[]): for exit_mask in exit_masks: if exit_mask[point[1]][point[0]] == 255: return True return False def draw_pathes(self, img, pathes): if not img.any(): return for i, path in enumerate(pathes): path = np.array(path)[:, 1].tolist() for point in path: cv2.circle(img, point, 2, CAR_COLOURS[0], -1) cv2.polylines(img, [np.int32(path)], False, CAR_COLOURS[0], 1) return img def draw_boxes(self, img, pathes, exit_masks=[]): for (i, match) in enumerate(pathes): contour, centroid = match[-1][:2] if self.check_exit(centroid, exit_masks): continue x, y, w, h = contour cv2.rectangle(img, (x, y), (x + w - 1, y + h - 1), BOUNDING_BOX_COLOUR, 1) cv2.circle(img, centroid, 2, CENTROID_COLOUR, -1) return img def draw_ui(self, img, vehicle_count, exit_masks=[]): # this just add green mask with opacity to the image for exit_mask in exit_masks: _img = np.zeros(img.shape, img.dtype) _img[:, :] = EXIT_COLOR mask = cv2.bitwise_and(_img, _img, mask=exit_mask) cv2.addWeighted(mask, 1, img, 1, 0, img) # drawing top block with counts cv2.rectangle(img, (0, 0), (img.shape[1], 50), (0, 0, 0), cv2.FILLED) cv2.putText(img, ("Vehicles passed: {total} ".format(total=vehicle_count)), (30, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 1) return img def __call__(self, context): frame = context['frame'].copy() frame_number = context['frame_number'] pathes = context['pathes'] exit_masks = context['exit_masks'] vehicle_count = context['vehicle_count'] frame = self.draw_ui(frame, vehicle_count, exit_masks) frame = self.draw_pathes(frame, pathes) frame = self.draw_boxes(frame, pathes, exit_masks) utils.save_frame(frame, self.image_dir + "/processed_%04d.png" % frame_number) return context
結(jié)論
正如我們看到的那樣,它并不像許多人想象的那么難。但是,如果小伙伴運行腳本,小伙伴會發(fā)現(xiàn)此解決方案并不理想,存在前景對象存在重疊的問題,并且它也沒有按類型對車輛進(jìn)行分類。但是,當(dāng)相機有較好位置,例如位于道路正上方時,該算法具有很好的準(zhǔn)確性。
到此這篇關(guān)于使用OpenCV實現(xiàn)道路車輛計數(shù)的使用方法的文章就介紹到這了,更多相關(guān)OpenCV 道路車輛計數(shù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python中常見的幾種音頻數(shù)據(jù)讀取、保存方式總結(jié)
Python是一種非常適合進(jìn)行音頻處理和音頻分析的語言,因為它有許多強大的庫可以使用,下面這篇文章主要給大家介紹了關(guān)于python中常見的幾種音頻數(shù)據(jù)讀取、保存方式,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-06-06解決python訓(xùn)練模型報錯:BrokenPipeError:?[Errno?32]?Broken?pipe
這篇文章主要介紹了解決python訓(xùn)練模型報錯:BrokenPipeError:?[Errno?32]?Broken?pipe問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-07-07python設(shè)計并實現(xiàn)平面點類Point的源代碼
這篇文章主要介紹了python-設(shè)計并實現(xiàn)平面點類Point,定義一個平面點類Point,對其重載運算符關(guān)系運算符,關(guān)系運算以距離坐標(biāo)原點的遠(yuǎn)近作為基準(zhǔn),需要的朋友可以參考下2024-05-05flask使用session保存登錄狀態(tài)及攔截未登錄請求代碼
這篇文章主要介紹了flask使用session保存登錄狀態(tài)及攔截未登錄請求代碼,具有一定借鑒價值,需要的朋友可以參考下2018-01-01使用python svm實現(xiàn)直接可用的手寫數(shù)字識別
這篇文章主要介紹了使用python svm實現(xiàn)直接可用的手寫數(shù)字識別,現(xiàn)在網(wǎng)上很多代碼是良莠不齊,真是一言難盡,于是記錄一下,能夠運行成功并識別成功的一個源碼2021-08-08手把手教你在Pycharm中新建虛擬環(huán)境并使用(超詳細(xì)!)
使用python開發(fā)項目通常都會創(chuàng)建一個虛擬環(huán)境,將項目依賴包安裝到虛擬環(huán)境中,避免一臺電腦上開發(fā)多個項目時依賴包版本號不兼容造成沖突,下面這篇文章主要給大家介紹了關(guān)于如何在Pycharm中新建虛擬環(huán)境并使用的相關(guān)資料,需要的朋友可以參考下2022-06-06