Java之Rsync并發(fā)遷移數(shù)據(jù)并校驗詳解
java調(diào)用Rsync并發(fā)遷移數(shù)據(jù)并執(zhí)行校驗
java代碼如下
RsyncFile.java
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import java.io.*;
import java.util.ArrayList;
import java.util.Date;
import java.util.concurrent.*;
/**
* @ClassName RsyncFile
* @Descriptiom TODO rsync多線程同步遷移數(shù)據(jù)
* @Author KING
* @Date 2019/11/25 09:17
* @Version 1.2.2
* rsync -vzrtopg --progress --delete //鏡像同步
**/
@NoArgsConstructor
public class RsyncFile implements Runnable{
private static final int availProcessors = Runtime.getRuntime().availableProcessors();
//構(gòu)造以cpu核心數(shù)為核心池,cpu線程數(shù)為最大池,超時時間為1s,線程隊列為大小為無界的安全阻塞線程隊列,拒絕策略為DiscardOldestPolicy()的線程池。(同步數(shù)據(jù)當(dāng)然不能丟下拒絕任務(wù))
private ExecutorService ThreadPool = new ThreadPoolExecutor(availProcessors >> 1,
availProcessors,
1L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
//保存掃描得到的文件列表
private static ArrayList<String> fileNameList = new ArrayList<String>();
private String shellname;
private String filename;
private String userip;
private CountDownLatch countDownLatch;
private static int deep = 0;
public RsyncFile(String ShellName, String filename, String UserIP, CountDownLatch countDownLatch) {
this.shellname = ShellName;
this.filename = filename;
this.userip = UserIP;
this.countDownLatch = countDownLatch;
}
public static void main(String[] args) {
try {
new RsyncFile().Do(args[0],args[1],Integer.parseInt(args[2]));
}catch (ArrayIndexOutOfBoundsException e){
System.out.println(e);
System.out.println("Error , args send fault");
System.out.println("please send localAddress remote username @ remote IP or hostname and catalogue");
System.out.println("like this [ /home/test/ root@node1:/test/ 1 ]");
}catch(NumberFormatException e1){
System.out.println(e1);
System.out.println("please input Right Directory depth, this number must be int");
System.out.println("like this [ /home/test/ root@node1:/test/ 1 ]");
}
}
@SneakyThrows
private void Do(String content,String UserIP,int setdeep){
System.out.println("開始執(zhí)行");
System.out.println("開始時間:" + new Date());
Long a = System.nanoTime();
File file = new File(content);
System.out.println("開始掃描本地指定目錄");
GetAllFile(file,setdeep);//按深度掃描非空文件夾和文件
System.out.println("掃描本地目錄完成");
//給腳本賦予權(quán)限
String [] cmd={"/bin/sh","-c","chmod 755 ./do*"};
Runtime.getRuntime().exec(cmd);
//創(chuàng)建遠端目錄操作
System.out.println("開始創(chuàng)建遠端目錄結(jié)構(gòu)");
//一次計數(shù)鎖用于保證目錄創(chuàng)建完成
CountDownLatch doDirLock = new CountDownLatch(1);
ThreadPool.execute(new RsyncFile("./doDirc.sh",content,UserIP,doDirLock));
doDirLock.await();
System.out.println("創(chuàng)建遠端目錄結(jié)構(gòu)完成");
//開始同步工作
System.out.println("開始執(zhí)行同步工作");
System.out.println("同步的文件夾或文件總數(shù): " + fileNameList.size());
System.out.println("正在同步。。。。。");
//fileNameList.size()次計數(shù)鎖用于保證數(shù)據(jù)同步完成(保證計時準(zhǔn)確)
CountDownLatch rsyncLock = new CountDownLatch(fileNameList.size());
System.out.println(fileNameList.size());
for (String fileName:fileNameList) {
//除去文件名中與UserIP重復(fù)的文件路徑
String RemoteDir = UserIP.concat(fileName.replace(content, ""));
System.out.println("要同步的本地目錄或文件: " + fileName);
System.out.println("要同步的遠端目錄或文件: " + RemoteDir);
ThreadPool.execute(new RsyncFile("./doRync.sh",fileName, RemoteDir,rsyncLock));
}
rsyncLock.await();
System.out.println("執(zhí)行同步工作完成");
//開始文件校驗工作
System.out.println("執(zhí)行文件校驗及重傳");
//一次計數(shù)鎖用于保證校驗完成
CountDownLatch chechSumLock = new CountDownLatch(1);
ThreadPool.execute(new RsyncFile("./doChecksum.sh",content,UserIP,chechSumLock));
chechSumLock.await();
System.out.println("文件校驗及重傳完成");
ThreadPool.shutdown();
Long b = System.nanoTime();
Long aLong = (b - a)/1000000L;
System.out.println("處理時間" + aLong + "ms");
System.out.println("結(jié)束時間:" + new Date());
}
/**
* 執(zhí)行rsync腳本的線程方法,使用PrintWriter來與linux Terminal交互
*/
@Override
public void run() {
try {
String command=shellname.concat(" ").concat(filename).concat(" ").concat(userip);
File wd = new File("/bin");
Process process = null;
process = Runtime.getRuntime().exec("/bin/bash", null, wd);
if (process != null) {
InputStream is = process.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
PrintWriter out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(process.getOutputStream())),
true);
//切換到當(dāng)前class文件所在目錄
out.println("cd " + System.getProperty("user.dir"));
out.println(command);
out.println("exit");
StringBuilder sb = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
sb.append(line + System.lineSeparator());
}
process.waitFor();
reader.close();
out.close();
process.destroy();
System.out.println("result:" + sb.toString());
}else {
System.out.println("找不到系統(tǒng)bash工具,請檢查系統(tǒng)是否異常,并為系統(tǒng)創(chuàng)建/bin/sh的bash工具軟連接");
}
} catch (Exception e) {
System.err.println(e.getMessage());
}finally {
//倒記數(shù)鎖釋放一次
countDownLatch.countDown();
}
}
/**遍歷指定的目錄并能指定深度
* @param file 指定要遍歷的目錄
* @param setDeep 設(shè)定遍歷深度
*/
@SneakyThrows
private static void GetAllFile(File file, int setDeep) {
if(file != null){
if(file.isDirectory() && deep<setDeep){
deep++;
File f[] = file.listFiles();
if(f != null) {
int length = f.length;
for(int i = 0; i < length; i++)
GetAllFile(f[i],setDeep);
deep--;
}
} else {
if(file.isDirectory()){
//如果為目錄末尾添加 / 保證rsync正常處理
fileNameList.add(file.getAbsolutePath().concat("/"));
}else {
fileNameList.add(file.getAbsolutePath());
}
}
}
}
}
doDir.sh
rsync -av --include='*/' --exclude='*' $1 $2 |tee -a /tmp/rsync.log 2>&1 echo "創(chuàng)建目錄結(jié)構(gòu)操作"
doRsync.sh
rsync -avzi --stats --progress $1 $2 |tee -a /tmp/rsync.log 2>&1
doChecksum.sh
rsync -acvzi --stats --progress $1 $2 |tee -a /tmp/checksum.log 2>&1
附錄
rsync輸出日志說明如下
YXcstpoguax path/to/file
|||||||||||
||||||||||╰- x: The extended attribute information changed
|||||||||╰-- a: The ACL information changed
||||||||╰--- u: The u slot is reserved for future use
|||||||╰---- g: Group is different
||||||╰----- o: Owner is different
|||||╰------ p: Permission are different
||||╰------- t: Modification time is different
|||╰-------- s: Size is different
||╰--------- c: Different checksum (for regular files), or
|| changed value (for symlinks, devices, and special files)
|╰---------- the file type:
| f: for a file,
| d: for a directory,
| L: for a symlink,
| D: for a device,
| S: for a special file (e.g. named sockets and fifos)
╰----------- the type of update being done::
<: file is being transferred to the remote host (sent)
>: file is being transferred to the local host (received)
c: local change/creation for the item, such as:
- the creation of a directory
- the changing of a symlink,
- etc.
h: the item is a hard link to another item (requires
--hard-links).
.: the item is not being updated (though it might have
attributes that are being modified)
*: means that the rest of the itemized-output area contains
a message (e.g. "deleting")
到此這篇關(guān)于Java之Rsync并發(fā)遷移數(shù)據(jù)并校驗詳解的文章就介紹到這了,更多相關(guān)Java之Rsync并發(fā)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Java并發(fā)編程之代碼實現(xiàn)兩玩家交換裝備
- Java并發(fā)編程之阻塞隊列(BlockingQueue)詳解
- java實戰(zhàn)案例之用戶注冊并發(fā)送郵件激活/發(fā)送郵件驗證碼
- JAVA并發(fā)圖解
- java并發(fā)編程JUC CountDownLatch線程同步
- Java并發(fā)之Condition案例詳解
- java并發(fā)編程之ThreadLocal詳解
- Java 處理高并發(fā)負載類優(yōu)化方法案例詳解
- 淺談Java高并發(fā)解決方案以及高負載優(yōu)化方法
- Java httpClient連接池支持多線程高并發(fā)的實現(xiàn)
- Java中常見的并發(fā)控制手段淺析
- Java面試題沖刺第二十四天--并發(fā)編程
- Java 模擬真正的并發(fā)請求詳情
相關(guān)文章
SpringCloud-Gateway網(wǎng)關(guān)的使用實例教程
Gateway網(wǎng)關(guān)在微服務(wù)架構(gòu)中扮演了不可或缺的角色,通過集中化管理、智能路由和強大的過濾器機制,為構(gòu)建高效、可擴展的微服務(wù)系統(tǒng)提供了有力支持,這篇文章主要介紹了SpringCloud-Gateway網(wǎng)關(guān)的使用,需要的朋友可以參考下2024-03-03
Java報錯:ClassCastException問題解決方法
異常是程序中的一些錯誤,但并不是所有的錯誤都是異常,并且錯誤有時候是可以避免的,下面這篇文章主要給大家介紹了關(guān)于Java報錯:ClassCastException問題解決方法,需要的朋友可以參考下2024-07-07
Java將List轉(zhuǎn)換為String的幾種常見方式
在實際開發(fā)中經(jīng)常遇到List轉(zhuǎn)為String字符串的情況,下面這篇文章主要給大家介紹了關(guān)于Java將List轉(zhuǎn)換為String的幾種常見方式,文中通過代碼介紹的非常詳細,需要的朋友可以參考下2024-03-03
springboot 多數(shù)據(jù)源的實現(xiàn)(最簡單的整合方式)
這篇文章主要介紹了springboot 多數(shù)據(jù)源的實現(xiàn)(最簡單的整合方式),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-11-11
基于java swing實現(xiàn)答題系統(tǒng)
這篇文章主要為大家詳細介紹了基于java swing實現(xiàn)答題系統(tǒng),具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-01-01
IntelliJ IDEA中新建Java class的解決方案
今天小編就為大家分享一篇關(guān)于IntelliJ IDEA中新建Java class的解決方案,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2018-10-10
詳解關(guān)于IntelliJ IDEA中Schedule for Addition 的問題
本篇文章主要介紹了詳解關(guān)于 IntelliJ IDEA 中 Schedule for Addition 的問題,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-12-12

