如何將bag文件批量轉(zhuǎn)成mp4
更新時間:2022年10月11日 14:35:57 作者:點PY
這篇文章主要介紹了將bag文件批量轉(zhuǎn)成mp4,這篇博客涉及的腳本用來將bag文件批量轉(zhuǎn)化為mp4文件,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
簡介
這篇博客涉及的腳本用來將bag文件批量轉(zhuǎn)化為mp4文件
dockerfile
FROM osrf/ros:kinetic-desktop-full RUN sudo apt update && \ sudo apt install -y ffmpeg
Build docker image
docker build -t ros/kinetic:latest .
Build docker container
這里鏈接的數(shù)據(jù)卷,要換成你自己的本地路徑。容器路徑也可以自定義。
docker run -it -v /C/yp/project/realsense2video:/test ros/kinetic:latest
Run script
cd ${your project root} python main.py --data_root /test/data --out_root /test/save --fps 25
param data_root:包含bag的文件夾 param out_root:輸出目錄 param fps:每秒的幀數(shù)
Source code
bag2video.py
# -*- coding: utf-8 -*- #!/usr/bin/env python2 import roslib #roslib.load_manifest('rosbag') import rospy import rosbag import sys, getopt import os from sensor_msgs.msg import CompressedImage #壓縮圖片 from sensor_msgs.msg import Image import cv2 import numpy as np import shlex, subprocess #讀取命令行參數(shù) #subprocess 是一個 python 標準類庫,用于創(chuàng)建進程運行系統(tǒng)命令,并且可以連接進程的輸入輸出和 #錯誤管道,獲取它們的返回,使用起來要優(yōu)于 os.system,在這里我們使用這個庫運行 hive 語句并獲取返回結(jié)果。 #shlex 是一個 python 標準類庫,使用這個類我們可以輕松的做出對 linux shell 的詞法分析,在 #這里我們將格式化好的 hive 連接語句用 shlex 切分,配合 subprocess.run 使用。 MJPEG_VIDEO = 1 RAWIMAGE_VIDEO = 2 VIDEO_CONVERTER_TO_USE = "ffmpeg" # or you may want to use "avconv" #視頻轉(zhuǎn)換器 def print_help(): print('rosbag2video.py [--fps 25] [--rate 1] [-o outputfile] [-v] [-s] [-t topic] bagfile1 [bagfile2] ...') print() print('Converts image sequence(s) in ros bag file(s) to video file(s) with fixed frame rate using',VIDEO_CONVERTER_TO_USE) print(VIDEO_CONVERTER_TO_USE,'needs to be installed!') print() print('--fps Sets FPS value that is passed to',VIDEO_CONVERTER_TO_USE) print(' Default is 25.') print('-h Displays this help.') print('--ofile (-o) sets output file name.') print(' If no output file name (-o) is given the filename \'<prefix><topic>.mp4\' is used and default output codec is h264.') print(' Multiple image topics are supported only when -o option is _not_ used.') print(' ',VIDEO_CONVERTER_TO_USE,' will guess the format according to given extension.') print(' Compressed and raw image messages are supported with mono8 and bgr8/rgb8/bggr8/rggb8 formats.') print('--rate (-r) You may slow down or speed up the video.') print(' Default is 1.0, that keeps the original speed.') print('-s Shows each and every image extracted from the rosbag file (cv_bride is needed).') print('--topic (-t) Only the images from topic "topic" are used for the video output.') print('-v Verbose messages are displayed.') print('--prefix (-p) set a output file name prefix othervise \'bagfile1\' is used (if -o is not set).') print('--start Optional start time in seconds.') print('--end Optional end time in seconds.') class RosVideoWriter(): def __init__(self, fps=25.0, rate=1.0, topic="", output_filename ="", display= False, verbose = False, start = rospy.Time(0), end = rospy.Time(sys.maxsize)): self.opt_topic = topic self.opt_out_file = output_filename self.opt_verbose = verbose self.opt_display_images = display self.opt_start = start self.opt_end = end self.rate = rate self.fps = fps self.opt_prefix= None self.t_first={} self.t_file={} self.t_video={} self.p_avconv = {} #語法分析Args def parseArgs(self, args): opts, opt_files = getopt.getopt(args,"hsvr:o:t:p:",["fps=","rate=","ofile=","topic=","start=","end=","prefix="]) #getopt() for opt, arg in opts: if opt == '-h': print_help() sys.exit(0) elif opt == '-s': self.opt_display_images = True elif opt == '-v': self.opt_verbose = True elif opt in ("--fps"): self.fps = float(arg) elif opt in ("-r", "--rate"): self.rate = float(arg) elif opt in ("-o", "--ofile"): self.opt_out_file = arg elif opt in ("-t", "--topic"): self.opt_topic = arg elif opt in ("-p", "--prefix"): self.opt_prefix = arg elif opt in ("--start"): self.opt_start = rospy.Time(int(arg)) if(self.opt_verbose): print("starting at",self.opt_start.to_sec()) elif opt in ("--end"): self.opt_end = rospy.Time(int(arg)) if(self.opt_verbose): print("ending at",self.opt_end.to_sec()) else: print("opt:", opt,'arg:', arg) if (self.fps<=0): print("invalid fps", self.fps) self.fps = 1 if (self.rate<=0): print("invalid rate", self.rate) self.rate = 1 if(self.opt_verbose): print("using ",self.fps," FPS") return opt_files # filter messages using type or only the opic we whant from the 'topic' argument def filter_image_msgs(self, topic, datatype, md5sum, msg_def, header): if(datatype=="sensor_msgs/CompressedImage"): if (self.opt_topic != "" and self.opt_topic == topic) or self.opt_topic == "": print("############# COMPRESSED IMAGE ######################") print(topic,' with datatype:', str(datatype)) print() return True; if(datatype=="theora_image_transport/Packet"): if (self.opt_topic != "" and self.opt_topic == topic) or self.opt_topic == "": print(topic,' with datatype:', str(datatype)) print('!!! theora is not supported, sorry !!!') return False; if(datatype=="sensor_msgs/Image"): if (self.opt_topic != "" and self.opt_topic == topic) or self.opt_topic == "": print("############# UNCOMPRESSED IMAGE ######################") print(topic,' with datatype:', str(datatype)) print() return True; return False; def write_output_video(self, msg, topic, t, video_fmt, pix_fmt = ""): # no data in this topic if len(msg.data) == 0 : return # initiate data for this topic if not topic in self.t_first : self.t_first[topic] = t # timestamp of first image for this topic self.t_video[topic] = 0 self.t_file[topic] = 0 # if multiple streams of images will start at different times the resulting video files will not be in sync # current offset time we are in the bag file self.t_file[topic] = (t-self.t_first[topic]).to_sec() # fill video file up with images until we reache the current offset from the beginning of the bag file while self.t_video[topic] < self.t_file[topic]/self.rate : if not topic in self.p_avconv: # we have to start a new process for this topic if self.opt_verbose : print("Initializing pipe for topic", topic, "at time", t.to_sec()) if self.opt_out_file=="": out_file = self.opt_prefix + str(topic).replace("/", "_")+".mp4" else: out_file = self.opt_out_file if self.opt_verbose : print("Using output file ", out_file, " for topic ", topic, ".") if video_fmt == MJPEG_VIDEO : cmd = [VIDEO_CONVERTER_TO_USE, '-v', '1', '-stats', '-r',str(self.fps),'-c','mjpeg','-f','mjpeg','-i','-','-an',out_file] self.p_avconv[topic] = subprocess.Popen(cmd, stdin=subprocess.PIPE) if self.opt_verbose : print("Using command line:") print(cmd) elif video_fmt == RAWIMAGE_VIDEO : size = str(msg.width)+"x"+str(msg.height) cmd = [VIDEO_CONVERTER_TO_USE, '-v', '1', '-stats','-r',str(self.fps),'-f','rawvideo','-s',size,'-pix_fmt', pix_fmt,'-i','-','-an',out_file] self.p_avconv[topic] = subprocess.Popen(cmd, stdin=subprocess.PIPE) if self.opt_verbose : print("Using command line:") print(cmd) else : print("Script error, unknown value for argument video_fmt in function write_output_video.") exit(1) # send data to ffmpeg process pipe self.p_avconv[topic].stdin.write(msg.data) # next frame time self.t_video[topic] += 1.0/self.fps def addBag(self, filename): if self.opt_display_images: from cv_bridge import CvBridge, CvBridgeError bridge = CvBridge() cv_image = [] if self.opt_verbose : print("Bagfile: {}".format(filename)) if not self.opt_prefix: # create the output in the same folder and name as the bag file minu '.bag' self.opt_prefix = bagfile[:-4] #Go through the bag file bag = rosbag.Bag(filename) if self.opt_verbose : print("Bag opened.") # loop over all topics for topic, msg, t in bag.read_messages(connection_filter=self.filter_image_msgs, start_time=self.opt_start, end_time=self.opt_end): try: if msg.format.find("jpeg")!=-1 : if msg.format.find("8")!=-1 and (msg.format.find("rgb")!=-1 or msg.format.find("bgr")!=-1 or msg.format.find("bgra")!=-1 ): if self.opt_display_images: np_arr = np.fromstring(msg.data, np.uint8) cv_image = cv2.imdecode(np_arr, cv2.CV_LOAD_IMAGE_COLOR) self.write_output_video( msg, topic, t, MJPEG_VIDEO ) elif msg.format.find("mono8")!=-1 : if self.opt_display_images: np_arr = np.fromstring(msg.data, np.uint8) cv_image = cv2.imdecode(np_arr, cv2.CV_LOAD_IMAGE_COLOR) self.write_output_video( msg, topic, t, MJPEG_VIDEO ) elif msg.format.find("16UC1")!=-1 : if self.opt_display_images: np_arr = np.fromstring(msg.data, np.uint16) cv_image = cv2.imdecode(np_arr, cv2.CV_LOAD_IMAGE_COLOR) self.write_output_video( msg, topic, t, MJPEG_VIDEO ) else: print('unsupported jpeg format:', msg.format, '.', topic) # has no attribute 'format' except AttributeError: try: pix_fmt=None if msg.encoding.find("mono8")!=-1 or msg.encoding.find("8UC1")!=-1: pix_fmt = "gray" if self.opt_display_images: cv_image = bridge.imgmsg_to_cv2(msg, "bgr8") elif msg.encoding.find("bgra")!=-1 : pix_fmt = "bgra" if self.opt_display_images: cv_image = bridge.imgmsg_to_cv2(msg, "bgr8") elif msg.encoding.find("bgr8")!=-1 : pix_fmt = "bgr24" if self.opt_display_images: cv_image = bridge.imgmsg_to_cv2(msg, "bgr8") elif msg.encoding.find("bggr8")!=-1 : pix_fmt = "bayer_bggr8" if self.opt_display_images: cv_image = bridge.imgmsg_to_cv2(msg, "bayer_bggr8") elif msg.encoding.find("rggb8")!=-1 : pix_fmt = "bayer_rggb8" if self.opt_display_images: cv_image = bridge.imgmsg_to_cv2(msg, "bayer_rggb8") elif msg.encoding.find("rgb8")!=-1 : pix_fmt = "rgb24" if self.opt_display_images: cv_image = bridge.imgmsg_to_cv2(msg, "bgr8") elif msg.encoding.find("16UC1")!=-1 : pix_fmt = "gray16le" else: print('unsupported encoding:', msg.encoding, topic) #exit(1) if pix_fmt: self.write_output_video( msg, topic, t, RAWIMAGE_VIDEO, pix_fmt ) except AttributeError: # maybe theora packet # theora not supported if self.opt_verbose : print("Could not handle this format. Maybe thoera packet? theora is not supported.") pass if self.opt_display_images: cv2.imshow(topic, cv_image) key=cv2.waitKey(1) if key==1048603: exit(1) if self.p_avconv == {}: print("No image topics found in bag:", filename) bag.close() if __name__ == '__main__': #print() #print('rosbag2video, by Maximilian Laiacker 2020 and Abel Gabor 2019') #print() if len(sys.argv) < 2: print('Please specify ros bag file(s)!') print_help() sys.exit(1) else : videowriter = RosVideoWriter() try: opt_files = videowriter.parseArgs(sys.argv[1:]) except getopt.GetoptError: print_help() sys.exit(2) # loop over all files for files in range(0,len(opt_files)): #First arg is the bag to look at bagfile = opt_files[files] videowriter.addBag(bagfile) print("finished")
main.py
import glob import sys import glob import os import argparse def mkdir(path): if not os.path.exists(path): os.mkdir(path) if __name__ == "__main__": parser = argparse.ArgumentParser( description='''This is a code for bag2video.''') parser.add_argument('--data_root', type=str, default=r'/test/data', help='path to the root of data') parser.add_argument('--out_root', type=str, default=r'/test/save', help='path to the root of data') parser.add_argument('--fps', type=int, default=25, help='path to the root of data') args = parser.parse_args() mkdir(args.out_root) bagList = glob.glob(os.path.join(args.data_root, "*.bag")) for bagPath in bagList: baseName = os.path.basename(bagPath).split(".")[0] outPath = os.path.join(args.out_root, baseName + ".mp4") os.system("python bag2video.py -o {outName} --fps {fps} {bagPath}".format(outName=outPath, fps=args.fps, bagPath=bagPath))
到此這篇關(guān)于如何將bag文件批量轉(zhuǎn)成mp4的文章就介紹到這了,更多相關(guān)bag文件批量轉(zhuǎn)成mp4內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
用sqlalchemy構(gòu)建Django連接池的實例
今天小編就為大家分享一篇用sqlalchemy構(gòu)建Django連接池的實例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-08-08python-opencv中的cv2.inRange函數(shù)用法說明
這篇文章主要介紹了python-opencv中的cv2.inRange函數(shù)用法說明,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-04-04對DataFrame數(shù)據(jù)中的重復(fù)行,利用groupby累加合并的方法詳解
今天小編就為大家分享一篇對DataFrame數(shù)據(jù)中的重復(fù)行,利用groupby累加合并的方法詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-01-01Python偏函數(shù)實現(xiàn)原理及應(yīng)用
這篇文章主要介紹了Python偏函數(shù)實現(xiàn)原理及應(yīng)用,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-11-11