A Detailed Guide to Multi-Object Tracking with dlib in Python and OpenCV

In this tutorial, you will learn how to use the dlib library to effectively track multiple objects in real-time video.

We can certainly track multiple objects with dlib; however, to obtain the best performance possible, we need to utilize multiprocessing and distribute the object trackers across multiple cores of our processor.

Correctly utilizing multiprocessing allows us to improve our dlib multi-object tracking frames per second (FPS) throughput by more than 45%!
1. Multi-object tracking with dlib

In the first part of this guide, I'll demonstrate how to implement a simple, naive dlib multi-object tracking script. This program will track multiple objects in a video; however, we'll notice that the script runs a bit slowly. To increase our FPS, I'll then show you a faster, more efficient implementation of the dlib multi-object tracker. Finally, I'll discuss some improvements and suggestions to enhance our multi-object tracking implementation.

2. Project structure

You can use the tree command to inspect our project structure:

The mobilenet_ssd/ directory contains our MobileNet + SSD Caffe model files, which allow us to detect people (along with other objects). Today we'll review two Python scripts:

- multi_object_tracking_slow.py: the simple, "naive" approach to dlib multi-object tracking.
- multi_object_tracking_fast.py: the advanced, faster method that leverages multiprocessing.

3. The simple, "naive" approach to dlib multi-object tracking

The first dlib multi-object tracking implementation we'll cover today is "naive" in that it:

1. Uses a simple list of tracker objects.
2. Updates each tracker sequentially, using only a single core of our processor.

For some object tracking tasks this implementation will be more than sufficient; however, to optimize our FPS we should distribute the object trackers across multiple processes.

We'll start with the simple implementation in this section and then move on to the faster method in the next section. To get started, open up the multi_object_tracking_slow.py script and insert the following code:
```python
# import the necessary packages
from imutils.video import FPS
import numpy as np
import argparse
import imutils
import dlib
import cv2
```
Let's parse our command line arguments:
```python
# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
    help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
    help="path to Caffe pre-trained model")
ap.add_argument("-v", "--video", required=True,
    help="path to input video file")
ap.add_argument("-o", "--output", type=str,
    help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.2,
    help="minimum probability to filter weak detections")
args = vars(ap.parse_args())
```
The script handles the following command line arguments at runtime:

- --prototxt: the path to the Caffe "deploy" prototxt file.
- --model: the path to the model file that accompanies the prototxt.
- --video: the path to the input video file. We'll perform multi-object tracking with dlib on this video.
- --output: an optional path to an output video file. If no path is specified, no video will be written to disk. I recommend writing to an .avi or .mp4 file.
- --confidence: the object detection confidence threshold, 0.2 by default. This value is the minimum probability required to filter out weak detections from the object detector.

Let's define the list of classes this model supports and load the model from disk:
```python
# initialize the list of class labels MobileNet SSD was trained to
# detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
    "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
    "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
    "sofa", "train", "tvmonitor"]

# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])
```
We only care about the "person" class for today's footrace example, but you can easily modify the filter to track other classes. We then load our pre-trained object detector model: we'll use this pre-trained SSD to detect the presence of objects in the video, and we'll create a dlib object tracker for each detected object.
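As a quick illustration of that modification, the snippet below is a minimal, hypothetical sketch (TARGET_LABELS and keep_detection are not part of the original scripts) showing how the person-only filter could be generalized to a set of classes:

```python
# hypothetical helper, not part of the original script: generalize the
# "person"-only check to any set of MobileNet SSD class labels
TARGET_LABELS = {"person", "dog"}

def keep_detection(class_label, confidence, min_confidence=0.2):
    # keep a detection only if it is confident enough and belongs to a target class
    return confidence > min_confidence and class_label in TARGET_LABELS

print(keep_detection("person", 0.85))  # True
print(keep_detection("car", 0.90))     # False -- "car" is not in TARGET_LABELS
```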
We have a few more initializations to perform:
```python
# initialize the video stream and output video writer
print("[INFO] starting video stream...")
vs = cv2.VideoCapture(args["video"])
writer = None

# initialize the list of object trackers and corresponding class
# labels
trackers = []
labels = []

# start the frames per second throughput estimator
fps = FPS().start()
```
We initialize our video stream — we'll read frames from the input video one at a time. Our video writer is then initialized to None; we'll work with it more inside the upcoming while loop. Next we initialize our lists of trackers and labels, and finally we start our frames-per-second counter. We're all set to begin processing the video:
```python
# loop over frames from the video file stream
while True:
    # grab the next frame from the video file
    (grabbed, frame) = vs.read()

    # check to see if we have reached the end of the video file
    if frame is None:
        break

    # resize the frame for faster processing and then convert the
    # frame from BGR to RGB ordering (dlib needs RGB ordering)
    frame = imutils.resize(frame, width=600)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # if we are supposed to be writing a video to disk, initialize
    # the writer
    if args["output"] is not None and writer is None:
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        writer = cv2.VideoWriter(args["output"], fourcc, 30,
            (frame.shape[1], frame.shape[0]), True)
```
The frame is resized to a width of 600 pixels, preserving the aspect ratio. Then, for dlib compatibility, the frame is converted to RGB channel ordering (OpenCV's default is BGR, whereas dlib expects RGB).

Let's begin the object detection phase:
```python
    # if there are no object trackers we first need to detect objects
    # and then create a tracker for each object
    if len(trackers) == 0:
        # grab the frame dimensions and convert the frame to a blob
        (h, w) = frame.shape[:2]
        blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)

        # pass the blob through the network and obtain the detections
        # and predictions
        net.setInput(blob)
        detections = net.forward()
```
In order to perform object tracking, we must first perform object detection, either:

- Manually, by stopping the video stream and hand-selecting the bounding box of each object.
- Programmatically, using a trained object detector to detect the presence of objects (which is what we do here).

If there are no object trackers, we know we have yet to perform object detection.

We create a blob and pass it through the SSD network to detect objects.

Next, we loop over the detections looking for objects belonging to the person class, since our input video is a human footrace:
```python
        # loop over the detections
        for i in np.arange(0, detections.shape[2]):
            # extract the confidence (i.e., probability) associated
            # with the prediction
            confidence = detections[0, 0, i, 2]

            # filter out weak detections by requiring a minimum
            # confidence
            if confidence > args["confidence"]:
                # extract the index of the class label from the
                # detections list
                idx = int(detections[0, 0, i, 1])
                label = CLASSES[idx]

                # if the class label is not a person, ignore it
                if CLASSES[idx] != "person":
                    continue
```
We begin looping over the detections, where we:

- Filter out weak detections.
- Ensure each detection is a person. You can, of course, remove this line of code or customize it for your own filtering needs.

Now that we've located each person in the frame, let's instantiate our trackers and draw the initial bounding boxes and class labels:
```python
                # compute the (x, y)-coordinates of the bounding box
                # for the object
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")

                # construct a dlib rectangle object from the bounding
                # box coordinates and start the correlation tracker
                t = dlib.correlation_tracker()
                rect = dlib.rectangle(startX, startY, endX, endY)
                t.start_track(rgb, rect)

                # update our set of trackers and corresponding class
                # labels
                labels.append(label)
                trackers.append(t)

                # grab the corresponding class label for the detection
                # and draw the bounding box
                cv2.rectangle(frame, (startX, startY), (endX, endY),
                    (0, 255, 0), 2)
                cv2.putText(frame, label, (startX, startY - 15),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)
```
To begin tracking an object, we:

- Compute the bounding box of each detected object.
- Construct the bounding box coordinates and pass them to the tracker. The bounding box is especially important here: we need to create a dlib.rectangle from it and pass that to the start_track method. From then on, dlib can begin tracking the object.
- Finally, append the individual tracker to the trackers list.

As a result, in the next code block we'll handle the case where trackers have already been established and only need their positions updated. There are two additional tasks we perform during this initial detection step:

- Append the class label to the labels list. If you're tracking multiple types of objects (for example, dog + person), you may want to know the type of each object.
- Draw each bounding box rectangle and class label around the object.

If our list of trackers is non-empty, we know we're in the object tracking phase:
```python
    # otherwise, we've already performed detection so let's track
    # multiple objects
    else:
        # loop over each of the trackers
        for (t, l) in zip(trackers, labels):
            # update the tracker and grab the position of the tracked
            # object
            t.update(rgb)
            pos = t.get_position()

            # unpack the position object
            startX = int(pos.left())
            startY = int(pos.top())
            endX = int(pos.right())
            endY = int(pos.bottom())

            # draw the bounding box from the correlation object tracker
            cv2.rectangle(frame, (startX, startY), (endX, endY),
                (0, 255, 0), 2)
            cv2.putText(frame, l, (startX, startY - 15),
                cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)
```
During the object tracking phase, we loop over all trackers and the corresponding labels, then update the position of each object. To update a position, we simply pass in the rgb image.

After extracting the bounding box coordinates, we can draw a bounding box rectangle and label for each tracked object.

The remaining steps in the frame-processing loop involve writing to the output video (if necessary) and displaying the result:
```python
    # check to see if we should write the frame to disk
    if writer is not None:
        writer.write(frame)

    # show the output frame
    cv2.imshow("Frame", frame)
    key = cv2.waitKey(1) & 0xFF

    # if the `q` key was pressed, break from the loop
    if key == ord("q"):
        break

    # update the FPS counter
    fps.update()
```
Here we:

- Write the frame to video, if necessary.
- Display the output frame and capture key presses. If the q ("quit") key is pressed, we break out of the loop. Finally, we update our frames-per-second information for benchmarking.

The remaining steps are to print FPS information in the terminal and release pointers:
```python
# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))

# check to see if we need to release the video writer pointer
if writer is not None:
    writer.release()

# do a bit of cleanup
cv2.destroyAllWindows()
vs.release()
```
Let's evaluate accuracy and performance. Open a terminal and execute the following command:
```
$ python multi_object_tracking_slow.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt \
    --model mobilenet_ssd/MobileNetSSD_deploy.caffemodel \
    --video race.mp4 --output race_output_slow.avi
[INFO] loading model...
[INFO] starting video stream...
[INFO] elapsed time: 24.51
[INFO] approx. FPS: 13.87
```
It looks like our multi-object tracker works!

But as you can see, we're only obtaining about 13 frames per second.

For some applications this FPS may be sufficient — however, if you need a faster frame rate, I'd suggest taking a look at the more efficient dlib multi-object tracker below. Second, understand that the tracking accuracy isn't perfect.

4. A fast, efficient dlib multi-object tracking implementation
If you run the dlib multi-object tracking script from the previous section and open your system's activity monitor at the same time, you'll notice that only one core of your processor is being used. To fix that, we'll distribute the dlib object trackers across separate processes.

Utilizing processes enables our operating system to perform better process scheduling, mapping each process to a particular processor core on our machine (most modern operating systems are able to efficiently schedule CPU-heavy processes in parallel).

Go ahead and open multi_object_tracking_fast.py and insert the following code:
```python
# import the necessary packages
from imutils.video import FPS
import multiprocessing
import numpy as np
import argparse
import imutils
import dlib
import cv2
```
We'll use the Python Process class to spawn new processes — each new process is independent of the original one.

To spawn a process, we need to provide a function that Python can call; Python will then take that function and create a brand-new process to execute it:
```python
def start_tracker(box, label, rgb, inputQueue, outputQueue):
    # construct a dlib rectangle object from the bounding box
    # coordinates and then start the correlation tracker
    t = dlib.correlation_tracker()
    rect = dlib.rectangle(box[0], box[1], box[2], box[3])
    t.start_track(rgb, rect)
```
The first three parameters to start_tracker are:

- box: the bounding box coordinates of the object we're going to track, presumably returned by some kind of object detector, whether manual or programmatic.
- label: the human-readable label of the object.
- rgb: the RGB image we'll use to start the initial dlib object tracker.

Keep in mind how Python multiprocessing works — Python will call this function and then create a brand-new interpreter to execute the code within it. Therefore, each spawned start_tracker process will be independent of its parent. To communicate with the Python driver script, we need to leverage either pipes or queues (Pipes and Queues). Both types of objects are thread/process safe, accomplished internally with locks and semaphores.

Essentially, we're creating a simple producer/consumer relationship:

- Our parent process will produce new frames and add them to the queue of a particular object tracker.
- The child process will then consume the frames, apply object tracking, and return the updated bounding box coordinates.

I decided to use Queue objects in this post; however, keep in mind that you could use a Pipe instead if you wish. A minimal sketch of this pattern is shown below.
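The following is a small, self-contained sketch of that producer/consumer pattern using multiprocessing.Queue; the worker and its payloads are illustrative placeholders, not the actual tracking code:

```python
# a minimal producer/consumer sketch with multiprocessing.Queue
# (the worker and its payloads are illustrative, not the dlib tracking code)
import multiprocessing

def worker(inputQueue, outputQueue):
    # consume items until the parent sends a None sentinel
    while True:
        item = inputQueue.get()        # blocks until an item is available
        if item is None:
            break
        outputQueue.put(item * item)   # send a "result" back to the parent

if __name__ == "__main__":
    iq = multiprocessing.Queue()
    oq = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(iq, oq))
    p.daemon = True
    p.start()

    for i in range(5):                 # the parent acts as the producer
        iq.put(i)

    for _ in range(5):                 # consume the results as they arrive
        print(oq.get())

    iq.put(None)                       # tell the worker to exit
```

The `if __name__ == "__main__":` guard matters on platforms that spawn rather than fork new processes (such as Windows), since each child re-imports the module.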
Now let's begin the infinite loop that will run inside the process:
```python
    # loop indefinitely -- this function will be called as a daemon
    # process so we don't need to worry about joining it
    while True:
        # attempt to grab the next frame from the input queue
        rgb = inputQueue.get()

        # if there was an entry in our queue, process it
        if rgb is not None:
            # update the tracker and grab the position of the tracked
            # object
            t.update(rgb)
            pos = t.get_position()

            # unpack the position object
            startX = int(pos.left())
            startY = int(pos.top())
            endX = int(pos.right())
            endY = int(pos.bottom())

            # add the label + bounding box coordinates to the output
            # queue
            outputQueue.put((label, (startX, startY, endX, endY)))
```
We loop indefinitely here — this function will be called as a daemon process, so we don't need to worry about joining it.

First, we attempt to grab a new frame from the inputQueue. If the frame is not empty, we update the object tracker with it, giving us the updated bounding box coordinates.

Finally, we write the label and bounding box to the outputQueue so the parent process can consume them in the main loop of the script.

Back in the parent process, we parse our command line arguments:
```python
# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
    help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
    help="path to Caffe pre-trained model")
ap.add_argument("-v", "--video", required=True,
    help="path to input video file")
ap.add_argument("-o", "--output", type=str,
    help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.2,
    help="minimum probability to filter weak detections")
args = vars(ap.parse_args())
```
The command line arguments for this script are exactly the same as for our slower, non-multiprocessing script.

Let's initialize our input and output queues:
```python
# initialize our lists of queues -- both input queue and output queue
# for *every* object that we will be tracking
inputQueues = []
outputQueues = []
```
These queues will hold the objects we're tracking. Each spawned process needs two Queue objects:

- One to read input frames from.
- Another to write results to.

The next block of code is identical to our previous script:
```python
# initialize the list of class labels MobileNet SSD was trained to
# detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
    "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
    "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
    "sofa", "train", "tvmonitor"]

# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])

# initialize the video stream and output video writer
print("[INFO] starting video stream...")
vs = cv2.VideoCapture(args["video"])
writer = None

# start the frames per second throughput estimator
fps = FPS().start()
```
We define the model's CLASSES and load the model itself.

Now let's begin looping over the frames in the video stream:
```python
# loop over frames from the video file stream
while True:
    # grab the next frame from the video file
    (grabbed, frame) = vs.read()

    # check to see if we have reached the end of the video file
    if frame is None:
        break

    # resize the frame for faster processing and then convert the
    # frame from BGR to RGB ordering (dlib needs RGB ordering)
    frame = imutils.resize(frame, width=600)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # if we are supposed to be writing a video to disk, initialize
    # the writer
    if args["output"] is not None and writer is None:
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        writer = cv2.VideoWriter(args["output"], fourcc, 30,
            (frame.shape[1], frame.shape[0]), True)
```
Now let's handle the case where we have no inputQueues yet:
```python
    # if our list of queues is empty then we know we have yet to
    # create our first object tracker
    if len(inputQueues) == 0:
        # grab the frame dimensions and convert the frame to a blob
        (h, w) = frame.shape[:2]
        blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)

        # pass the blob through the network and obtain the detections
        # and predictions
        net.setInput(blob)
        detections = net.forward()

        # loop over the detections
        for i in np.arange(0, detections.shape[2]):
            # extract the confidence (i.e., probability) associated
            # with the prediction
            confidence = detections[0, 0, i, 2]

            # filter out weak detections by requiring a minimum
            # confidence
            if confidence > args["confidence"]:
                # extract the index of the class label from the
                # detections list
                idx = int(detections[0, 0, i, 1])
                label = CLASSES[idx]

                # if the class label is not a person, ignore it
                if CLASSES[idx] != "person":
                    continue
```
If there are no inputQueues, we need to apply object detection before object tracking. We apply object detection and then loop over the results, grabbing the confidence value and filtering out weak detections. If the confidence meets the threshold established by our command line argument, we consider the detection, but we further filter it by class label — in this case we're only looking for person objects. Assuming we've found a person, we create the queues and spawn the tracking process:
```python
                # compute the (x, y)-coordinates of the bounding box
                # for the object
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")
                bb = (startX, startY, endX, endY)

                # create two brand new input and output queues,
                # respectively
                iq = multiprocessing.Queue()
                oq = multiprocessing.Queue()
                inputQueues.append(iq)
                outputQueues.append(oq)

                # spawn a daemon process for a new object tracker
                p = multiprocessing.Process(
                    target=start_tracker,
                    args=(bb, label, rgb, iq, oq))
                p.daemon = True
                p.start()

                # grab the corresponding class label for the detection
                # and draw the bounding box
                cv2.rectangle(frame, (startX, startY), (endX, endY),
                    (0, 255, 0), 2)
                cv2.putText(frame, label, (startX, startY - 15),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)
```
We first compute the bounding box coordinates. From there we create two new queues, iq and oq, appending them to inputQueues and outputQueues, respectively. We then spawn a new start_tracker process, passing the bounding box, label, rgb image, and the iq + oq.

We also draw the detected object's bounding box rectangle and class label.

Otherwise, we've already performed object detection, so we need to apply each of the dlib object trackers to the frame:
```python
    # otherwise, we've already performed detection so let's track
    # multiple objects
    else:
        # loop over each of our input queues and add the input RGB
        # frame to it, enabling us to update each of the respective
        # object trackers running in separate processes
        for iq in inputQueues:
            iq.put(rgb)

        # loop over each of the output queues
        for oq in outputQueues:
            # grab the updated bounding box coordinates for the
            # object -- the .get method is a blocking operation so
            # this will pause our execution until the respective
            # process finishes the tracking update
            (label, (startX, startY, endX, endY)) = oq.get()

            # draw the bounding box from the correlation object
            # tracker
            cv2.rectangle(frame, (startX, startY), (endX, endY),
                (0, 255, 0), 2)
            cv2.putText(frame, label, (startX, startY - 15),
                cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)
```
Looping over each of the inputQueues, we add the rgb image to them. Then we loop over each of the outputQueues, grabbing the bounding box coordinates from each independent object tracker. Finally, we draw the bounding box plus the associated class label.
```python
    # check to see if we should write the frame to disk
    if writer is not None:
        writer.write(frame)

    # show the output frame
    cv2.imshow("Frame", frame)
    key = cv2.waitKey(1) & 0xFF

    # if the `q` key was pressed, break from the loop
    if key == ord("q"):
        break

    # update the FPS counter
    fps.update()

# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))

# check to see if we need to release the video writer pointer
if writer is not None:
    writer.release()

# do a bit of cleanup
cv2.destroyAllWindows()
vs.release()
```
If necessary, we write the frame to the output video, and we display the frame on screen. If the q key is pressed, we quit, breaking out of the loop. If we keep processing frames, the FPS counter is updated and we start again at the top of the while loop. Otherwise, we're done processing frames: we display the FPS information, release the pointers, and close the windows.

Open a terminal and execute the following command:
```
$ python multi_object_tracking_fast.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt \
    --model mobilenet_ssd/MobileNetSSD_deploy.caffemodel \
    --video race.mp4 --output race_output_fast.avi
[INFO] loading model...
[INFO] starting video stream...
[INFO] elapsed time: 14.01
[INFO] approx. FPS: 24.26
```
As you can see, our faster, more efficient multi-object tracker runs at roughly 24 FPS, an improvement of well over 45% compared to our previous implementation (24.26 / 13.87 ≈ 1.75, so throughput nearly doubles in this run). Furthermore, if you open your activity monitor while this script is running, you'll see that more of your system's CPU is being utilized. This speedup is obtained by allowing each dlib object tracker to run in a separate process, which in turn lets your operating system perform more efficient scheduling of CPU resources.

5. Complete code
multi_object_tracking_slow.py
```python
# USAGE
# python multi_object_tracking_slow.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt \
#   --model mobilenet_ssd/MobileNetSSD_deploy.caffemodel --video race.mp4

# import the necessary packages
from imutils.video import FPS
import numpy as np
import argparse
import imutils
import dlib
import cv2

# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
    help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
    help="path to Caffe pre-trained model")
ap.add_argument("-v", "--video", required=True,
    help="path to input video file")
ap.add_argument("-o", "--output", type=str,
    help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.2,
    help="minimum probability to filter weak detections")
args = vars(ap.parse_args())

# initialize the list of class labels MobileNet SSD was trained to
# detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
    "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
    "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
    "sofa", "train", "tvmonitor"]

# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])

# initialize the video stream and output video writer
print("[INFO] starting video stream...")
vs = cv2.VideoCapture(args["video"])
writer = None

# initialize the list of object trackers and corresponding class
# labels
trackers = []
labels = []

# start the frames per second throughput estimator
fps = FPS().start()

# loop over frames from the video file stream
while True:
    # grab the next frame from the video file
    (grabbed, frame) = vs.read()

    # check to see if we have reached the end of the video file
    if frame is None:
        break

    # resize the frame for faster processing and then convert the
    # frame from BGR to RGB ordering (dlib needs RGB ordering)
    frame = imutils.resize(frame, width=600)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # if we are supposed to be writing a video to disk, initialize
    # the writer
    if args["output"] is not None and writer is None:
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        writer = cv2.VideoWriter(args["output"], fourcc, 30,
            (frame.shape[1], frame.shape[0]), True)

    # if there are no object trackers we first need to detect objects
    # and then create a tracker for each object
    if len(trackers) == 0:
        # grab the frame dimensions and convert the frame to a blob
        (h, w) = frame.shape[:2]
        blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)

        # pass the blob through the network and obtain the detections
        # and predictions
        net.setInput(blob)
        detections = net.forward()

        # loop over the detections
        for i in np.arange(0, detections.shape[2]):
            # extract the confidence (i.e., probability) associated
            # with the prediction
            confidence = detections[0, 0, i, 2]

            # filter out weak detections by requiring a minimum
            # confidence
            if confidence > args["confidence"]:
                # extract the index of the class label from the
                # detections list
                idx = int(detections[0, 0, i, 1])
                label = CLASSES[idx]

                # if the class label is not a person, ignore it
                if CLASSES[idx] != "person":
                    continue

                # compute the (x, y)-coordinates of the bounding box
                # for the object
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")

                # construct a dlib rectangle object from the bounding
                # box coordinates and start the correlation tracker
                t = dlib.correlation_tracker()
                rect = dlib.rectangle(startX, startY, endX, endY)
                t.start_track(rgb, rect)

                # update our set of trackers and corresponding class
                # labels
                labels.append(label)
                trackers.append(t)

                # grab the corresponding class label for the detection
                # and draw the bounding box
                cv2.rectangle(frame, (startX, startY), (endX, endY),
                    (0, 255, 0), 2)
                cv2.putText(frame, label, (startX, startY - 15),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

    # otherwise, we've already performed detection so let's track
    # multiple objects
    else:
        # loop over each of the trackers
        for (t, l) in zip(trackers, labels):
            # update the tracker and grab the position of the tracked
            # object
            t.update(rgb)
            pos = t.get_position()

            # unpack the position object
            startX = int(pos.left())
            startY = int(pos.top())
            endX = int(pos.right())
            endY = int(pos.bottom())

            # draw the bounding box from the correlation object tracker
            cv2.rectangle(frame, (startX, startY), (endX, endY),
                (0, 255, 0), 2)
            cv2.putText(frame, l, (startX, startY - 15),
                cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

    # check to see if we should write the frame to disk
    if writer is not None:
        writer.write(frame)

    # show the output frame
    cv2.imshow("Frame", frame)
    key = cv2.waitKey(1) & 0xFF

    # if the `q` key was pressed, break from the loop
    if key == ord("q"):
        break

    # update the FPS counter
    fps.update()

# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))

# check to see if we need to release the video writer pointer
if writer is not None:
    writer.release()

# do a bit of cleanup
cv2.destroyAllWindows()
vs.release()
```
multi_object_tracking_fast.py
```python
# USAGE
# python multi_object_tracking_fast.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt \
#   --model mobilenet_ssd/MobileNetSSD_deploy.caffemodel --video race.mp4

# import the necessary packages
from imutils.video import FPS
import multiprocessing
import numpy as np
import argparse
import imutils
import dlib
import cv2

def start_tracker(box, label, rgb, inputQueue, outputQueue):
    # construct a dlib rectangle object from the bounding box
    # coordinates and then start the correlation tracker
    t = dlib.correlation_tracker()
    rect = dlib.rectangle(box[0], box[1], box[2], box[3])
    t.start_track(rgb, rect)

    # loop indefinitely -- this function will be called as a daemon
    # process so we don't need to worry about joining it
    while True:
        # attempt to grab the next frame from the input queue
        rgb = inputQueue.get()

        # if there was an entry in our queue, process it
        if rgb is not None:
            # update the tracker and grab the position of the tracked
            # object
            t.update(rgb)
            pos = t.get_position()

            # unpack the position object
            startX = int(pos.left())
            startY = int(pos.top())
            endX = int(pos.right())
            endY = int(pos.bottom())

            # add the label + bounding box coordinates to the output
            # queue
            outputQueue.put((label, (startX, startY, endX, endY)))

# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
    help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
    help="path to Caffe pre-trained model")
ap.add_argument("-v", "--video", required=True,
    help="path to input video file")
ap.add_argument("-o", "--output", type=str,
    help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.2,
    help="minimum probability to filter weak detections")
args = vars(ap.parse_args())

# initialize our lists of queues -- both input queue and output queue
# for *every* object that we will be tracking
inputQueues = []
outputQueues = []

# initialize the list of class labels MobileNet SSD was trained to
# detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
    "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
    "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
    "sofa", "train", "tvmonitor"]

# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])

# initialize the video stream and output video writer
print("[INFO] starting video stream...")
vs = cv2.VideoCapture(args["video"])
writer = None

# start the frames per second throughput estimator
fps = FPS().start()

# loop over frames from the video file stream
while True:
    # grab the next frame from the video file
    (grabbed, frame) = vs.read()

    # check to see if we have reached the end of the video file
    if frame is None:
        break

    # resize the frame for faster processing and then convert the
    # frame from BGR to RGB ordering (dlib needs RGB ordering)
    frame = imutils.resize(frame, width=600)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # if we are supposed to be writing a video to disk, initialize
    # the writer
    if args["output"] is not None and writer is None:
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        writer = cv2.VideoWriter(args["output"], fourcc, 30,
            (frame.shape[1], frame.shape[0]), True)

    # if our list of queues is empty then we know we have yet to
    # create our first object tracker
    if len(inputQueues) == 0:
        # grab the frame dimensions and convert the frame to a blob
        (h, w) = frame.shape[:2]
        blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)

        # pass the blob through the network and obtain the detections
        # and predictions
        net.setInput(blob)
        detections = net.forward()

        # loop over the detections
        for i in np.arange(0, detections.shape[2]):
            # extract the confidence (i.e., probability) associated
            # with the prediction
            confidence = detections[0, 0, i, 2]

            # filter out weak detections by requiring a minimum
            # confidence
            if confidence > args["confidence"]:
                # extract the index of the class label from the
                # detections list
                idx = int(detections[0, 0, i, 1])
                label = CLASSES[idx]

                # if the class label is not a person, ignore it
                if CLASSES[idx] != "person":
                    continue

                # compute the (x, y)-coordinates of the bounding box
                # for the object
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")
                bb = (startX, startY, endX, endY)

                # create two brand new input and output queues,
                # respectively
                iq = multiprocessing.Queue()
                oq = multiprocessing.Queue()
                inputQueues.append(iq)
                outputQueues.append(oq)

                # spawn a daemon process for a new object tracker
                p = multiprocessing.Process(
                    target=start_tracker,
                    args=(bb, label, rgb, iq, oq))
                p.daemon = True
                p.start()

                # grab the corresponding class label for the detection
                # and draw the bounding box
                cv2.rectangle(frame, (startX, startY), (endX, endY),
                    (0, 255, 0), 2)
                cv2.putText(frame, label, (startX, startY - 15),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

    # otherwise, we've already performed detection so let's track
    # multiple objects
    else:
        # loop over each of our input queues and add the input RGB
        # frame to it, enabling us to update each of the respective
        # object trackers running in separate processes
        for iq in inputQueues:
            iq.put(rgb)

        # loop over each of the output queues
        for oq in outputQueues:
            # grab the updated bounding box coordinates for the
            # object -- the .get method is a blocking operation so
            # this will pause our execution until the respective
            # process finishes the tracking update
            (label, (startX, startY, endX, endY)) = oq.get()

            # draw the bounding box from the correlation object
            # tracker
            cv2.rectangle(frame, (startX, startY), (endX, endY),
                (0, 255, 0), 2)
            cv2.putText(frame, label, (startX, startY - 15),
                cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

    # check to see if we should write the frame to disk
    if writer is not None:
        writer.write(frame)

    # show the output frame
    cv2.imshow("Frame", frame)
    key = cv2.waitKey(1) & 0xFF

    # if the `q` key was pressed, break from the loop
    if key == ord("q"):
        break

    # update the FPS counter
    fps.update()

# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))

# check to see if we need to release the video writer pointer
if writer is not None:
    writer.release()

# do a bit of cleanup
cv2.destroyAllWindows()
vs.release()
```
Download link: https://pan.baidu.com/s/1WhJr-Qxh5Wu3TsXKRiTHRg (extraction code: 1234)
6. Improvements and suggestions

The dlib multi-object tracking Python scripts I've shared with you today will work just fine for processing shorter video streams; however, if you intend to use this implementation in long-running production environments (on the order of hours to days of video), there are two primary improvements I'd suggest.

The first improvement is to utilize a pool of processes rather than spawning a brand-new process for each object to be tracked. The implementation covered here constructs a brand-new Queue and Process for every object we need to track.

That's fine for today's purposes, but consider what would happen if you wanted to track 50 objects in a video — you would spawn 50 processes, one per object. At that point, the overhead of your system managing all of those processes would destroy any gain in FPS. Instead, you would want to utilize a pool of processes.

If your system has N processor cores, then you would want to create a pool with N - 1 processes, leaving one core for your operating system to perform system operations. Each of these processes should perform multiple object tracking, maintaining a list of object trackers, similar to the first multi-object tracker we covered today.

This improvement will allow you to utilize all cores of your processor without the overhead of spawning many independent processes. A sketch of this idea follows.
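The code below is a rough, assumption-laden illustration of that idea rather than code from this article: one long-lived worker per chunk of detections, each worker maintaining its own list of dlib correlation trackers, so 50 objects still cost at most N - 1 processes. The function and variable names are hypothetical.

```python
# a hedged sketch of the "pool" idea: each worker owns a *list* of dlib trackers
# (function and variable names here are hypothetical, not from the article's scripts)
import multiprocessing
import dlib

def track_many(boxes, labels, firstFrame, inputQueue, outputQueue):
    # build one correlation tracker per assigned bounding box on the first frame
    trackers = []
    for (box, label) in zip(boxes, labels):
        t = dlib.correlation_tracker()
        t.start_track(firstFrame, dlib.rectangle(
            int(box[0]), int(box[1]), int(box[2]), int(box[3])))
        trackers.append((t, label))

    # then update *all* of them for every frame the parent pushes onto the queue
    while True:
        rgb = inputQueue.get()
        if rgb is None:
            break
        results = []
        for (t, label) in trackers:
            t.update(rgb)
            pos = t.get_position()
            results.append((label, (int(pos.left()), int(pos.top()),
                int(pos.right()), int(pos.bottom()))))
        outputQueue.put(results)

def split_into_chunks(items, n):
    # distribute items over n roughly equal chunks
    return [items[i::n] for i in range(n)]

# usage idea: with N CPU cores, spawn at most N - 1 track_many workers and hand
# each worker roughly len(detections) / (N - 1) of the detected boxes + labels
```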
The second improvement I'd make is to clean up the processes and queues. If dlib reports an object as "lost" or "disappeared", we never return from the start_tracker function, which means that process will live for the life of the parent script and only be killed when the parent exits.

Again, that's fine for our purposes today, but if you intend to use this code in production environments, you should:

- Update the start_tracker function to return once dlib reports the object as lost.
- Delete the corresponding process's inputQueue and outputQueue as well.

Failing to perform this cleanup will lead to needless computational consumption and memory overhead for long-running jobs; a minimal sketch of the change follows.
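The sketch below shows one way the worker could be updated. Treat it as a sketch under assumptions: dlib's correlation_tracker.update() returns a confidence score (the peak-to-side-lobe ratio), and the lostThresh cutoff here is an arbitrary, hypothetical value you would need to tune.

```python
import dlib

def start_tracker(box, label, rgb, inputQueue, outputQueue, lostThresh=7.0):
    # same worker as before, but it returns once the track looks lost
    # (lostThresh is a hypothetical cutoff on the score returned by update())
    t = dlib.correlation_tracker()
    t.start_track(rgb, dlib.rectangle(box[0], box[1], box[2], box[3]))

    while True:
        rgb = inputQueue.get()
        if rgb is None:
            break                            # also allow an explicit shutdown signal
        score = t.update(rgb)                # update() reports a tracking confidence
        if score < lostThresh:
            outputQueue.put((label, None))   # tell the parent the object was lost
            break                            # returning lets the daemon process exit
        pos = t.get_position()
        outputQueue.put((label, (int(pos.left()), int(pos.top()),
            int(pos.right()), int(pos.bottom()))))
```

On the parent side, a result of (label, None) would then be the signal to drop the matching entries from inputQueues and outputQueues.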
A third improvement is to increase tracking accuracy by re-running the object detector every N frames rather than only once at the start.

I actually demonstrated this in my article on people counting with OpenCV. It requires more logic and thought, but yields a much more accurate tracker. I chose to forgo it for this script so that I could teach you the multiprocessing approach concisely. Ideally, you would use this third improvement in addition to multiprocessing; a bare skeleton of the idea is shown below.
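Here, SKIP_FRAMES and the video path are assumptions made for the example, and the detection/tracking bodies are left as placeholders — it only shows the alternating structure of the loop:

```python
# skeleton of the "re-detect every N frames" idea -- not a complete implementation
import cv2
import imutils

SKIP_FRAMES = 30                    # hypothetical: run the SSD detector every 30 frames
totalFrames = 0
vs = cv2.VideoCapture("race.mp4")   # assumes the same input video as above

while True:
    (grabbed, frame) = vs.read()
    if frame is None:
        break
    frame = imutils.resize(frame, width=600)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    if totalFrames % SKIP_FRAMES == 0:
        # run the (expensive) object detector and re-initialize the dlib trackers here
        pass
    else:
        # in between detections, only update the existing (cheap) trackers
        pass

    totalFrames += 1

vs.release()
```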
That wraps up this detailed guide to multi-object tracking with dlib in Python and OpenCV. For more material on multi-object tracking with OpenCV and dlib, see the other related articles on 腳本之家.