Java實現(xiàn)實時視頻轉播的代碼示例
實現(xiàn)功能簡述
最開始是想做一個在線的美妝功能,就像抖音上錄制視頻時增加特效一樣。由于是后端程序員,我下意識的認為對圖像/視頻的處理都應該在后臺完成,通過某種協(xié)議傳給前端進行展示,我的項目也是基于此寫成的。 主要用到的輪子:spring-boot,javaCV,Libjitsi,Webcam
第一步:后臺啟動攝像頭,抓拍圖像
while (isOpened) { synchronized (lock) { System.out.println("正在添加FFmpag"); BufferedImage image = webcam.getImage(); Frame frame = converter.getFrame(image); recorder.record(frame); if (!isFileStreamOn) { isFileStreamOn = true; } } Thread.yield(); }
此處用到Webcam包和JavaCV包
Webcam可以通過電腦攝像頭獲取BufferedImage圖片(當然javaCV中也有類似的工具) JavaCV包是用Java封裝音視頻處理的庫,主要邏輯是將圖片/視頻/音頻數(shù)據(jù)轉化為Frame類,基于Frame對象進行操作。 上面代碼中我將每張圖片用轉化器轉化成Frame對象(很明顯轉化器類源碼中的邏輯很值得研究),用FFmpegFrameRecorder將傳入的圖片轉化成視頻(文件/流)。
其中webcam,recorder和轉化器在config中定義為bean:
@Bean public Webcam getCam() { Webcam webcam = Webcam.getDefault(); webcam.setViewSize(new Dimension(640, 480)); return webcam; } @Bean public FFmpegFrameRecorder getRecorder(Webcam webcam){ FFmpegFrameRecorder recorder = new FFmpegFrameRecorder("src/main/resources/static/output.avi",webcam.getViewSize().width,webcam.getViewSize().height); recorder.setVideoCodecName("lib264"); recorder.setVideoCodec(avcodec.AV_CODEC_ID_MPEG4); recorder.setFormat("avi"); recorder.setFrameRate(24); return recorder; } @Bean public Java2DFrameConverter getConverter(){ return new Java2DFrameConverter(); }
第二步:后臺獲取的視頻發(fā)送RTP包到前端
while (isFileStreamOn) { InputStream stream = new FileInputStream(file); synchronized (lock) { while (stream.available() > 0) { byte[] bytes = new byte[1024]; int length = stream.read(bytes); RawPacket rawPacket = new RawPacket(bytes, 0, length); DatagramPacket udpPacket = new DatagramPacket(rawPacket.getBuffer(), rawPacket.getLength(), targetHost, targetPort); udpSocket.send(udpPacket); System.out.println("發(fā)送數(shù)據(jù)包 " + Arrays.toString(udpPacket.getData())) } // file = new File(file.getAbsoluteFile()); try (FileWriter writer = new FileWriter(file)) { writer.write(""); // 寫入空字符串,清空文件內(nèi)容 } catch (IOException e) { e.printStackTrace(); } stream = new FileInputStream(file); } }
注意點:
我將由FFmpegFrameRecorder寫入文件的視頻再次讀取出來,并將此文件流分段,用基于webRTC的jitsi庫進行打包發(fā)送。 jitsi是Java支持WebRTC(實時通信)的一個庫,主要能夠將數(shù)據(jù)以很小的時間間隔發(fā)送(據(jù)說視頻可以做到每幀一個包,我沒試也沒必要) webRTC中,RTP協(xié)議是基于UDP的,也就是說一個RTP會被拆分成多個UDP包。
問題1:為什么要先寫入文件再讀取文件? 其實FFmpegFrameRecorder用于圖片轉視頻時,視頻數(shù)據(jù)的寫入方式其實可以是文件,也可以是輸出流,假如可以獲取一段時間的視頻輸出流,無論是切片轉化還是加鎖重置都很方便,但唯獨是我用這個API有bug,底層的源碼繞來繞去到達JNI,就完全不懂了。
關于用文件緩存,我有另一個解決方法是:寫入文件到一定大小,主動切換寫另一個文件,將寫好的文件存入隊列中等待讀取,但由于JavaCV的API我還不太熟練,如何給一個recorder對象切換輸出流暫不明,此法仍需考慮。
問題2:寫入文件和讀取文件怎么做到異步? 出于實時性和用戶使用效果的考慮,程序的設計一定是一個線程寫,一個線程異步讀取,以下是我的程序設計以及其中出現(xiàn)的問題和解決方式
用于獲取視頻幀的線程B先執(zhí)行,啟動攝像頭并將視頻幀加入recorder寫入緩存(文件),寫入一段時間后用于發(fā)送數(shù)據(jù)包的線程A開始執(zhí)行,獲取到緩存中的數(shù)據(jù),分包,以UDP的形式發(fā)送到目標主機
為了避免兩個線程對同一緩存讀寫沖突,他們的操作分別需要加鎖。
怎么實現(xiàn)一個線程先于另一線程執(zhí)行呢?sleep基于時間設置延遲但并不是根據(jù)邏輯;wait和notify是基于邏輯的但也有要求——由于兩個方法是基于鎖的,所以只能在有鎖的同步代碼塊中執(zhí)行,用起來沒那么絲滑。
關于將輸出流分包轉儲到package中,我考慮了兩種方式:
持久讀取一個流中的數(shù)據(jù):用鎖將此流鎖住,然后就可以用固定長度的byte[]慢慢拆解。
立馬讀取此流的全部數(shù)據(jù):不需要鎖住流,只要將流中數(shù)據(jù)轉入超大byte[]中,再慢慢拆分大byte[]
第一種在時間上性能消耗較大,對流的阻塞時間較長,第二種在內(nèi)存上消耗較大,可能導致GC的STW較頻繁。
為了防止中間緩存過大,需要一定時間將其進行重置,也就是對文件進行清空 有權利清空文件的一定是能確保文件讀取完畢的線程,所以什么時候清空?我的想法是當讀線程發(fā)現(xiàn)沒有新數(shù)據(jù)可讀時就進行清空,那有沒有可能寫線程一直在寫導致不會有空隙沒有新數(shù)據(jù)?不會,因為我的程序加鎖的粒度比較大,當讀線程全部讀完后才會釋放鎖給寫線程。也就是說讀線程每次完整讀完緩存就會清空。
完整代碼:
import com.example.meitu2.utils.bfiOps; import com.github.sarxos.webcam.Webcam; import org.bytedeco.ffmpeg.global.avcodec; import org.bytedeco.javacv.FFmpegFrameRecorder; import org.bytedeco.javacv.Frame; import org.bytedeco.javacv.Java2DFrameConverter; import org.jitsi.service.neomedia.RawPacket; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.awt.image.BufferedImage; import java.io.*; import java.net.*; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @RestController public class video3Controller { @Autowired Webcam webcam; @Autowired List<Webcam> webcams; @Autowired com.example.meitu2.pojos.websocket websocket; @Autowired Java2DFrameConverter converter; boolean isOpened = false; boolean isFileStreamOn = false; boolean needsLight = false; int lightIndex = 0; boolean needDuibidu = false; int duibiduIndex = 0; @GetMapping("RTPThread") public void RTPThread() throws SocketException, UnknownHostException, FFmpegFrameRecorder.Exception, InterruptedException { if (!webcam.isOpen()) { webcam.open(); } isOpened = true; DatagramSocket udpSocket = new DatagramSocket(); InetAddress targetHost = InetAddress.getByName("localhost"); int targetPort = 2244; // ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); File file = new File("Demo.mp4"); FFmpegFrameRecorder recorder = new FFmpegFrameRecorder(file.getAbsoluteFile(), webcam.getViewSize().width, webcam.getViewSize().height); int a = 0; recorder.setVideoCodecName("lib264"); recorder.setVideoCodec(avcodec.AV_CODEC_ID_H264); recorder.setFormat("mp4"); recorder.setFrameRate(24); System.out.println("recorder: " + recorder); recorder.start(); Object lock = new Object(); new Thread(() -> { if(outputStream.size()==1024){ try { Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); } try { while (isFileStreamOn) { InputStream stream = new FileInputStream(file); synchronized (lock) { while (stream.available() > 0) { byte[] bytes = new byte[1024]; int length = stream.read(bytes); RawPacket rawPacket = new RawPacket(bytes, 0, length); DatagramPacket udpPacket = new DatagramPacket(rawPacket.getBuffer(), rawPacket.getLength(), targetHost, targetPort); udpSocket.send(udpPacket); System.out.println("發(fā)送數(shù)據(jù)包 " + Arrays.toString(udpPacket.getData())); } System.out.println("一份file讀完,更新file"); // file = new File(file.getAbsoluteFile()); try (FileWriter writer = new FileWriter(file)) { writer.write(""); // 寫入空字符串,清空文件內(nèi)容 } catch (IOException e) { e.printStackTrace(); } stream = new FileInputStream(file); } } System.out.println("一號外層while結束"); } catch (Exception e) { throw new RuntimeException(e); } }).start(); while (isOpened) { synchronized (lock) { System.out.println("正在添加FFmpag"); BufferedImage image = webcam.getImage(); Frame frame = converter.getFrame(image); recorder.record(frame); if (!isFileStreamOn) { isFileStreamOn = true; } } Thread.yield(); } recorder.stop(); recorder.release(); }
第三步:如何接收RTP包
由于前端代碼不太熟練,我決定用Java寫一個后臺接收這些文件進行驗證
public static void main(String[] args) throws IOException { DatagramSocket ds = new DatagramSocket(2244); byte[] bytes = new byte[1024]; int length = bytes.length; Object lock = new Object(); //鎖似乎沒有必要 Queue<DatagramPacket> dps = new ArrayDeque<>(); new Thread() { @Override public void run() { try { FileOutputStream outputStream = new FileOutputStream("Demo.mp4"); while (!dps.isEmpty()) { byte[] data = dps.poll().getData(); outputStream.write(data); System.out.println(data); if (data.length == 0) { break; } } } catch (Exception e) { throw new RuntimeException(e); } } }.start(); while (true) { DatagramPacket dp = new DatagramPacket(bytes, length); System.out.println("接收到dp"+ Arrays.toString(dp.getData())); dps.add(dp); ds.receive(dp); } }
這里也是兩個線程異步操作,一個用來接收,一個用來解讀
值得一提的是:由于此時數(shù)據(jù)是分批來的,天然可以用隊列有序操作。 而像之前數(shù)據(jù)是基于流進行傳輸,對一整個長時間的流數(shù)據(jù)的操作首先就需要合理的分片,這就要求涉及的線程通過加鎖避免沖突。
關于之前幾個版本的記錄:
1. 后端之間存儲mp4文件到靜態(tài)資源,前端訪問到即播放
后臺代碼:
@GetMapping("/setVideo") public synchronized Result setVideo() throws FFmpegFrameRecorder.Exception { //默認獲取三十秒的視頻 //加鎖是為了防止webcam被多個線程調用 if(!webcam.isOpen()){ webcam.open(); } num++; List<BufferedImage> list = new ArrayList<>(); long start = System.currentTimeMillis(); while (System.currentTimeMillis()-start<=10000){ BufferedImage bfi = webcam.getImage(); list.add(bfi); } System.out.println("鹿丸!,開存!"); webcam.close(); FFmpegFrameRecorder recorder = new FFmpegFrameRecorder("src/main/resources/static/output.mp4",webcam.getViewSize().width,webcam.getViewSize().height); recorder.setVideoCodecName("lib264"); recorder.setVideoCodec(avcodec.AV_CODEC_ID_H264); recorder.setFormat("mp4"); recorder.setFrameRate(24); System.out.println("recorder: "+recorder); recorder.start(); for (int i = 0; i < list.size(); i++) { Frame frame = converter.getFrame(list.get(i)); recorder.record(frame); } recorder.stop(); recorder.release(); System.out.println("存完"); return Result.ok("down"); }
前端代碼: 由于前端寫的很亂,很多東西都擠在一個vue頁面里,不太方面拆分 以下是核心實現(xiàn)代碼:
const getVideo = function () { axios.get("http://localhost:8080/output.mp4?t="+(new Date()).getTime(), { responseType: 'blob'}).then((result) => { console.log("getVideo的result: ", result); const blob = new Blob([result.data], { type: 'video/mp4' }); const videoURL = URL.createObjectURL(blob); console.log("videoValue", videoURL); video.value = videoURL; }) } const setVideo = function () { console.log('發(fā)起setVideo,后臺開始錄制'); axios.get("http://localhost:8080/setVideo").then((result) => { //由于是axios異步請求,axios還沒把videoURL填充 console.log("setVideo的result", result); }) setTimeout(function () { console.log("發(fā)起getVideo,獲取后臺錄制好的avi"); getVideo(); setVideo(); }, 12000);
主要的思路是:
后端循環(huán)錄制一定時間的視頻(比如說是10s),一旦錄制好就交給前端進行播放,類似在后端用mp4文件作為緩存一樣
- 前端其實沒辦法知道后端什么時候視頻更新了,于是要么輪詢,要么長鏈接,要么前后端約定一個時間,我是基于最后一種辦法,但很明顯網(wǎng)絡的傳輸和程序的不確定耗時必然導致時間的不準確——后端限制10s的視頻常常錄出12s...
- 還有一個我遇到的最大的問題是:瀏覽器的緩存問題: 明明后端的靜態(tài)文件更新了,明明我發(fā)的AJAX里有時間戳,明明我在開發(fā)者工具中設置了禁止緩存...但無論是AJAX還是之間localhost:8080/output.mp4都是老視頻 由于是直接訪問后臺靜態(tài)資源,沒法用MVC模板給response設置緩存,但可以加過濾器強行攔截;或者也有在前端增加增加配置來禁止緩存的,但也沒用...
還有一些額外的問題,比如,瀏覽器有時將訪問到的視頻是播放還是下載,取決于響應頭Content-Disposition,可以setHeader強行更改為attachment表示為播放;通過工具將視頻編碼格式轉化成支持瀏覽器的格式......
2. 用webSocket優(yōu)化
webSocket的作用是在前后端之間建立平等互通的通道,主要是后臺也可以給前端主動發(fā)起數(shù)據(jù),用這個可以優(yōu)化之前由于前后端規(guī)定時間訪問而出現(xiàn)的錯位問題
關于webSocket,它的設計還給前后端連接的建立提供了鉤子,這個思想在Java中感覺不常見;在vue這種很強調生命周期的語法中很常見 如下:可以在建立連接,關閉連接,接收信息,發(fā)送消息時都寫邏輯,就和前端能夠更好地,平等的完成一致的功能了。
后端代碼:
@GetMapping("openCam") public void openCam() throws IOException { if (!webcam.isOpen()) { webcam.open(); } isOpened = true; DatagramSocket udpSocket = new DatagramSocket(); InetAddress targetHost = InetAddress.getByName("localhost"); int targetPort = 80; while (isOpened) { List<BufferedImage> list = new ArrayList<>(); long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start <= 10000) { BufferedImage bfi = webcam.getImage(); if (needsLight) { bfi = bfiOps.light(bfi, lightIndex); } if (needDuibidu) { bfi = bfiOps.duibidu(bfi, duibiduIndex); } list.add(bfi); } System.out.println("鹿丸!,開存!小節(jié)視頻長度:" + (System.currentTimeMillis() - start)); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); byte[] byteArray = outputStream.toByteArray(); FFmpegFrameRecorder recorder = new FFmpegFrameRecorder(outputStream, webcam.getViewSize().width, webcam.getViewSize().height); recorder.setVideoCodecName("lib264"); recorder.setVideoCodec(avcodec.AV_CODEC_ID_H264); recorder.setFormat("mp4"); recorder.setFrameRate(24); System.out.println("recorder: " + recorder); recorder.start(); for (int i = 0; i < list.size(); i++) { Frame frame = converter.getFrame(list.get(i)); recorder.record(frame); } recorder.stop(); recorder.release(); System.out.println("后臺保存完畢"); RawPacket rtpPacket = new RawPacket(byteArray, 0, byteArray.length);//構造中需要傳入起始終結,可能表明不是讓一遍傳完 DatagramPacket udpPacket = new DatagramPacket(rtpPacket.getBuffer(), rtpPacket.getLength(), targetHost, targetPort); udpSocket.send(udpPacket); udpSocket.close(); websocket.sendOneMessage("0", "down"); } webcam.close(); }
前端代碼:
const openCam = function() { socket = new WebSocket("ws://localhost:8080/websocket/"+userId); socket.onopen = function(){ console.log("開啟"); axios.get("http://localhost:8080/openCam").then((result)=>{ console.log(result); }) } socket.onmessage = function(msg){ console.log("收到消息",msg.data); if(msg.data === "down"){ getVideo(); } } }
以上就是Java實現(xiàn)實時視頻轉播的代碼示例的詳細內(nèi)容,更多關于Java實時視頻轉播的資料請關注腳本之家其它相關文章!
相關文章
SpringBoot中通過8項配置優(yōu)化提升Tomcat性能的配置方法
優(yōu)化Spring Boot,Spring Cloud 應用程序中Tomcat的配置有助于提高性能和資源利用率,這篇文章主要介紹了SpringBoot中通過8項配置優(yōu)化提升Tomcat性能的配置方法,需要的朋友可以參考下2024-08-08MyBatis之自查詢使用遞歸實現(xiàn) N級聯(lián)動效果(兩種實現(xiàn)方式)
這篇文章主要介紹了MyBatis之自查詢使用遞歸實現(xiàn) N級聯(lián)動效果,本文給大家分享兩種實現(xiàn)方式,需要的的朋友參考下吧2017-07-07SpringBoot通過token實現(xiàn)用戶互踢功能(具體實現(xiàn))
所謂token,既用戶能夠在一定時間內(nèi)證明自己身份的一長串字符串,這篇文章主要介紹了SpringBoot通過token實現(xiàn)用戶互踢功能,需要的朋友可以參考下2024-04-04關于@JsonProperty和@JSONField注解的區(qū)別及用法
這篇文章主要介紹了關于@JsonProperty和@JSONField注解的區(qū)別及用法,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-08-08