Java之Rsync并發(fā)遷移數(shù)據(jù)并校驗詳解
java調(diào)用Rsync并發(fā)遷移數(shù)據(jù)并執(zhí)行校驗
java代碼如下
RsyncFile.java
import lombok.NoArgsConstructor; import lombok.SneakyThrows; import java.io.*; import java.util.ArrayList; import java.util.Date; import java.util.concurrent.*; /** * @ClassName RsyncFile * @Descriptiom TODO rsync多線程同步遷移數(shù)據(jù) * @Author KING * @Date 2019/11/25 09:17 * @Version 1.2.2 * rsync -vzrtopg --progress --delete //鏡像同步 **/ @NoArgsConstructor public class RsyncFile implements Runnable{ private static final int availProcessors = Runtime.getRuntime().availableProcessors(); //構造以cpu核心數(shù)為核心池,cpu線程數(shù)為最大池,超時時間為1s,線程隊列為大小為無界的安全阻塞線程隊列,拒絕策略為DiscardOldestPolicy()的線程池。(同步數(shù)據(jù)當然不能丟下拒絕任務) private ExecutorService ThreadPool = new ThreadPoolExecutor(availProcessors >> 1, availProcessors, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy()); //保存掃描得到的文件列表 private static ArrayList<String> fileNameList = new ArrayList<String>(); private String shellname; private String filename; private String userip; private CountDownLatch countDownLatch; private static int deep = 0; public RsyncFile(String ShellName, String filename, String UserIP, CountDownLatch countDownLatch) { this.shellname = ShellName; this.filename = filename; this.userip = UserIP; this.countDownLatch = countDownLatch; } public static void main(String[] args) { try { new RsyncFile().Do(args[0],args[1],Integer.parseInt(args[2])); }catch (ArrayIndexOutOfBoundsException e){ System.out.println(e); System.out.println("Error , args send fault"); System.out.println("please send localAddress remote username @ remote IP or hostname and catalogue"); System.out.println("like this [ /home/test/ root@node1:/test/ 1 ]"); }catch(NumberFormatException e1){ System.out.println(e1); System.out.println("please input Right Directory depth, this number must be int"); System.out.println("like this [ /home/test/ root@node1:/test/ 1 ]"); } } @SneakyThrows private void Do(String content,String UserIP,int setdeep){ System.out.println("開始執(zhí)行"); System.out.println("開始時間:" + new Date()); Long a = System.nanoTime(); File file = new File(content); System.out.println("開始掃描本地指定目錄"); GetAllFile(file,setdeep);//按深度掃描非空文件夾和文件 System.out.println("掃描本地目錄完成"); //給腳本賦予權限 String [] cmd={"/bin/sh","-c","chmod 755 ./do*"}; Runtime.getRuntime().exec(cmd); //創(chuàng)建遠端目錄操作 System.out.println("開始創(chuàng)建遠端目錄結構"); //一次計數(shù)鎖用于保證目錄創(chuàng)建完成 CountDownLatch doDirLock = new CountDownLatch(1); ThreadPool.execute(new RsyncFile("./doDirc.sh",content,UserIP,doDirLock)); doDirLock.await(); System.out.println("創(chuàng)建遠端目錄結構完成"); //開始同步工作 System.out.println("開始執(zhí)行同步工作"); System.out.println("同步的文件夾或文件總數(shù): " + fileNameList.size()); System.out.println("正在同步。。。。。"); //fileNameList.size()次計數(shù)鎖用于保證數(shù)據(jù)同步完成(保證計時準確) CountDownLatch rsyncLock = new CountDownLatch(fileNameList.size()); System.out.println(fileNameList.size()); for (String fileName:fileNameList) { //除去文件名中與UserIP重復的文件路徑 String RemoteDir = UserIP.concat(fileName.replace(content, "")); System.out.println("要同步的本地目錄或文件: " + fileName); System.out.println("要同步的遠端目錄或文件: " + RemoteDir); ThreadPool.execute(new RsyncFile("./doRync.sh",fileName, RemoteDir,rsyncLock)); } rsyncLock.await(); System.out.println("執(zhí)行同步工作完成"); //開始文件校驗工作 System.out.println("執(zhí)行文件校驗及重傳"); //一次計數(shù)鎖用于保證校驗完成 CountDownLatch chechSumLock = new CountDownLatch(1); ThreadPool.execute(new RsyncFile("./doChecksum.sh",content,UserIP,chechSumLock)); chechSumLock.await(); System.out.println("文件校驗及重傳完成"); ThreadPool.shutdown(); Long b = System.nanoTime(); Long aLong = (b - a)/1000000L; System.out.println("處理時間" + aLong + "ms"); System.out.println("結束時間:" + new Date()); } /** * 執(zhí)行rsync腳本的線程方法,使用PrintWriter來與linux Terminal交互 */ @Override public void run() { try { String command=shellname.concat(" ").concat(filename).concat(" ").concat(userip); File wd = new File("/bin"); Process process = null; process = Runtime.getRuntime().exec("/bin/bash", null, wd); if (process != null) { InputStream is = process.getInputStream(); BufferedReader reader = new BufferedReader(new InputStreamReader(is)); PrintWriter out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(process.getOutputStream())), true); //切換到當前class文件所在目錄 out.println("cd " + System.getProperty("user.dir")); out.println(command); out.println("exit"); StringBuilder sb = new StringBuilder(); String line; while ((line = reader.readLine()) != null) { sb.append(line + System.lineSeparator()); } process.waitFor(); reader.close(); out.close(); process.destroy(); System.out.println("result:" + sb.toString()); }else { System.out.println("找不到系統(tǒng)bash工具,請檢查系統(tǒng)是否異常,并為系統(tǒng)創(chuàng)建/bin/sh的bash工具軟連接"); } } catch (Exception e) { System.err.println(e.getMessage()); }finally { //倒記數(shù)鎖釋放一次 countDownLatch.countDown(); } } /**遍歷指定的目錄并能指定深度 * @param file 指定要遍歷的目錄 * @param setDeep 設定遍歷深度 */ @SneakyThrows private static void GetAllFile(File file, int setDeep) { if(file != null){ if(file.isDirectory() && deep<setDeep){ deep++; File f[] = file.listFiles(); if(f != null) { int length = f.length; for(int i = 0; i < length; i++) GetAllFile(f[i],setDeep); deep--; } } else { if(file.isDirectory()){ //如果為目錄末尾添加 / 保證rsync正常處理 fileNameList.add(file.getAbsolutePath().concat("/")); }else { fileNameList.add(file.getAbsolutePath()); } } } } }
doDir.sh
rsync -av --include='*/' --exclude='*' $1 $2 |tee -a /tmp/rsync.log 2>&1 echo "創(chuàng)建目錄結構操作"
doRsync.sh
rsync -avzi --stats --progress $1 $2 |tee -a /tmp/rsync.log 2>&1
doChecksum.sh
rsync -acvzi --stats --progress $1 $2 |tee -a /tmp/checksum.log 2>&1
附錄
rsync輸出日志說明如下
YXcstpoguax path/to/file ||||||||||| ||||||||||╰- x: The extended attribute information changed |||||||||╰-- a: The ACL information changed ||||||||╰--- u: The u slot is reserved for future use |||||||╰---- g: Group is different ||||||╰----- o: Owner is different |||||╰------ p: Permission are different ||||╰------- t: Modification time is different |||╰-------- s: Size is different ||╰--------- c: Different checksum (for regular files), or || changed value (for symlinks, devices, and special files) |╰---------- the file type: | f: for a file, | d: for a directory, | L: for a symlink, | D: for a device, | S: for a special file (e.g. named sockets and fifos) ╰----------- the type of update being done:: <: file is being transferred to the remote host (sent) >: file is being transferred to the local host (received) c: local change/creation for the item, such as: - the creation of a directory - the changing of a symlink, - etc. h: the item is a hard link to another item (requires --hard-links). .: the item is not being updated (though it might have attributes that are being modified) *: means that the rest of the itemized-output area contains a message (e.g. "deleting")
到此這篇關于Java之Rsync并發(fā)遷移數(shù)據(jù)并校驗詳解的文章就介紹到這了,更多相關Java之Rsync并發(fā)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
- Java并發(fā)編程之代碼實現(xiàn)兩玩家交換裝備
- Java并發(fā)編程之阻塞隊列(BlockingQueue)詳解
- java實戰(zhàn)案例之用戶注冊并發(fā)送郵件激活/發(fā)送郵件驗證碼
- JAVA并發(fā)圖解
- java并發(fā)編程JUC CountDownLatch線程同步
- Java并發(fā)之Condition案例詳解
- java并發(fā)編程之ThreadLocal詳解
- Java 處理高并發(fā)負載類優(yōu)化方法案例詳解
- 淺談Java高并發(fā)解決方案以及高負載優(yōu)化方法
- Java httpClient連接池支持多線程高并發(fā)的實現(xiàn)
- Java中常見的并發(fā)控制手段淺析
- Java面試題沖刺第二十四天--并發(fā)編程
- Java 模擬真正的并發(fā)請求詳情
相關文章
SpringCloud-Gateway網(wǎng)關的使用實例教程
Gateway網(wǎng)關在微服務架構中扮演了不可或缺的角色,通過集中化管理、智能路由和強大的過濾器機制,為構建高效、可擴展的微服務系統(tǒng)提供了有力支持,這篇文章主要介紹了SpringCloud-Gateway網(wǎng)關的使用,需要的朋友可以參考下2024-03-03Java報錯:ClassCastException問題解決方法
異常是程序中的一些錯誤,但并不是所有的錯誤都是異常,并且錯誤有時候是可以避免的,下面這篇文章主要給大家介紹了關于Java報錯:ClassCastException問題解決方法,需要的朋友可以參考下2024-07-07springboot 多數(shù)據(jù)源的實現(xiàn)(最簡單的整合方式)
這篇文章主要介紹了springboot 多數(shù)據(jù)源的實現(xiàn)(最簡單的整合方式),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-11-11基于java swing實現(xiàn)答題系統(tǒng)
這篇文章主要為大家詳細介紹了基于java swing實現(xiàn)答題系統(tǒng),具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-01-01IntelliJ IDEA中新建Java class的解決方案
今天小編就為大家分享一篇關于IntelliJ IDEA中新建Java class的解決方案,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2018-10-10詳解關于IntelliJ IDEA中Schedule for Addition 的問題
本篇文章主要介紹了詳解關于 IntelliJ IDEA 中 Schedule for Addition 的問題,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-12-12