minio分布式文件管理方式
一、minio 是什么?
MinIO構建分布式文件系統(tǒng),MinIO 是一個非常輕量的服務,可以很簡單的和其他應用的結合使用,它兼容亞馬遜 S3 云存儲服務接口,非常適合于存儲大容量非結構化的數據,例如圖片、視頻、日志文件、備份數據和容器/虛擬機鏡像等。
二、minio的部署
本項目采用docker搭建
首先需要創(chuàng)建,文件存儲的目錄。以后上傳的文件,在這4個目錄中都會進行存儲(即:一個文件存儲4份),保證數據的安全性
mkdir -p /root/minio_data/data1 mkdir -p /root/minio_data/data2 mkdir -p /root/minio_data/data3 mkdir -p /root/minio_data/data4
docker run -p 9000:9000 -p 9001:9001 --name minio \ -v /root/minio_data/data1:/data1 \ -v /root/minio_data/data2:/data2 \ -v /root/minio_data/data3:/data3 \ -v /root/minio_data/data4:/data4 \ -e "MINIO_ROOT_USER=minioadmin" \ -e "MINIO_ROOT_PASSWORD=minioadmin" \ minio/minio server /data{1...4} --console-address ":9001"
- 9000端口是作為S3 API端口,用于API的調用,9001端口用于Web控制臺
minio/minio
: 這是Docker鏡像的名稱server /data{1...4}
: 這部分告訴MinIO以服務器模式啟動,并且使用/data1
,/data2
,/data3
, 和/data4
這四個目錄作為存儲位置。--console-address ":9001"
: 這個參數指定了MinIO Web控制臺的監(jiān)聽地址和端口。這里設置為":9001"
,意味著Web控制臺將監(jiān)聽容器內的9001端口。- 訪問地址:http://ip地址:9001 賬號:minioadmin 密碼:minioadmin
windows安裝方法
- 在D盤下創(chuàng)建minio_data目錄,在該目錄下創(chuàng)建分別創(chuàng)建data1,data2,data3,data4目錄
- 進入官網,下載minio.exe文件,進入cmd,執(zhí)行下列命令,啟動minio服務
minio.exe server D:\minio_data\data1 D:\minio_data\data2 D:\minio_data\data3 D:\minio_data\data4
訪問地址:
- fhttp://127.0.0.1:49229/ 賬號:minioadmin 密碼:minioadmin
三、基本使用方法
1.創(chuàng)建一個bucket
創(chuàng)建一個測試bucket,用以存儲文件
2.上傳文件
上傳文件后,我們可以發(fā)現在,data1 data2 data3 data4 目錄下都進行了存儲
測試minio的數據恢復過程:
1、首先刪除一個目錄。
- 刪除目錄后仍然可以在web控制臺上傳文件和下載文件。
- 稍等片刻刪除的目錄自動恢復。
2、刪除兩個目錄。
- 刪除兩個目錄也會自動恢復。
3、刪除三個目錄 。
- 由于 集合中共有4塊硬盤,有大于一半的硬盤損壞數據無法恢復。
此時報錯:We encountered an internal error, please try again. (Read failed. Insufficient number of drives online)在線驅動器數量不足。
四、項目依賴
這些項目中會用到的依賴
<dependency> <groupId>io.minio</groupId> <artifactId>minio</artifactId> <version>8.4.3</version> </dependency> <dependency> <groupId>com.squareup.okhttp3</groupId> <artifactId>okhttp</artifactId> <version>4.8.1</version> </dependency> <!--根據擴展名取mimetype--> <dependency> <groupId>com.j256.simplemagic</groupId> <artifactId>simplemagic</artifactId> <version>1.17</version> </dependency> <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> <version>1.11</version> </dependency>
需要將訪問權限設置public,這樣遠程才能夠訪問到
需要三個參數才能連接到minio服務。
五、圖片上傳
1.本地測試
包含上傳文件、刪除文件、下載文件、檢查完整性
package com.xuecheng.media; import com.j256.simplemagic.ContentInfo; import com.j256.simplemagic.ContentInfoUtil; import io.minio.*; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.compress.utils.IOUtils; import org.junit.jupiter.api.Test; import org.springframework.http.MediaType; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.FilterInputStream; /** * @description 測試MinIO * @author Mr.M * @date 2022/9/11 21:24 * @version 1.0 */ public class MinioTest { static MinioClient minioClient = MinioClient.builder() .endpoint("http://124.70.208.223:8089/") //9000端口用于API調用 .credentials("minioadmin", "minioadmin") .build(); private String getMimeType(String extension){ if(extension==null) extension = ""; //根據擴展名取出mimeType ContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(extension);//根據擴展名獲取MIME類型,比如.mp4文件的MIME類型是video/mp4 //通用mimeType,字節(jié)流 String mimeType = MediaType.APPLICATION_OCTET_STREAM_VALUE; if(extensionMatch!=null){ mimeType = extensionMatch.getMimeType(); } return mimeType; } //上傳文件 @Test void upload() { try { String filename="E:\\Users\\31118\\Pictures\\Snipaste_2024-11-10_23-08-04.png"; String bucketName = "001/test001.jpg"; String bucket ="testbucket"; String mimeType = getMimeType(".jpg"); UploadObjectArgs testbucket = UploadObjectArgs.builder() .bucket(bucket) .filename(filename) //本地文件路徑 .object(bucketName) //上傳到bucket下的路徑 .contentType(mimeType)//默認根據擴展名確定文件 .build(); minioClient.uploadObject(testbucket); check(filename,bucketName,bucket); System.out.println("上傳成功"); } catch (Exception e) { e.printStackTrace(); System.out.println("上傳失敗"); } } //刪除文件 @Test void delete(){ try { RemoveObjectArgs testbucket = RemoveObjectArgs.builder().bucket("testbucket").object("001/test001.jpg").build(); minioClient.removeObject(testbucket); System.out.println("刪除成功"); } catch (Exception e) { e.printStackTrace(); System.out.println("刪除失敗"); } } //查看/下載文件 @Test void getFile() { GetObjectArgs getObjectArgs = GetObjectArgs.builder().bucket("testbucket").object("001/test001.jpg").build(); try( FilterInputStream inputStream = minioClient.getObject(getObjectArgs); FileOutputStream outputStream = new FileOutputStream(new File("E:\\圖片.gif"));//輸出路徑 ) { IOUtils.copy(inputStream,outputStream); } catch (Exception e) { e.printStackTrace(); } } //對上傳之后和下載完成后的文件進行完整性檢查,防止丟包 //將上傳完成后的文件和本地的臨時文件的md5的值進行比對,如果一致,則說明上傳和下載成功 void check(String fileName,String bucketName,String bucket){ GetObjectArgs getObjectArgs = GetObjectArgs.builder().bucket(bucket).object(bucketName).build(); //校驗文件的完整性對文件的內容進行md5 try { //獲取遠程文件的md5 FilterInputStream fileInputStream1 = minioClient.getObject(getObjectArgs); String source_md5 = DigestUtils.md5Hex(fileInputStream1); //獲取本地文件的md5 FileInputStream fileInputStream = new FileInputStream(new File(fileName)); String local_md5 = DigestUtils.md5Hex(fileInputStream); if(source_md5.equals(local_md5)){ System.out.println("下載成功"); } }catch (Exception e){ e.printStackTrace(); } } }
上傳文件時contentType("")屬性并不是強制要求設置的,但一般建議設置,以便瀏覽器進行識別該文件的類型
2.java服務器遠程部署-圖片上傳
minio: endpoint: http://124.70.208.223:9000 #API訪問路徑 accessKey: minioadmin #登錄賬號 secretKey: minioadmin #登錄密碼 bucket: files: mediafiles #文件/圖片 存在的位置 videofiles: video #視頻存儲的位置
文件上傳時,獲取md5,作為主鍵保存在文件表中
后續(xù)上傳的如果是同一個文件時,他們的md5的值是一致的,不在進行二次存儲
配置類注冊,方便后面直接使用
package com.xuecheng.media.config; import io.minio.MinioClient; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @description minio配置 */ @Configuration public class MinioConfig { @Value("${minio.endpoint}") private String endpoint; @Value("${minio.accessKey}") private String accessKey; @Value("${minio.secretKey}") private String secretKey; @Bean public MinioClient minioClient() { return MinioClient.builder() .endpoint(endpoint) .credentials(accessKey, secretKey) .build(); } }
控制層接收到MultipartFile后,這是獲取一些常見屬性的辦法,方便對文件進行存儲
@ApiOperation("上傳文件") @RequestMapping(value = "/upload/coursefile",consumes = MediaType.MULTIPART_FORM_DATA_VALUE) //對文件類型進行聲明 public UploadFileResultDto upload(@RequestPart("filedata") MultipartFile filedata) throws IOException { //文件大小 long fileSize = filedata.getSize(); //文件名稱 String originalFilename = filedata.getOriginalFilename(); //創(chuàng)建臨時文件 File tempFile = File.createTempFile("minio", "temp"); //createTempFile 方法會生成一個唯一的文件名,該文件名由前綴、一個隨機生成的字符串和后綴組成。例如/minio1234567890temp //上傳的文件拷貝到臨時文件 filedata.transferTo(tempFile); //文件路徑 String absolutePath = tempFile.getAbsolutePath(); }
3.上傳文件
/** * @description 將文件寫入minIO * @param localFilePath 文件地址 * @param bucket 桶 * @param objectName 對象名稱 * @return void * @author Mr.M * @date 2022/10/12 21:22 */ public boolean addMediaFilesToMinIO(String localFilePath,String mimeType,String bucket, String objectName) { try { UploadObjectArgs.Builder builder = UploadObjectArgs.builder() .bucket(bucket) .object(objectName) .filename(localFilePath); if (mimeType != null) {//若上傳的不是分塊文件,指定文件類型 builder.contentType(mimeType); } UploadObjectArgs testbucket =builder.build(); minioClient.uploadObject(testbucket); log.debug("上傳文件到minio成功,bucket:{},objectName:{}",bucket,objectName); System.out.println("上傳成功"); return true; } catch (Exception e) { e.printStackTrace(); log.error("上傳文件到minio出錯,bucket:{},objectName:{},錯誤原因:{}",bucket,objectName,e.getMessage(),e); } return false; }
4.需要用到的工具方法
獲取文件Md5
//獲取文件的md5 private String getFileMd5(File file) { try (FileInputStream fileInputStream = new FileInputStream(file)) { String fileMd5 = DigestUtils.md5Hex(fileInputStream); return fileMd5; } catch (Exception e) { e.printStackTrace(); return null; } }
獲取年月日結構目錄
//獲取文件默認存儲目錄路徑 年/月/日 private String getDefaultFolderPath() { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); String format = sdf.format(new Date()); return format.replace("-", "/")+"/"; }
根據擴展名獲取MIME類型
比如.mp4文件的MIME類型是video/mp4
private String getMimeType(String extension){ //傳入.jpg if(extension==null) extension = ""; //根據擴展名取出mimeType ContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(extension); //通用mimeType,字節(jié)流 String mimeType = MediaType.APPLICATION_OCTET_STREAM_VALUE; if(extensionMatch!=null){ mimeType = extensionMatch.getMimeType(); } return mimeType; }
這邊只進行關鍵信息的展示,數據庫相關操作根據項目自行處理
圖片的訪問鏈接是:
服務器ip:9000/mediafiles/2024/11/27/e0abb735ab793fae5568c2ed537ab37c.jpg
注意9000是API地址,9001是web服務地址
六、視頻上傳-斷點續(xù)傳
minio限制,視頻至少以5mb,劃分
1.文件上傳前檢查文件是否已上傳
先通過前端計算出視頻md5的值,傳給后端,檢測該視頻是否已經存在,
@ApiOperation(value = "文件上傳前檢查文件") @PostMapping("/upload/checkfile") public RestResponse<Boolean> checkfile(@RequestParam("fileMd5") String fileMd5) throws Exception { return bigFilesService.checkFile(fileMd5); }
@Override public RestResponse<Boolean> checkFile(String fileMd5) { //查詢文件信息 MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMd5); if (mediaFiles != null) { //桶 String bucket = mediaFiles.getBucket(); //存儲目錄 String filePath = mediaFiles.getFilePath(); //文件流 InputStream stream = null; try { stream = minioClient.getObject( GetObjectArgs.builder() .bucket(bucket) .object(filePath) .build()); if (stream != null) { //文件已存在 return RestResponse.success(true); } } catch (Exception e) { log.info("文件不存在,準備開始分塊上傳"); } } //文件不存在 return RestResponse.success(false); }
2.分塊上傳前檢測分塊是否已上傳
根據后端的響應信息,若該視頻不存在,前端對視頻劃分為一個個分塊,并計算每個分塊的md5值
將分塊的md5值,傳給后端,判斷該分塊是否存在,若該分塊不存在則上傳分塊
@ApiOperation(value = "分塊文件上傳前的檢測") @PostMapping("/upload/checkchunk") public RestResponse<Boolean> checkchunk(@RequestParam("fileMd5") String fileMd5, @RequestParam("chunk") int chunk) throws Exception { return bigFilesService.checkChunk(fileMd5,chunk); }
@Override public RestResponse<Boolean> checkChunk(String fileMd5, int chunkIndex) { //得到分塊文件目錄 String chunkFileFolderPath = getChunkFileFolderPath(fileMd5); //得到分塊文件的路徑 String chunkFilePath = chunkFileFolderPath + chunkIndex; //文件流 InputStream fileInputStream = null; try { fileInputStream = minioClient.getObject( GetObjectArgs.builder() .bucket(bucket_videoFiles) .object(chunkFilePath) .build()); if (fileInputStream != null) { //分塊已存在 log.info("分塊{}已存在",chunkIndex); return RestResponse.success(true); } } catch (Exception e) { //minio中沒有該分塊,上傳分塊 log.info("分塊{}不存在,開始上傳",chunkIndex); } //分塊未存在 return RestResponse.success(false); }
3.上傳分塊
@ApiOperation(value = "上傳分塊文件") @PostMapping("/upload/uploadchunk") public RestResponse uploadchunk(@RequestParam("file") MultipartFile file, @RequestParam("fileMd5") String fileMd5, @RequestParam("chunk") int chunk) throws Exception { //創(chuàng)建臨時文件 File tempFile = File.createTempFile("minio", "temp"); //上傳的文件拷貝到臨時文件 file.transferTo(tempFile); //文件路徑 String localFilePath = tempFile.getAbsolutePath(); return bigFilesService.uploadChunk(fileMd5,chunk,localFilePath); }
@Value("${minio.bucket.videofiles}") private String bucket_videoFiles; @Override public RestResponse uploadChunk(String fileMd5, int chunk,String localFilePath) { //得到分塊文件的目錄路徑 String chunkFileFolderPath = getChunkFileFolderPath(fileMd5); //得到分塊文件的路徑 String chunkFilePath = chunkFileFolderPath + chunk; try { //將文件存儲至minIO addMediaFilesToMinIO(localFilePath,null, bucket_videoFiles,chunkFilePath); return RestResponse.success(true); } catch (Exception ex) { ex.printStackTrace(); log.debug("上傳分塊文件:{},失敗:{}",chunkFilePath,ex.getMessage()); } return RestResponse.validfail(false,"上傳分塊失敗"); } /** * @description 將文件寫入minIO * @param localFilePath 文件地址 * @param bucket 桶 * @param objectName 對象名稱 * @return void * @author Mr.M * @date 2022/10/12 21:22 */ public boolean addMediaFilesToMinIO(String localFilePath,String mimeType,String bucket, String objectName) { try { UploadObjectArgs testbucket = UploadObjectArgs.builder() .bucket(bucket) .object(objectName) .filename(localFilePath) .contentType(mimeType) .build(); minioClient.uploadObject(testbucket); log.debug("上傳文件到minio成功,bucket:{},objectName:{}",bucket,objectName); System.out.println("上傳成功"); return true; } catch (Exception e) { e.printStackTrace(); log.error("上傳文件到minio出錯,bucket:{},objectName:{},錯誤原因:{}",bucket,objectName,e.getMessage(),e); XueChengPlusException.cast("上傳文件到文件系統(tǒng)失敗"); } return false; }
md5目錄結構
分塊存儲目錄:d/a/da112e234adasdasd/chunk
//得到分塊文件的目錄 private String getChunkFileFolderPath(String fileMd5) { return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + "chunk" + "/"; }
修改文件大小的限制
前端對文件分塊的大小為5MB,SpringBoot web默認上傳文件的大小限制為1MB
spring: servlet: multipart: max-file-size: 50MB #單個文件的大小限制 max-request-size: 50MB #單次請求的大小限制
4.合并分塊
@ApiOperation(value = "合并文件") @PostMapping("/upload/mergechunks") public RestResponse mergechunks(@RequestParam("fileMd5") String fileMd5, @RequestParam("fileName") String fileName, @RequestParam("chunkTotal") int chunkTotal) throws Exception { Long companyId = 1232141425L; UploadFileParamsDto uploadFileParamsDto = new UploadFileParamsDto(); uploadFileParamsDto.setFileType("001002"); uploadFileParamsDto.setTags("課程視頻"); uploadFileParamsDto.setRemark(""); uploadFileParamsDto.setFilename(fileName); return bigFilesService.mergechunks(companyId,fileMd5,chunkTotal,uploadFileParamsDto); }
@Override public RestResponse mergechunks(Long companyId, String fileMd5, int chunkTotal, UploadFileParamsDto uploadFileParamsDto) { //=====獲取分塊文件路徑===== String chunkFileFolderPath = getChunkFileFolderPath(fileMd5); //組成將分塊文件路徑組成 List<ComposeSource> List<ComposeSource> sourceObjectList = Stream.iterate(0, i -> ++i) //從0開始,迭代到chunkTotal,依次獲取所有分塊文件,0 1 2 .limit(chunkTotal) .map(i -> ComposeSource.builder() .bucket(bucket_videoFiles) .object(chunkFileFolderPath+i) .build()) .collect(Collectors.toList()); //=====合并===== //文件名稱 String fileName = uploadFileParamsDto.getFilename(); //文件擴展名 String extName = fileName.substring(fileName.lastIndexOf(".")); //合并文件路徑 String mergeFilePath = getFilePathByMd5(fileMd5, extName); try { //合并文件 ObjectWriteResponse response = minioClient.composeObject( ComposeObjectArgs.builder() .bucket(bucket_videoFiles) .object(mergeFilePath) .sources(sourceObjectList) .build()); log.debug("合并文件成功:{}",mergeFilePath); } catch (Exception e) { log.debug("合并文件失敗,fileMd5:{},異常:{}",fileMd5,e.getMessage(),e); return RestResponse.validfail(false, "合并文件失敗。"); } // ====驗證md5==== File minioFile = downloadFileFromMinIO(bucket_videoFiles,mergeFilePath); if(minioFile == null){ log.debug("下載合并后文件失敗,mergeFilePath:{}",mergeFilePath); return RestResponse.validfail(false, "下載合并后文件失敗。"); } try (InputStream newFileInputStream = new FileInputStream(minioFile)) { //minio上文件的md5值 String md5Hex = DigestUtils.md5Hex(newFileInputStream); //比較md5值,不一致則說明文件不完整 if(!fileMd5.equals(md5Hex)){ return RestResponse.validfail(false, "文件合并校驗失敗,最終上傳失敗。"); } //文件大小 uploadFileParamsDto.setFileSize(minioFile.length()); }catch (Exception e){ log.debug("校驗文件失敗,fileMd5:{},異常:{}",fileMd5,e.getMessage(),e); return RestResponse.validfail(false, "文件合并校驗失敗,最終上傳失敗。"); }finally { if(minioFile!=null){ //刪除下載的臨時文件 minioFile.delete(); } } //文件入庫 currentProxy.addMediaFilesToDb(companyId,fileMd5,uploadFileParamsDto,bucket_videoFiles,mergeFilePath); //=====清除分塊文件===== clearChunkFiles(chunkFileFolderPath,chunkTotal); return RestResponse.success(true); }
下載至本地,用于md5檢測
獲得合并后文件存儲路徑
private String getFilePathByMd5(String fileMd5,String fileExt){ return fileMd5.substring(0,1) + "/" + fileMd5.substring(1,2) + "/" + fileMd5 + "/" +fileMd5 +fileExt; }
將上傳后的文件下載至本地
將下載文件的md5的值與前端傳遞過來時視頻md5值進行比較,判斷視頻上傳時是否出現丟包
public File downloadFileFromMinIO(String bucket,String objectName){ //臨時文件 File minioFile = null; FileOutputStream outputStream = null; try{ InputStream stream = minioClient.getObject(GetObjectArgs.builder() .bucket(bucket) .object(objectName) .build()); //創(chuàng)建臨時文件 minioFile=File.createTempFile("minio", ".merge"); outputStream = new FileOutputStream(minioFile); IOUtils.copy(stream,outputStream); return minioFile; } catch (Exception e) { e.printStackTrace(); }finally { if(outputStream!=null){ try { outputStream.close(); } catch (IOException e) { e.printStackTrace(); } } } return null; }
刪除分塊
視頻上傳成功后,刪除之前上傳的分塊
private void clearChunkFiles(String chunkFileFolderPath,int chunkTotal){ try { List<DeleteObject> deleteObjects = Stream.iterate(0, i -> ++i) .limit(chunkTotal) .map(i -> new DeleteObject(chunkFileFolderPath.concat(Integer.toString(i)))) .collect(Collectors.toList()); RemoveObjectsArgs removeObjectsArgs = RemoveObjectsArgs.builder().bucket("video").objects(deleteObjects).build(); Iterable<Result<DeleteError>> results = minioClient.removeObjects(removeObjectsArgs); results.forEach(r->{ DeleteError deleteError = null; try { deleteError = r.get(); } catch (Exception e) { e.printStackTrace(); log.error("清楚分塊文件失敗,objectname:{}",deleteError.objectName(),e); } }); } catch (Exception e) { e.printStackTrace(); log.error("清楚分塊文件失敗,chunkFileFolderPath:{}",chunkFileFolderPath,e); } }
分塊文件清理問題
上傳一個文件進行分塊上傳,上傳一半不傳了,之前上傳到minio的分塊文件要清理嗎?怎么做的?
1、在數據庫中有一張文件表記錄minio中存儲的文件信息。
2、文件開始上傳時會寫入文件表,狀態(tài)為上傳中,上傳完成會更新狀態(tài)為上傳完成。
3、當一個文件傳了一半不再上傳了說明該文件沒有上傳完成,會有定時任務去查詢文件表中的記錄,如果文件未上傳完成則刪除minio中沒有上傳成功的文件目錄。
視頻文件格式轉換
視頻文件的格式有很多中,我們需要把視頻格式統(tǒng)一轉換為mp4,下面以avi文件格式轉換為mp4格式舉例
FFmpeg進行媒體文件的轉換
ffmpeg的安裝及基本使用:
xxl-job分布式任務調度
由于媒體文件轉換需要處理的時間,我們采用xxl-job進行分布式任務調度
xxl-job的基本使用方法:
多服務執(zhí)行:
-Dserver.port=63051 -Dxxl.job.executor.port=9998
什么是樂觀鎖、悲觀鎖?
synchronized是一種悲觀鎖,在執(zhí)行被synchronized包裹的代碼時需要首先獲取鎖,沒有拿到鎖則無法執(zhí)行,是總悲觀的認為別的線程會去搶,所以要悲觀鎖。
樂觀鎖的思想是它不認為會有線程去爭搶,盡管去執(zhí)行,如果沒有執(zhí)行成功就再去重試。
為了防止多個分布式任務,執(zhí)行同一個行為,需要使用分布鎖進行來控制
1、基于數據庫實現分布鎖
利用數據庫主鍵唯一性的特點,或利用數據庫唯一索引、行級鎖的特點,多個線程同時去更新相同的記錄,誰更新成功誰就搶到鎖。
數據庫表的設計
在上傳文件之后,將需要格式轉換的文件,存入media_process數據庫
SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for media_process -- ---------------------------- DROP TABLE IF EXISTS `media_process`; CREATE TABLE `media_process` ( `id` bigint NOT NULL AUTO_INCREMENT, `file_id` varchar(120) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT '文件標識', `filename` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT '文件名稱', `bucket` varchar(128) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT '存儲桶', `file_path` varchar(512) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '存儲路徑', `status` varchar(12) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT '狀態(tài),1:未處理,2:處理成功 3處理失敗 4處理中', `create_date` datetime NOT NULL COMMENT '上傳時間', `finish_date` datetime NULL DEFAULT NULL COMMENT '完成時間', `fail_count` int NULL DEFAULT 0 COMMENT '失敗次數', `url` varchar(1024) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '媒資文件訪問地址', `errormsg` varchar(1024) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '失敗原因', PRIMARY KEY (`id`) USING BTREE, UNIQUE INDEX `unique_fileid`(`file_id` ASC) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 15 CHARACTER SET = utf8mb3 COLLATE = utf8mb3_general_ci ROW_FORMAT = DYNAMIC; SET FOREIGN_KEY_CHECKS = 1;
誰先搶到,誰處理
視頻處理完成后,轉存如 media_process_history表中,在media_process表中,刪除該條記錄
SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for media_process_history -- ---------------------------- DROP TABLE IF EXISTS `media_process_history`; CREATE TABLE `media_process_history` ( `id` bigint NOT NULL AUTO_INCREMENT, `file_id` varchar(120) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT '文件標識', `filename` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT '文件名稱', `bucket` varchar(128) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT '存儲源', `status` varchar(12) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT '狀態(tài),1:未處理,2:處理成功 3處理失敗', `create_date` datetime NOT NULL COMMENT '上傳時間', `finish_date` datetime NOT NULL COMMENT '完成時間', `url` varchar(1024) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT '媒資文件訪問地址', `fail_count` int NULL DEFAULT 0 COMMENT '失敗次數', `file_path` varchar(512) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '文件路徑', `errormsg` varchar(1024) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '失敗原因', PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 12 CHARACTER SET = utf8mb3 COLLATE = utf8mb3_general_ci ROW_FORMAT = DYNAMIC; SET FOREIGN_KEY_CHECKS = 1;
@XxlJob("videoJobHandler") public void videoJobHandler() throws Exception { // 分片參數 int shardIndex = XxlJobHelper.getShardIndex(); int shardTotal = XxlJobHelper.getShardTotal(); List<MediaProcess> mediaProcessList = null; int size = 0; try { //取出cpu核心數作為一次處理數據的條數 int processors = Runtime.getRuntime().availableProcessors(); //獲取待處理視頻 //一次處理視頻數量不要超過cpu核心數,避免CPU超載 mediaProcessList = mediaFileProcessService.getMediaProcessList(shardIndex, shardTotal, processors); size = mediaProcessList.size(); log.debug("取出待處理視頻任務{}條", size); if (size < 0) { return; } } catch (Exception e) { e.printStackTrace(); return; } //啟動size個線程的線程池 ExecutorService threadPool = Executors.newFixedThreadPool(size); //計數器,用于等待所有線程執(zhí)行完畢 CountDownLatch countDownLatch = new CountDownLatch(size); //將處理任務加入線程池 mediaProcessList.forEach(mediaProcess -> { threadPool.execute(() -> { //所以線程,通過循環(huán)同時啟動 try { //任務id Long taskId = mediaProcess.getId(); //搶占任務,將任務status狀態(tài)改為4正在處理 boolean b = mediaFileProcessService.startTask(taskId); if (!b) { return; } log.debug("開始執(zhí)行任務:{}", mediaProcess); //下邊是處理邏輯 //桶 String bucket = mediaProcess.getBucket(); //存儲路徑 String filePath = mediaProcess.getFilePath(); //原始視頻的md5值 String fileId = mediaProcess.getFileId(); //原始文件名稱 String filename = mediaProcess.getFilename(); //將要處理的文件下載到服務器上 File originalFile = bigFilesService.downloadFileFromMinIO(mediaProcess.getBucket(), mediaProcess.getFilePath()); if (originalFile == null) { log.debug("下載待處理文件失敗,originalFile:{}", mediaProcess.getBucket().concat(mediaProcess.getFilePath())); mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "下載待處理文件失敗"); return; } //處理結束的視頻文件 File mp4File = null; //創(chuàng)建臨時文件,作為轉化后的文件 try { mp4File = File.createTempFile("mp4", ".mp4"); } catch (IOException e) { log.error("創(chuàng)建mp4臨時文件失敗"); //保存任務是失敗的結果 mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "創(chuàng)建mp4臨時文件失敗"); return; } //視頻處理結果 String result = ""; try { String absolutePath = mp4File.getAbsolutePath();//包含了,文件名 String localPath = absolutePath.substring(0, absolutePath.lastIndexOf("\\")+1); //開始處理視頻 Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpegpath, originalFile.getAbsolutePath(), mp4File.getName(),localPath); //開始視頻轉換,成功將返回success result = videoUtil.generateMp4(); } catch (Exception e) { e.printStackTrace(); log.error("處理視頻文件:{},出錯:{}", mediaProcess.getFilePath(), e.getMessage()); mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "下載待處理文件失敗"); } if (!result.equals("success")) { //記錄錯誤信息 log.error("處理視頻失敗,視頻地址:{},錯誤信息:{}", bucket + filePath, result); mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, result); return; } //將mp4上傳至minio //mp4在minio的存儲路徑 String objectName = getFilePath(fileId, ".mp4"); //訪問url String url = "/" + bucket + "/" + objectName; try { bigFilesService.addMediaFilesToMinIO(mp4File.getAbsolutePath(), "video/mp4", bucket, objectName); //將url存儲至數據,并更新狀態(tài)為成功,并將待處理視頻記錄刪除存入歷史 mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "2", fileId, url, null); } catch (Exception e) { log.error("上傳視頻失敗或入庫失敗,視頻地址:{},錯誤信息:{}", bucket + objectName, e.getMessage()); //最終還是失敗了 mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "處理后視頻上傳或入庫失敗"); } }finally { countDownLatch.countDown(); //線程數減一 } }); }); //等待,給一個充裕的超時時間,防止無限等待,到達超時時間還沒有處理完成則結束任務 countDownLatch.await(30, TimeUnit.MINUTES); } private String getFilePath(String fileMd5,String fileExt){ return fileMd5.substring(0,1) + "/" + fileMd5.substring(1,2) + "/" + fileMd5 + "/" +fileMd5 +fileExt; }
當前需要處理的視頻文件,需要根據計算機 當前計算機啟動的服務下標(從0開始...),當前計算機啟動服務總個數 和 計算機的線程數計算得出,因為若計算機的線程數為8,一次性最多處理8個視頻
sql語句這樣設計的目的是為了給每個服務(執(zhí)行器),分配任務。一臺8核的計算機,一次性最多分配8個任務
@Override public List<MediaProcess> getMediaProcessList(int shardIndex, int shardTotal, int count) { return mediaProcessMapper.selectListByShardIndex(shardTotal, shardIndex, count); } /** * @description 根據分片參數獲取待處理任務,一次處理視頻數量不要超過cpu核心數,避免CPU超載 * @param shardTotal 分片總數 * @param shardIndex 分片序號 * @param count 任務數 * @return java.util.List<com.xuecheng.media.model.po.MediaProcess> * @author Mr.M * @date 2022/9/14 8:54 */ @Select("select * from media_process t where t.id % #{shardTotal} = #{shardIndex} and (t.status = '1' or t.status = '3') and t.fail_count < 3 limit #{count}") List<MediaProcess> selectListByShardIndex(@Param("shardTotal") int shardTotal, @Param("shardIndex") int shardIndex, @Param("count") int count);
Sql語句的查詢,原理如下
上邊兩個執(zhí)行器實例那么分片總數為2,序號為0、1,從任務1開始,如下:
1 % 2 = 1 執(zhí)行器2執(zhí)行
2 % 2 = 0 執(zhí)行器1執(zhí)行
3 % 2 = 1 執(zhí)行器2執(zhí)行
以此類推.
一個服務(執(zhí)行器),所以線程同時執(zhí)行,為了防止多個線程執(zhí)行的是同一個任務,當前線程執(zhí)行時需要前開啟任務時,將數據庫的狀態(tài)設置為4,表示正在處理中,防止下次執(zhí)行時,被其他執(zhí)行器搶占
/** * 開啟一個任務 * @param id 任務id * @return 更新記錄數 */ @Update("update media_process m set m.status='4' where (m.status='1' or m.status='3') and m.fail_count<3 and m.id=#{id}") int startTask(@Param("id") long id);
若視頻轉換過程中出現異常,失敗次數+1,失敗次數達到3此不在執(zhí)行。
若視頻轉換成功,修改任務狀態(tài)為2,并將其存入歷史進程表中,在當前表中刪除該條記錄
@Transactional @Override public void saveProcessFinishStatus(Long taskId, String status, String fileId, String url, String errorMsg) { //查出任務,如果不存在則直接返回 MediaProcess mediaProcess = mediaProcessMapper.selectById(taskId); if(mediaProcess == null){ return ; } //處理失敗,更新任務處理結果 LambdaQueryWrapper<MediaProcess> queryWrapperById = new LambdaQueryWrapper<MediaProcess>().eq(MediaProcess::getId, taskId); //處理失敗 if(status.equals("3")){ MediaProcess mediaProcess_u = new MediaProcess(); mediaProcess_u.setStatus("3"); mediaProcess_u.setErrormsg(errorMsg); mediaProcess_u.setFailCount(mediaProcess.getFailCount()+1); mediaProcessMapper.update(mediaProcess_u,queryWrapperById); log.debug("更新任務處理狀態(tài)為失敗,任務信息:{}",mediaProcess_u); return ; } //任務處理成功 MediaFiles mediaFiles = mediaFilesMapper.selectById(fileId); if(mediaFiles!=null){ //更新媒資文件中的訪問url mediaFiles.setUrl(url); mediaFilesMapper.updateById(mediaFiles); } //處理成功,更新url和狀態(tài) mediaProcess.setUrl(url); mediaProcess.setStatus("2"); mediaProcess.setFinishDate(LocalDateTime.now()); mediaProcessMapper.updateById(mediaProcess); //添加到歷史記錄 MediaProcessHistory mediaProcessHistory = new MediaProcessHistory(); BeanUtils.copyProperties(mediaProcess, mediaProcessHistory); mediaProcessHistoryMapper.insert(mediaProcessHistory); //刪除mediaProcess mediaProcessMapper.deleteById(mediaProcess.getId()); }
工具類
檢查視頻時長,校驗兩個視頻時長是否相等,等待進程處理完畢
package com.xuecheng.base.utils; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.List; /** * 此文件作為視頻文件處理父類,提供: * 1、查看視頻時長 * 2、校驗兩個視頻的時長是否相等 * */ public class VideoUtil { String ffmpeg_path;//ffmpeg的安裝位置 public VideoUtil(String ffmpeg_path){ this.ffmpeg_path = ffmpeg_path; } //檢查視頻時間是否一致 public Boolean check_video_time(String source,String target) { String source_time = get_video_time(source); //取出時分秒 source_time = source_time.substring(source_time.lastIndexOf(":")+1); String target_time = get_video_time(target); //取出時分秒 target_time = target_time.substring(target_time.lastIndexOf(":")+1); if(source_time == null || target_time == null){ return false; } float v1 = Float.parseFloat(source_time); float v2 = Float.parseFloat(target_time); float abs = Math.abs(v1 - v2); if(abs<1){//轉化是會有細微差距,屬于正?,F象 return true; } return false; } //獲取視頻時間(時:分:秒:毫秒) public String get_video_time(String video_path) { /* ffmpeg -i lucene.mp4 */ List<String> commend = new ArrayList<String>(); commend.add(ffmpeg_path); commend.add("-i"); commend.add(video_path); try { ProcessBuilder builder = new ProcessBuilder(); builder.command(commend); //將標準輸入流和錯誤輸入流合并,通過標準輸入流程讀取信息 builder.redirectErrorStream(true); Process p = builder.start(); String outstring = waitFor(p); System.out.println(outstring); int start = outstring.trim().indexOf("Duration: "); if(start>=0){ int end = outstring.trim().indexOf(", start:"); if(end>=0){ String time = outstring.substring(start+10,end); if(time!=null && !time.equals("")){ return time.trim(); } } } } catch (Exception ex) { ex.printStackTrace(); } return null; } //等待一個外部進程(通過Process對象表示)完成,并在此過程中捕獲該進程的標準輸出和錯誤輸出。 public String waitFor(Process p) { InputStream in = null; InputStream error = null; String result = "error"; int exitValue = -1; StringBuffer outputString = new StringBuffer(); try { in = p.getInputStream(); error = p.getErrorStream(); boolean finished = false; int maxRetry = 600;//每次休眠1秒,最長執(zhí)行時間10分種 int retry = 0; while (!finished) { if (retry > maxRetry) { return "error"; } try { while (in.available() > 0) { Character c = new Character((char) in.read()); outputString.append(c); System.out.print(c); } while (error.available() > 0) { Character c = new Character((char) in.read()); outputString.append(c); System.out.print(c); } //進程未結束時調用exitValue將拋出異常 exitValue = p.exitValue(); finished = true; } catch (IllegalThreadStateException e) { Thread.currentThread().sleep(1000);//休眠1秒 retry++; } } } catch (Exception e) { e.printStackTrace(); } finally { if (in != null) { try { in.close(); } catch (IOException e) { System.out.println(e.getMessage()); } } } return outputString.toString(); } public static void main(String[] args) throws IOException { String ffmpeg_path = "D:\\Program Files\\ffmpeg-20180227-fa0c9d6-win64-static\\bin\\ffmpeg.exe";//ffmpeg的安裝位置 VideoUtil videoUtil = new VideoUtil(ffmpeg_path); String video_time = videoUtil.get_video_time("E:\\ffmpeg_test\\1.avi"); System.out.println(video_time); } }
avi格式轉mp4格式
package com.xuecheng.base.utils; import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; public class Mp4VideoUtil extends VideoUtil { String ffmpeg_path;//ffmpeg的安裝位置 String video_path; String mp4_name; String mp4folder_path; public Mp4VideoUtil(String ffmpeg_path, String video_path, String mp4_name, String mp4folder_path){ super(ffmpeg_path); this.ffmpeg_path = ffmpeg_path; this.video_path = video_path; this.mp4_name = mp4_name; this.mp4folder_path = mp4folder_path; } //清除已生成的mp4 private void clear_mp4(String mp4_path){ //刪除原來已經生成的m3u8及ts文件 File mp4File = new File(mp4_path); if(mp4File.exists() && mp4File.isFile()){ mp4File.delete(); } } /** * 視頻編碼,生成mp4文件 * @return 成功返回success,失敗返回控制臺日志 */ public String generateMp4(){ //清除已生成的mp4 // clear_mp4(mp4folder_path+mp4_name); clear_mp4(mp4folder_path); /* ffmpeg.exe -i lucene.avi -c:v libx264 -s 1280x720 -pix_fmt yuv420p -b:a 63k -b:v 753k -r 18 .\lucene.mp4 */ List<String> commend = new ArrayList<String>(); //commend.add("D:\\Program Files\\ffmpeg-20180227-fa0c9d6-win64-static\\bin\\ffmpeg.exe"); commend.add(ffmpeg_path); commend.add("-i"); // commend.add("D:\\BaiduNetdiskDownload\\test1.avi"); commend.add(video_path); commend.add("-c:v"); commend.add("libx264"); commend.add("-y");//覆蓋輸出文件 commend.add("-s"); commend.add("1280x720"); commend.add("-pix_fmt"); commend.add("yuv420p"); commend.add("-b:a"); commend.add("63k"); commend.add("-b:v"); commend.add("753k"); commend.add("-r"); commend.add("18"); commend.add(mp4folder_path + mp4_name ); String outstring = null; try { ProcessBuilder builder = new ProcessBuilder(); builder.command(commend); //將標準輸入流和錯誤輸入流合并,通過標準輸入流程讀取信息 builder.redirectErrorStream(true); Process p = builder.start(); outstring = waitFor(p); } catch (Exception ex) { ex.printStackTrace(); } Boolean check_video_time = this.check_video_time(video_path, mp4folder_path + mp4_name); if(!check_video_time){ return outstring; }else{ return "success"; } } public static void main(String[] args) throws IOException { //ffmpeg的路徑 String ffmpeg_path = "F:\\environment\\ffmpeg-7.0.2-full_build\\bin\\ffmpeg.exe";//ffmpeg的安裝位置 //源avi視頻的路徑 String video_path = "E:\\Users\\31118\\Videos\\1.avi"; //轉換后mp4文件的名稱 String mp4_name = "1.mp4"; //轉換后mp4文件的路徑 String mp4_path = "E:\\Users\\31118\\Videos\\"; //結尾路徑,需要加上\\ //創(chuàng)建工具類對象 Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpeg_path,video_path,mp4_name,mp4_path); //開始視頻轉換,成功將返回success String s = videoUtil.generateMp4(); System.out.println(s); } }
任務補償機制
如果有線程搶占了某個視頻的處理任務,如果線程處理過程中掛掉了,該視頻的狀態(tài)將會一直是處理中,其它線程將無法處理,這個問題需要用補償機制。
單獨啟動一個任務找到待處理任務表中超過執(zhí)行期限但仍在處理中的任務,將任務的狀態(tài)改為執(zhí)行失敗。
任務執(zhí)行期限是處理一個視頻的最大時間,比如定為30分鐘,通過任務的啟動時間去判斷任務是否超過執(zhí)行期限。
總結
以上為個人經驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
利用Jmeter發(fā)送Java請求的實戰(zhàn)記錄
JMeter是Apache組織的開放源代碼項目,它是功能和性能測試的工具,100%的用java實現,下面這篇文章主要給大家介紹了關于如何利用Jmeter發(fā)送Java請求的相關資料,需要的朋友可以參考下2021-09-09mybatis打印的sql日志不寫入到log文件的問題及解決
這篇文章主要介紹了mybatis打印的sql日志不寫入到log文件的問題及解決方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-08-08Spring系統(tǒng)屬性及spring.properties配置文件示例詳解
spring中有一個SpringProperties類,來保存spring的系統(tǒng)屬性,本文結合實例代碼對Spring系統(tǒng)屬性及spring.properties配置文件相關知識給大家介紹的非常詳細,需要的朋友參考下吧2023-07-07Idea中如何查看SpringSecurity各Filter信息
這篇文章主要介紹了Idea中如何查看SpringSecurity各Filter信息,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-01-01