Python OpenCV: Multi-Object Tracking with dlib in Detail

Updated: 2022-03-14 09:46:07   Author: 求則得之,舍則失之
This article introduces how to use the dlib library to track multiple objects in real-time video efficiently. The example code is explained in detail and should be helpful for learning OpenCV; refer to it if you need to.

In this tutorial, you will learn how to use the dlib library to track multiple objects in real-time video efficiently.

We can certainly track multiple objects with dlib; however, to obtain the best possible performance, we need to use multiprocessing and distribute the object trackers across multiple cores of our processor.

Using multiprocessing correctly allows us to improve our dlib multi-object tracking throughput in frames per second (FPS) by more than 45%!

1. Multi-object tracking with dlib

In the first part of this guide, I will demonstrate how to implement a simple, naive dlib multi-object tracking script. This program will track multiple objects in a video; however, we will notice that the script runs a bit slowly. To increase our FPS, I will then show you a faster, more efficient implementation of the dlib multi-object tracker. Finally, I will discuss some improvements and suggestions for enhancing our multi-object tracking implementation.

2. Project structure

You can view our project structure with the tree command.
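
The exact listing depends on your setup, but based on the model files, scripts, and test video used later in this post it should look roughly like this:

$ tree
.
├── mobilenet_ssd
│   ├── MobileNetSSD_deploy.caffemodel
│   └── MobileNetSSD_deploy.prototxt
├── multi_object_tracking_slow.py
├── multi_object_tracking_fast.py
└── race.mp4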

The mobilenet_ssd/ directory contains our MobileNet + SSD Caffe model files, which allow us to detect people (as well as other objects). Today we will review two Python scripts:

  • multi_object_tracking_slow.py: a simple, "naive" approach to dlib multi-object tracking.
  • multi_object_tracking_fast.py: an advanced, fast approach that takes advantage of multiprocessing.

3. The simple, "naive" approach to dlib multi-object tracking

The first dlib multi-object tracking implementation we cover today is "naive" in that it will:

1. Use a simple list of tracker objects.

2. Update each tracker sequentially, using only a single core of our processor.

For some object tracking tasks this implementation will be more than sufficient; however, to optimize our FPS we should distribute the object trackers across multiple processes.

We will start with the simple implementation in this section and then move on to the faster method in the next section. First, open the multi_object_tracking_slow.py script and insert the following code:

# import the necessary packages
from imutils.video import FPS
import numpy as np
import argparse
import imutils
import dlib
import cv2

Let's parse our command line arguments:

# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
    help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
    help="path to Caffe pre-trained model")
ap.add_argument("-v", "--video", required=True,
    help="path to input video file")
ap.add_argument("-o", "--output", type=str,
    help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.2,
    help="minimum probability to filter weak detections")
args = vars(ap.parse_args())

Our script handles the following command line arguments at runtime:

  • --prototxt: path to the Caffe "deploy" prototxt file.
  • --model: path to the model file that accompanies the prototxt.
  • --video: path to the input video file. We will perform multi-object tracking with dlib on this video.
  • --output: an optional path to an output video file. If no path is specified, no video will be written to disk. I recommend writing to an .avi or .mp4 file.
  • --confidence: the object detection confidence threshold, 0.2 by default, representing the minimum probability needed to filter weak detections from the object detector.

Let's define the list of classes this model supports and load our model from disk:

# initialize the list of class labels MobileNet SSD was trained to
# detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
    "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
    "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
    "sofa", "train", "tvmonitor"]
# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])

We only care about the "person" class in today's footrace example, but you can easily modify the code to track other classes. We have loaded our pre-trained object detector model; we will use this pre-trained SSD to detect the presence of objects in the video, and we will create a dlib object tracker for each object it detects.

We have a few more initializations to perform:

# initialize the video stream and output video writer
print("[INFO] starting video stream...")
vs = cv2.VideoCapture(args["video"])
writer = None
# initialize the list of object trackers and corresponding class
# labels
trackers = []
labels = []
# start the frames per second throughput estimator
fps = FPS().start()

We initialize our video stream, from which we will read one frame at a time. Subsequently, our video writer is initialized to None; we will do more work with the video writer in the upcoming while loop. We then initialize our lists of trackers and labels, and finally we start our frames-per-second counter. We are all set to begin processing the video:

# loop over frames from the video file stream
while True:
    # grab the next frame from the video file
    (grabbed, frame) = vs.read()
    # check to see if we have reached the end of the video file
    if frame is None:
        break
    # resize the frame for faster processing and then convert the
    # frame from BGR to RGB ordering (dlib needs RGB ordering)
    frame = imutils.resize(frame, width=600)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    # if we are supposed to be writing a video to disk, initialize
    # the writer
    if args["output"] is not None and writer is None:
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        writer = cv2.VideoWriter(args["output"], fourcc, 30,
            (frame.shape[1], frame.shape[0]), True)

The frame is resized to a width of 600 pixels, keeping the aspect ratio. Then, for dlib compatibility, the frame is converted to RGB channel ordering (OpenCV's default is BGR, whereas dlib expects RGB).

Let's begin the object detection phase:

    # if there are no object trackers we first need to detect objects
    # and then create a tracker for each object
    if len(trackers) == 0:
        # grab the frame dimensions and convert the frame to a blob
        (h, w) = frame.shape[:2]
        blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)
        # pass the blob through the network and obtain the detections
        # and predictions
        net.setInput(blob)
        detections = net.forward()

To perform object tracking we must first perform object detection, either:

  • Manually, by stopping the video stream and hand-selecting the bounding box of each object (a sketch of this option follows the list).
  • Programmatically, using a trained object detector to detect the presence of objects (which is what we do here).
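
The tutorial takes the programmatic route, but for completeness here is a minimal sketch of the manual option; using OpenCV's built-in selectROIs() helper and the window name shown are my own choices, not part of the tutorial's scripts:

# Sketch of the *manual* alternative: pause on the first frame, let the user
# drag a box around each object, then seed a dlib correlation tracker from
# each box.
import cv2
import dlib

vs = cv2.VideoCapture("race.mp4")
(grabbed, frame) = vs.read()
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

# selectROIs blocks while the user draws boxes (ENTER/SPACE confirms each
# one, ESC finishes) and returns them as (x, y, w, h) tuples
boxes = cv2.selectROIs("Select objects to track", frame)
trackers = []
for (x, y, w, h) in boxes:
    t = dlib.correlation_tracker()
    # dlib expects a rectangle given as (left, top, right, bottom)
    t.start_track(rgb, dlib.rectangle(int(x), int(y), int(x + w), int(y + h)))
    trackers.append(t)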

If there are no object trackers, then we know we have not yet performed object detection.

We create a blob (scaling the pixel values by 0.007843, i.e. 1/127.5, and subtracting a mean of 127.5) and pass it through the SSD network to detect objects.

Next, we continue by looping over the detections to find objects belonging to the person class, since our input video is a footrace:

        # loop over the detections
        for i in np.arange(0, detections.shape[2]):
            # extract the confidence (i.e., probability) associated
            # with the prediction
            confidence = detections[0, 0, i, 2]
            # filter out weak detections by requiring a minimum
            # confidence
            if confidence > args["confidence"]:
                # extract the index of the class label from the
                # detections list
                idx = int(detections[0, 0, i, 1])
                label = CLASSES[idx]
                # if the class label is not a person, ignore it
                if CLASSES[idx] != "person":
                    continue

We begin looping over the detections, where we:

  • Filter out weak detections.
  • Ensure each detection is a person. You can of course remove this line of code or customize it to your own filtering needs.

Now that we have located each person in the frame, let's instantiate our trackers and draw our initial bounding boxes and class labels:

                # compute the (x, y)-coordinates of the bounding box
                # for the object
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")
                # construct a dlib rectangle object from the bounding
                # box coordinates and start the correlation tracker
                t = dlib.correlation_tracker()
                rect = dlib.rectangle(startX, startY, endX, endY)
                t.start_track(rgb, rect)
                # update our set of trackers and corresponding class
                # labels
                labels.append(label)
                trackers.append(t)
                # grab the corresponding class label for the detection
                # and draw the bounding box
                cv2.rectangle(frame, (startX, startY), (endX, endY),
                    (0, 255, 0), 2)
                cv2.putText(frame, label, (startX, startY - 15),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

To begin tracking objects, we:

  • Compute the bounding box of each detected object.
  • Instantiate the bounding box coordinates and pass them to the tracker. The bounding box is especially important here: we need to create a dlib.rectangle from it and pass that to the start_track method, after which dlib can begin tracking the object.
  • Finally, we append the individual tracker to our trackers list.

Consequently, in the next code block we will handle the case where trackers have already been established and we only need to update positions. We also perform two additional tasks during this initial detection step:

  • Append the class label to the labels list. If you are tracking multiple types of objects (such as dog + person), you may wish to know what type each object is.
  • Draw each bounding box rectangle and class label around the object.

If the length of our trackers list is greater than zero, we know we are in the object tracking phase:

    # otherwise, we've already performed detection so let's track
    # multiple objects
    else:
        # loop over each of the trackers
        for (t, l) in zip(trackers, labels):
            # update the tracker and grab the position of the tracked
            # object
            t.update(rgb)
            pos = t.get_position()
            # unpack the position object
            startX = int(pos.left())
            startY = int(pos.top())
            endX = int(pos.right())
            endY = int(pos.bottom())
            # draw the bounding box from the correlation object tracker
            cv2.rectangle(frame, (startX, startY), (endX, endY),
                (0, 255, 0), 2)
            cv2.putText(frame, l, (startX, startY - 15),
                cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

In the object tracking phase, we loop over all of the trackers and their corresponding labels, then update the position of each object. To update the position, we simply pass in the rgb image.

After extracting the bounding box coordinates, we can draw a bounding box rectangle and label for each tracked object.

The remaining steps in the frame-processing loop involve writing the frame to the output video (if necessary) and displaying the result:

    # check to see if we should write the frame to disk
    if writer is not None:
        writer.write(frame)
    # show the output frame
    cv2.imshow("Frame", frame)
    key = cv2.waitKey(1) & 0xFF
    # if the `q` key was pressed, break from the loop
    if key == ord("q"):
        break
    # update the FPS counter
    fps.update()

Here we:

  • Write the frame to the video, if necessary.
  • Display the output frame and capture key presses. If the q key ("quit") is pressed, we break out of the loop. Finally, we update our frames-per-second information for benchmarking purposes.

The remaining steps are to print the FPS information in the terminal and release pointers:

# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))
# check to see if we need to release the video writer pointer
if writer is not None:
    writer.release()
# do a bit of cleanup
cv2.destroyAllWindows()
vs.release()

Let's evaluate accuracy and performance. Open a terminal and execute the following command:

$ python multi_object_tracking_slow.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt \
    --model mobilenet_ssd/MobileNetSSD_deploy.caffemodel \
    --video race.mp4 --output race_output_slow.avi
[INFO] loading model...
[INFO] starting video stream...
[INFO] elapsed time: 24.51
[INFO] approx. FPS: 13.87

It looks like our multi-object tracker is working!

But as you can see, we are only obtaining about 13 frames per second.

For some applications this FPS may be sufficient; however, if you need a faster frame rate, I suggest taking a look at the more efficient dlib multi-object tracker below. Second, understand that the tracking accuracy is not perfect.

4. The fast, efficient dlib multi-object tracking implementation

If you run the dlib multi-object tracking script from the previous section while also opening your system's activity monitor, you will notice that only one core of the processor is being used.

Utilizing processes enables our operating system to perform better process scheduling, mapping the processes to particular processor cores on our machine (most modern operating systems are able to efficiently schedule processes that make heavy use of the CPU in a parallel manner).

Go ahead and open multi_object_tracking_fast.py and insert the following code:

# import the necessary packages
from imutils.video import FPS
import multiprocessing
import numpy as np
import argparse
import imutils
import dlib
import cv2

We will use the Python Process class to spawn new processes; each new process is independent of the original.

To spawn a process, we need to provide a function that Python can call; Python will then take that function, create a brand-new process, and execute it:

def start_tracker(box, label, rgb, inputQueue, outputQueue):
    # construct a dlib rectangle object from the bounding box
    # coordinates and then start the correlation tracker
    t = dlib.correlation_tracker()
    rect = dlib.rectangle(box[0], box[1], box[2], box[3])
    t.start_track(rgb, rect)

The first three parameters of start_tracker are:

  • box: the bounding box coordinates of the object we are going to track, presumably returned by some kind of object detector, whether manual or programmatic.
  • label: the human-readable label of the object.
  • rgb: the RGB image we will use to start the initial dlib object tracker.

Keep in mind how Python multiprocessing works: Python will call this function and then create a brand-new interpreter to execute the code within it. Therefore, each spawned start_tracker process will be independent of its parent. To communicate with the Python driver script, we need to use Pipes or Queues. Both types of objects are thread/process safe, accomplished using locks and semaphores.

Essentially, we are creating a simple producer/consumer relationship:

  • Our parent process will produce new frames and add them to the queue of a particular object tracker.
  • The child process will then consume the frames, apply object tracking, and return the updated bounding box coordinates.

I decided to use Queue objects in this post; however, keep in mind that you could use a Pipe instead if you wish (a stripped-down sketch of the queue-based pattern follows).
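
Here is that sketch using multiprocessing.Queue; the worker simply doubles numbers, so it only illustrates the queue protocol that the per-object tracker processes below will follow and is not part of the tutorial's scripts:

# A stripped-down producer/consumer sketch with multiprocessing.Queue.
import multiprocessing

def worker(inputQueue, outputQueue):
    while True:
        item = inputQueue.get()      # blocks until the parent produces work
        if item is None:             # sentinel value: stop the worker
            break
        outputQueue.put(item * 2)    # pretend "processing" and send a result

if __name__ == "__main__":
    iq = multiprocessing.Queue()
    oq = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(iq, oq))
    p.daemon = True
    p.start()
    for i in range(3):
        iq.put(i)                    # produce a frame-like work item
        print(oq.get())              # consume the result: 0, 2, 4
    iq.put(None)                     # ask the worker to exit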

Now let's start an infinite loop that will run in the process:

    # loop indefinitely -- this function will be called as a daemon
    # process so we don't need to worry about joining it
    while True:
        # attempt to grab the next frame from the input queue
        rgb = inputQueue.get()
        # if there was an entry in our queue, process it
        if rgb is not None:
            # update the tracker and grab the position of the tracked
            # object
            t.update(rgb)
            pos = t.get_position()
            # unpack the position object
            startX = int(pos.left())
            startY = int(pos.top())
            endX = int(pos.right())
            endY = int(pos.bottom())
            # add the label + bounding box coordinates to the output
            # queue
            outputQueue.put((label, (startX, startY, endX, endY)))

We loop indefinitely here; this function will be invoked as a daemon process, so we do not need to worry about joining it.

First, we attempt to grab a new frame from the inputQueue. If the frame is not empty, we grab it and then update the object tracker, which gives us the updated bounding box coordinates.

Finally, we write the label and bounding box to the outputQueue so the parent process can use them in the main loop of the script.

Back in the parent process, we parse our command line arguments:

# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
    help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
    help="path to Caffe pre-trained model")
ap.add_argument("-v", "--video", required=True,
    help="path to input video file")
ap.add_argument("-o", "--output", type=str,
    help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.2,
    help="minimum probability to filter weak detections")
args = vars(ap.parse_args())

The command line arguments for this script are exactly the same as for our slower, non-multiprocessing script.

Let's initialize our input and output queues:

# initialize our lists of queues -- both input queue and output queue
# for *every* object that we will be tracking
inputQueues = []
outputQueues = []

These queues will hold the objects we are tracking. Each process that is spawned will need two Queue objects:

  • One to read input frames from
  • Another to write results to

The next code block is identical to our previous script:

# initialize the list of class labels MobileNet SSD was trained to
# detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
    "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
    "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
    "sofa", "train", "tvmonitor"]
# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])
# initialize the video stream and output video writer
print("[INFO] starting video stream...")
vs = cv2.VideoCapture(args["video"])
writer = None
# start the frames per second throughput estimator
fps = FPS().start()

We define the model's CLASSES and load the model itself.

Now let's start looping over frames from the video stream:

# loop over frames from the video file stream
while True:
    # grab the next frame from the video file
    (grabbed, frame) = vs.read()
    # check to see if we have reached the end of the video file
    if frame is None:
        break
    # resize the frame for faster processing and then convert the
    # frame from BGR to RGB ordering (dlib needs RGB ordering)
    frame = imutils.resize(frame, width=600)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    # if we are supposed to be writing a video to disk, initialize
    # the writer
    if args["output"] is not None and writer is None:
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        writer = cv2.VideoWriter(args["output"], fourcc, 30,
            (frame.shape[1], frame.shape[0]), True)

Now let's handle the case where we have no inputQueues yet:

    # if our list of queues is empty then we know we have yet to
    # create our first object tracker
    if len(inputQueues) == 0:
        # grab the frame dimensions and convert the frame to a blob
        (h, w) = frame.shape[:2]
        blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)
        # pass the blob through the network and obtain the detections
        # and predictions
        net.setInput(blob)
        detections = net.forward()
        # loop over the detections
        for i in np.arange(0, detections.shape[2]):
            # extract the confidence (i.e., probability) associated
            # with the prediction
            confidence = detections[0, 0, i, 2]
            # filter out weak detections by requiring a minimum
            # confidence
            if confidence > args["confidence"]:
                # extract the index of the class label from the
                # detections list
                idx = int(detections[0, 0, i, 1])
                label = CLASSES[idx]
                # if the class label is not a person, ignore it
                if CLASSES[idx] != "person":
                    continue

If there are no inputQueues, then we need to apply object detection before object tracking. We apply object detection and then loop over the detections. We grab the confidence value and filter out weak detections. If the confidence meets the threshold established by our command line argument, we consider the detection, but we filter it further by class label; in this case we are only looking for person objects. Assuming we have found a person, we create the queues and spawn the tracking process:

                # compute the (x, y)-coordinates of the bounding box
                # for the object
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")
                bb = (startX, startY, endX, endY)
                # create two brand new input and output queues,
                # respectively
                iq = multiprocessing.Queue()
                oq = multiprocessing.Queue()
                inputQueues.append(iq)
                outputQueues.append(oq)
                # spawn a daemon process for a new object tracker
                p = multiprocessing.Process(
                    target=start_tracker,
                    args=(bb, label, rgb, iq, oq))
                p.daemon = True
                p.start()
                # grab the corresponding class label for the detection
                # and draw the bounding box
                cv2.rectangle(frame, (startX, startY), (endX, endY),
                    (0, 255, 0), 2)
                cv2.putText(frame, label, (startX, startY - 15),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

We first compute the bounding box coordinates. From there we create two new queues, iq and oq, appending them to inputQueues and outputQueues, respectively. We then spawn a new start_tracker process, passing it the bounding box, label, rgb image, and the iq + oq.

We also draw the detected object's bounding box rectangle and class label.

Otherwise, we have already performed object detection, so we need to apply each of the dlib object trackers to the frame:

    # otherwise, we've already performed detection so let's track
    # multiple objects
    else:
        # loop over each of our input ques and add the input RGB
        # frame to it, enabling us to update each of the respective
        # object trackers running in separate processes
        for iq in inputQueues:
            iq.put(rgb)
        # loop over each of the output queues
        for oq in outputQueues:
            # grab the updated bounding box coordinates for the
            # object -- the .get method is a blocking operation so
            # this will pause our execution until the respective
            # process finishes the tracking update
            (label, (startX, startY, endX, endY)) = oq.get()
            # draw the bounding box from the correlation object
            # tracker
            cv2.rectangle(frame, (startX, startY), (endX, endY),
                (0, 255, 0), 2)
            cv2.putText(frame, label, (startX, startY - 15),
                cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

Looping over each of the inputQueues, we add the rgb image to them. Then we loop over each of the outputQueues, grabbing the updated bounding box coordinates from each independent object tracker. Finally, we draw the bounding box plus the associated class label.

    # check to see if we should write the frame to disk
    if writer is not None:
        writer.write(frame)
    # show the output frame
    cv2.imshow("Frame", frame)
    key = cv2.waitKey(1) & 0xFF
    # if the `q` key was pressed, break from the loop
    if key == ord("q"):
        break
    # update the FPS counter
    fps.update()
# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))
# check to see if we need to release the video writer pointer
if writer is not None:
    writer.release()
# do a bit of cleanup
cv2.destroyAllWindows()
vs.release()

If necessary, we write the frame to the output video, and we display the frame on screen. If the q key is pressed, we quit, breaking out of the loop. If we are still processing frames, our FPS counter is updated and we start processing again at the top of the while loop. Otherwise, we are done processing frames, so we display the FPS information, release pointers, and close the display windows.

Open a terminal and execute the following command:

$ python multi_object_tracking_fast.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt \
    --model mobilenet_ssd/MobileNetSSD_deploy.caffemodel \
    --video race.mp4 --output race_output_fast.avi
[INFO] loading model...
[INFO] starting video stream...
[INFO] elapsed time: 14.01
[INFO] approx. FPS: 24.26

As you can see, our faster, more efficient multi-object tracker runs at 24 FPS, an improvement of more than 45% over our previous implementation! In addition, if you open your system's activity monitor while this script is running, you will see that more of your CPU is being used. This speedup is obtained by allowing each of the dlib object trackers to run in a separate process, which in turn enables your operating system to schedule CPU resources more efficiently.

5. Complete code

multi_object_tracking_slow.py

# USAGE
# python multi_object_tracking_slow.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt \
# 	--model mobilenet_ssd/MobileNetSSD_deploy.caffemodel --video race.mp4

# import the necessary packages
from imutils.video import FPS
import numpy as np
import argparse
import imutils
import dlib
import cv2

# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
	help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
	help="path to Caffe pre-trained model")
ap.add_argument("-v", "--video", required=True,
	help="path to input video file")
ap.add_argument("-o", "--output", type=str,
	help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.2,
	help="minimum probability to filter weak detections")
args = vars(ap.parse_args())

# initialize the list of class labels MobileNet SSD was trained to
# detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
	"bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
	"dog", "horse", "motorbike", "person", "pottedplant", "sheep",
	"sofa", "train", "tvmonitor"]

# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])

# initialize the video stream and output video writer
print("[INFO] starting video stream...")
vs = cv2.VideoCapture(args["video"])
writer = None

# initialize the list of object trackers and corresponding class
# labels
trackers = []
labels = []

# start the frames per second throughput estimator
fps = FPS().start()

# loop over frames from the video file stream
while True:
	# grab the next frame from the video file
	(grabbed, frame) = vs.read()

	# check to see if we have reached the end of the video file
	if frame is None:
		break

	# resize the frame for faster processing and then convert the
	# frame from BGR to RGB ordering (dlib needs RGB ordering)
	frame = imutils.resize(frame, width=600)
	rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

	# if we are supposed to be writing a video to disk, initialize
	# the writer
	if args["output"] is not None and writer is None:
		fourcc = cv2.VideoWriter_fourcc(*"MJPG")
		writer = cv2.VideoWriter(args["output"], fourcc, 30,
			(frame.shape[1], frame.shape[0]), True)

	# if there are no object trackers we first need to detect objects
	# and then create a tracker for each object
	if len(trackers) == 0:
		# grab the frame dimensions and convert the frame to a blob
		(h, w) = frame.shape[:2]
		blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)

		# pass the blob through the network and obtain the detections
		# and predictions
		net.setInput(blob)
		detections = net.forward()

		# loop over the detections
		for i in np.arange(0, detections.shape[2]):
			# extract the confidence (i.e., probability) associated
			# with the prediction
			confidence = detections[0, 0, i, 2]

			# filter out weak detections by requiring a minimum
			# confidence
			if confidence > args["confidence"]:
				# extract the index of the class label from the
				# detections list
				idx = int(detections[0, 0, i, 1])
				label = CLASSES[idx]

				# if the class label is not a person, ignore it
				if CLASSES[idx] != "person":
					continue

				# compute the (x, y)-coordinates of the bounding box
				# for the object
				box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
				(startX, startY, endX, endY) = box.astype("int")

				# construct a dlib rectangle object from the bounding
				# box coordinates and start the correlation tracker
				t = dlib.correlation_tracker()
				rect = dlib.rectangle(startX, startY, endX, endY)
				t.start_track(rgb, rect)

				# update our set of trackers and corresponding class
				# labels
				labels.append(label)
				trackers.append(t)

				# grab the corresponding class label for the detection
				# and draw the bounding box
				cv2.rectangle(frame, (startX, startY), (endX, endY),
					(0, 255, 0), 2)
				cv2.putText(frame, label, (startX, startY - 15),
					cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

	# otherwise, we've already performed detection so let's track
	# multiple objects
	else:
		# loop over each of the trackers
		for (t, l) in zip(trackers, labels):
			# update the tracker and grab the position of the tracked
			# object
			t.update(rgb)
			pos = t.get_position()

			# unpack the position object
			startX = int(pos.left())
			startY = int(pos.top())
			endX = int(pos.right())
			endY = int(pos.bottom())

			# draw the bounding box from the correlation object tracker
			cv2.rectangle(frame, (startX, startY), (endX, endY),
				(0, 255, 0), 2)
			cv2.putText(frame, l, (startX, startY - 15),
				cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

	# check to see if we should write the frame to disk
	if writer is not None:
		writer.write(frame)

	# show the output frame
	cv2.imshow("Frame", frame)
	key = cv2.waitKey(1) & 0xFF

	# if the `q` key was pressed, break from the loop
	if key == ord("q"):
		break

	# update the FPS counter
	fps.update()

# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))

# check to see if we need to release the video writer pointer
if writer is not None:
	writer.release()

# do a bit of cleanup
cv2.destroyAllWindows()
vs.release()

multi_object_tracking_fast.py

# USAGE
# python multi_object_tracking_fast.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt \
#	--model mobilenet_ssd/MobileNetSSD_deploy.caffemodel --video race.mp4

# import the necessary packages
from imutils.video import FPS
import multiprocessing
import numpy as np
import argparse
import imutils
import dlib
import cv2

def start_tracker(box, label, rgb, inputQueue, outputQueue):
	# construct a dlib rectangle object from the bounding box
	# coordinates and then start the correlation tracker
	t = dlib.correlation_tracker()
	rect = dlib.rectangle(box[0], box[1], box[2], box[3])
	t.start_track(rgb, rect)

	# loop indefinitely -- this function will be called as a daemon
	# process so we don't need to worry about joining it
	while True:
		# attempt to grab the next frame from the input queue
		rgb = inputQueue.get()

		# if there was an entry in our queue, process it
		if rgb is not None:
			# update the tracker and grab the position of the tracked
			# object
			t.update(rgb)
			pos = t.get_position()

			# unpack the position object
			startX = int(pos.left())
			startY = int(pos.top())
			endX = int(pos.right())
			endY = int(pos.bottom())

			# add the label + bounding box coordinates to the output
			# queue
			outputQueue.put((label, (startX, startY, endX, endY)))

# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
	help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
	help="path to Caffe pre-trained model")
ap.add_argument("-v", "--video", required=True,
	help="path to input video file")
ap.add_argument("-o", "--output", type=str,
	help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.2,
	help="minimum probability to filter weak detections")
args = vars(ap.parse_args())

# initialize our list of queues -- both input queue and output queue
# for *every* object that we will be tracking
inputQueues = []
outputQueues = []

# initialize the list of class labels MobileNet SSD was trained to
# detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
	"bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
	"dog", "horse", "motorbike", "person", "pottedplant", "sheep",
	"sofa", "train", "tvmonitor"]

# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])

# initialize the video stream and output video writer
print("[INFO] starting video stream...")
vs = cv2.VideoCapture(args["video"])
writer = None

# start the frames per second throughput estimator
fps = FPS().start()

# loop over frames from the video file stream
while True:
	# grab the next frame from the video file
	(grabbed, frame) = vs.read()

	# check to see if we have reached the end of the video file
	if frame is None:
		break

	# resize the frame for faster processing and then convert the
	# frame from BGR to RGB ordering (dlib needs RGB ordering)
	frame = imutils.resize(frame, width=600)
	rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

	# if we are supposed to be writing a video to disk, initialize
	# the writer
	if args["output"] is not None and writer is None:
		fourcc = cv2.VideoWriter_fourcc(*"MJPG")
		writer = cv2.VideoWriter(args["output"], fourcc, 30,
			(frame.shape[1], frame.shape[0]), True)

	# if our list of queues is empty then we know we have yet to
	# create our first object tracker
	if len(inputQueues) == 0:
		# grab the frame dimensions and convert the frame to a blob
		(h, w) = frame.shape[:2]
		blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)

		# pass the blob through the network and obtain the detections
		# and predictions
		net.setInput(blob)
		detections = net.forward()

		# loop over the detections
		for i in np.arange(0, detections.shape[2]):
			# extract the confidence (i.e., probability) associated
			# with the prediction
			confidence = detections[0, 0, i, 2]

			# filter out weak detections by requiring a minimum
			# confidence
			if confidence > args["confidence"]:
				# extract the index of the class label from the
				# detections list
				idx = int(detections[0, 0, i, 1])
				label = CLASSES[idx]

				# if the class label is not a person, ignore it
				if CLASSES[idx] != "person":
					continue

				# compute the (x, y)-coordinates of the bounding box
				# for the object
				box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
				(startX, startY, endX, endY) = box.astype("int")
				bb = (startX, startY, endX, endY)

				# create two brand new input and output queues,
				# respectively
				iq = multiprocessing.Queue()
				oq = multiprocessing.Queue()
				inputQueues.append(iq)
				outputQueues.append(oq)

				# spawn a daemon process for a new object tracker
				p = multiprocessing.Process(
					target=start_tracker,
					args=(bb, label, rgb, iq, oq))
				p.daemon = True
				p.start()

				# grab the corresponding class label for the detection
				# and draw the bounding box
				cv2.rectangle(frame, (startX, startY), (endX, endY),
					(0, 255, 0), 2)
				cv2.putText(frame, label, (startX, startY - 15),
					cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

	# otherwise, we've already performed detection so let's track
	# multiple objects
	else:
		# loop over each of our input ques and add the input RGB
		# frame to it, enabling us to update each of the respective
		# object trackers running in separate processes
		for iq in inputQueues:
			iq.put(rgb)

		# loop over each of the output queues
		for oq in outputQueues:
			# grab the updated bounding box coordinates for the
			# object -- the .get method is a blocking operation so
			# this will pause our execution until the respective
			# process finishes the tracking update
			(label, (startX, startY, endX, endY)) = oq.get()

			# draw the bounding box from the correlation object
			# tracker
			cv2.rectangle(frame, (startX, startY), (endX, endY),
				(0, 255, 0), 2)
			cv2.putText(frame, label, (startX, startY - 15),
				cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

	# check to see if we should write the frame to disk
	if writer is not None:
		writer.write(frame)

	# show the output frame
	cv2.imshow("Frame", frame)
	key = cv2.waitKey(1) & 0xFF

	# if the `q` key was pressed, break from the loop
	if key == ord("q"):
		break

	# update the FPS counter
	fps.update()

# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))

# check to see if we need to release the video writer pointer
if writer is not None:
	writer.release()

# do a bit of cleanup
cv2.destroyAllWindows()
vs.release()

Download link: https://pan.baidu.com/s/1WhJr-Qxh5Wu3TsXKRiTHRg (extraction code: 1234)

6. Improvements and suggestions

The dlib multi-object tracking Python scripts I am sharing with you today will work just fine for shorter video streams; however, if you intend to use this implementation in long-running production environments (on the order of hours to days of video), there are two primary improvements I would suggest.

The first improvement is to use a process pool rather than spawning a brand-new process for each object to be tracked. The implementation covered here today constructs a brand-new Queue and Process for every object we need to track.

That is fine for today's purposes, but consider what happens if you want to track 50 objects in a video: you would spawn 50 processes, one per object. At that point, the overhead of the system managing all of those processes would destroy any gain in FPS. Instead, you would want to use a process pool.

If your system has N processor cores, then you would want to create a pool with N - 1 processes, leaving one core for your operating system to perform system operations. Each of these processes should perform multiple object tracking, maintaining a list of object trackers, similar to the first multi-object tracking approach we covered today.

This improvement will allow you to utilize all cores of your processor without the overhead of spawning many independent processes; a rough sketch of the idea follows.
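
In this sketch, the start_tracker_group function name, the chunking scheme, and the None sentinel are my own illustration rather than code from this tutorial:

# Sketch of improvement #1: a fixed pool of worker processes, each owning a
# *group* of correlation trackers instead of a single object.
import multiprocessing
import dlib

def start_tracker_group(boxes, labels, rgb, inputQueue, outputQueue):
    # create one correlation tracker per bounding box handed to this worker
    trackers = []
    for (label, (startX, startY, endX, endY)) in zip(labels, boxes):
        t = dlib.correlation_tracker()
        t.start_track(rgb, dlib.rectangle(startX, startY, endX, endY))
        trackers.append((label, t))
    # consume frames and report the updated positions of *all* our objects
    while True:
        rgb = inputQueue.get()
        if rgb is None:
            break
        results = []
        for (label, t) in trackers:
            t.update(rgb)
            pos = t.get_position()
            results.append((label, (int(pos.left()), int(pos.top()),
                int(pos.right()), int(pos.bottom()))))
        outputQueue.put(results)

# in the parent: leave one core for the OS and split the detections evenly
# across the workers, e.g. chunks = [boxes[i::numWorkers] for i in range(numWorkers)]
numWorkers = max(1, multiprocessing.cpu_count() - 1)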

The second improvement I would make is to clean up processes and queues. If dlib reports an object as "lost" or "disappeared", we do not return from the start_tracker function, which means the process will live for the lifetime of the parent script and only be killed when the parent exits.

Again, that is fine for our purposes today, but if you intend to use this code in a production environment you should:

  • Update the start_tracker function to return once dlib reports the object as lost.
  • Delete the inputQueue and outputQueue of the corresponding process as well.

Failing to perform this cleanup will lead to needless computational consumption and memory overhead for long-running jobs; a sketch of this cleanup follows.
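
In the sketch below, note that dlib's update() call returns a confidence score (the peak-to-side-lobe ratio), so the worker can exit once that score drops too low and tell the parent to discard its queues; the 7.0 threshold and the (label, None) message are assumptions of mine, not values from this tutorial:

# Sketch of improvement #2: let the worker exit when dlib loses the object.
import dlib

def start_tracker(box, label, rgb, inputQueue, outputQueue):
    t = dlib.correlation_tracker()
    t.start_track(rgb, dlib.rectangle(box[0], box[1], box[2], box[3]))
    while True:
        rgb = inputQueue.get()
        if rgb is None:
            break
        # a low peak-to-side-lobe ratio means the tracker has probably
        # lost the object
        confidence = t.update(rgb)
        if confidence < 7.0:
            outputQueue.put((label, None))   # tell the parent we are finished
            return                           # let this daemon process exit
        pos = t.get_position()
        outputQueue.put((label, (int(pos.left()), int(pos.top()),
            int(pos.right()), int(pos.bottom()))))

# the parent would then watch for a None box and drop the matching
# inputQueue/outputQueue pair from its lists before sending the next frame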

A third improvement is to increase tracking accuracy by running the object detector every N frames (rather than only once, at the start).

I actually demonstrated this in my article on object counting with OpenCV. It requires more logic and thought, but it produces a much more accurate tracker. I chose to forgo it in this script so that I could teach you the multiprocessing approach concisely. Ideally, you would use this third improvement in addition to multiprocessing; a rough sketch of the frame-counting logic follows.
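
In this sketch, N = 30 is an arbitrary choice and the detection body is elided because it is the same SSD code shown earlier in this post:

# Sketch of improvement #3: run the (expensive) detector every N frames and
# rebuild the trackers from the fresh detections; in between, only the
# (cheap) correlation trackers run.
import cv2
import imutils

vs = cv2.VideoCapture("race.mp4")
trackers = []
N = 30
totalFrames = 0

while True:
    (grabbed, frame) = vs.read()
    if frame is None:
        break
    frame = imutils.resize(frame, width=600)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    if totalFrames % N == 0:
        # detection phase: run the SSD here, discard the old trackers and
        # call dlib.correlation_tracker().start_track() for each detection
        trackers = []
    else:
        # tracking phase: only update the existing trackers
        for t in trackers:
            t.update(rgb)

    totalFrames += 1

vs.release()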

That concludes this detailed look at multi-object tracking with dlib in Python and OpenCV. For more material on multi-object tracking with OpenCV and dlib, please see the other related articles on 腳本之家!
