Java多線程實(shí)現(xiàn)FTP批量上傳文件
本文實(shí)例為大家分享了Java多線程實(shí)現(xiàn)FTP批量上傳文件的具體代碼,供大家參考,具體內(nèi)容如下
1、構(gòu)建FTP客戶端
package cn.com.pingtech.common.ftp; import lombok.extern.slf4j.Slf4j; import org.apache.commons.net.ftp.FTPClient; import org.apache.commons.net.ftp.FTPReply; import java.io.*; import java.net.UnknownHostException; @Slf4j public class ?FtpConnection { ? ? private FTPClient ftp = new FTPClient(); ? ? private boolean is_connected = false; ? ? /** ? ? ?* 構(gòu)造函數(shù) ? ? ?*/ ? ? public FtpConnection() { ? ? ? ? is_connected = false; ? ? ? ? ftp.setDefaultTimeout(FtpConfig.defaultTimeoutSecond * 1000); ? ? ? ? ftp.setConnectTimeout(FtpConfig.connectTimeoutSecond * 1000); ? ? ? ? ftp.setDataTimeout(FtpConfig.dataTimeoutSecond * 1000); ? ? ? ? try { ? ? ? ? ? ? initConnect(FtpConfig.host, FtpConfig.port, FtpConfig.user, FtpConfig.password); ? ? ? ? } catch (IOException e) { ? ? ? ? ? ? e.printStackTrace(); ? ? ? ? } ? ? } ? ? /** ? ? ?* 初始化連接 ? ? ?* ? ? ?* @param host ? ? ?* @param port ? ? ?* @param user ? ? ?* @param password ? ? ?* @throws IOException ? ? ?*/ ? ? private void initConnect(String host, int port, String user, String password) throws IOException { ? ? ? ? try { ? ? ? ? ? ? ftp.connect(host, port); ? ? ? ? } catch (UnknownHostException ex) { ? ? ? ? ? ? throw new IOException("Can't find FTP server '" + host + "'"); ? ? ? ? } ? ? ? ? int reply = ftp.getReplyCode();//220 連接成功 ? ? ? ? if (!FTPReply.isPositiveCompletion(reply)) { ? ? ? ? ? ? disconnect(); ? ? ? ? ? ? throw new IOException("Can't connect to server '" + host + "'"); ? ? ? ? } ? ? ? ? if (!ftp.login(user, password)) { ? ? ? ? ? ? is_connected = false; ? ? ? ? ? ? disconnect(); ? ? ? ? ? ? throw new IOException("Can't login to server '" + host + "'"); ? ? ? ? } else { ? ? ? ? ? ? is_connected = true; ? ? ? ? } ? ? } ? ? /** ? ? ?* 上傳文件 ? ? ?* ? ? ?* @param path ? ? ?* @param ftpFileName ? ? ?* @param localFile ? ? ?* @throws IOException ? ? ?*/ ? ? public boolean upload(String path, String ftpFileName, File localFile) throws IOException { ? ? ? ? boolean is ?= false; ? ? ? ? //檢查本地文件是否存在 ? ? ? ? if (!localFile.exists()) { ? ? ? ? ? ? throw new IOException("Can't upload '" + localFile.getAbsolutePath() + "'. This file doesn't exist."); ? ? ? ? } ? ? ? ? //設(shè)置工作路徑 ? ? ? ? setWorkingDirectory(path); ? ? ? ? //上傳 ? ? ? ? InputStream in = null; ? ? ? ? try { ? ? ? ? ? ? //被動模式 ? ? ? ? ? ? ftp.enterLocalPassiveMode(); ? ? ? ? ? ? in = new BufferedInputStream(new FileInputStream(localFile)); ? ? ? ? ? ? //保存文件 ? ? ? ? ? ? is = ftp.storeFile(ftpFileName, in); ? ? ? ? }catch (Exception e){ ? ? ? ? ? ? e.printStackTrace(); ? ? ? ? } ? ? ? ? finally { ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? in.close(); ? ? ? ? ? ? } catch (IOException ex) { ? ? ? ? ? ? ? ? ex.printStackTrace(); ? ? ? ? ? ? } ? ? ? ? } ? ? ? ? return is; ? ? } ? ? /** ? ? ?* 關(guān)閉連接 ? ? ?* ? ? ?* @throws IOException ? ? ?*/ ? ? public void disconnect() throws IOException { ? ? ? ? if (ftp.isConnected()) { ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? ftp.logout(); ? ? ? ? ? ? ? ? ftp.disconnect(); ? ? ? ? ? ? ? ? is_connected = false; ? ? ? ? ? ? } catch (IOException ex) { ? ? ? ? ? ? ? ? ex.printStackTrace(); ? ? ? ? ? ? } ? ? ? ? } ? ? } ? ? /** ? ? ?* 設(shè)置工作路徑 ? ? ?* ? ? ?* @param dir ? ? ?* @return ? ? ?*/ ? ? private boolean setWorkingDirectory(String dir) { ? ? ? ? if (!is_connected) { ? ? ? ? ? ? return false; ? ? ? ? } ? ? ? ? //如果目錄不存在創(chuàng)建目錄 ? ? ? ? try { ? ? ? ? ? ? if (createDirecroty(dir)) { ? ? ? ? ? ? ? ? return ftp.changeWorkingDirectory(dir); ? ? ? ? ? ? } ? ? ? ? } catch (IOException e) { ? ? ? ? ? ? e.printStackTrace(); ? ? ? ? } ? ? ? ? return false; ? ? } ? ? /** ? ? ?* 是否連接 ? ? ?* ? ? ?* @return ? ? ?*/ ? ? public boolean isConnected() { ? ? ? ? return is_connected; ? ? } ? ? /** ? ? ?* 創(chuàng)建目錄 ? ? ?* ? ? ?* @param remote ? ? ?* @return ? ? ?* @throws IOException ? ? ?*/ ? ? private boolean createDirecroty(String remote) throws IOException { ? ? ? ? boolean success = true; ? ? ? ? String directory = remote.substring(0, remote.lastIndexOf("/") + 1); ? ? ? ? // 如果遠(yuǎn)程目錄不存在,則遞歸創(chuàng)建遠(yuǎn)程服務(wù)器目錄 ? ? ? ? if (!directory.equalsIgnoreCase("/") && !ftp.changeWorkingDirectory(new String(directory))) { ? ? ? ? ? ? int start = 0; ? ? ? ? ? ? int end = 0; ? ? ? ? ? ? if (directory.startsWith("/")) { ? ? ? ? ? ? ? ? start = 1; ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? start = 0; ? ? ? ? ? ? } ? ? ? ? ? ? end = directory.indexOf("/", start); ? ? ? ? ? ? while (true) { ? ? ? ? ? ? ? ? String subDirectory = new String(remote.substring(start, end)); ? ? ? ? ? ? ? ? if (!ftp.changeWorkingDirectory(subDirectory)) { ? ? ? ? ? ? ? ? ? ? if (ftp.makeDirectory(subDirectory)) { ? ? ? ? ? ? ? ? ? ? ? ? ftp.changeWorkingDirectory(subDirectory); ? ? ? ? ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? ? ? ? ? log.error("mack directory error :/" + subDirectory); ? ? ? ? ? ? ? ? ? ? ? ? return false; ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? start = end + 1; ? ? ? ? ? ? ? ? end = directory.indexOf("/", start); ? ? ? ? ? ? ? ? // 檢查所有目錄是否創(chuàng)建完畢 ? ? ? ? ? ? ? ? if (end <= start) { ? ? ? ? ? ? ? ? ? ? break; ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } ? ? ? ? } ? ? ? ? return success; ? ? } }
2、FTP連接工廠
package cn.com.pingtech.common.ftp; import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; /** ?* 連接工廠 ?*/ @Slf4j public class FtpFactory { ? ? //有界隊(duì)列 ? ? private static final ArrayBlockingQueue<FtpConnection> arrayBlockingQueue = new ArrayBlockingQueue<>(FtpConfig.ftpConnectionSize); ? ? protected FtpFactory(){ ? ? ? ? log.info("init ftpConnectionSize "+FtpConfig.ftpConnectionSize); ? ? ? ? for(int i = 0; i< FtpConfig.ftpConnectionSize; i++){ ? ? ? ? ? ? //表示如果可能的話,將 e 加到 BlockingQueue 里,即如果 BlockingQueue 可以容納,則返回 true,否則返回 false ? ? ? ? ? ? arrayBlockingQueue.offer(new FtpConnection()); ? ? ? ? } ? ? } ? ? /** ? ? ?* 獲取連接 ? ? ?* ? ? ?* @return ? ? ?*/ ? ? public FtpConnection getFtp() { ? ? ? ? FtpConnection poll = null; ? ? ? ? try { ? ? ? ? ? ? //取走 BlockingQueue 里排在首位的對象,若 BlockingQueue 為空,阻斷進(jìn)入等待狀態(tài)直到 Blocking 有新的對象被加入為止 ? ? ? ? ? ? poll = arrayBlockingQueue.take(); ? ? ? ? } catch (InterruptedException e) { ? ? ? ? ? ? e.printStackTrace(); ? ? ? ? } ? ? ? ? return poll; ? ? } ? ? /** ? ? ?* 釋放連接 ? ? ?* @param ftp ? ? ?* @return ? ? ?*/ ? ? public boolean relase(FtpConnection ftp){ ? ? ? ? return arrayBlockingQueue.offer(ftp); ? ? } ? ? /** ? ? ?* 刪除連接 ? ? ?* ? ? ?* @param ftp ? ? ?*/ ? ? public void remove(FtpConnection ftp) { ? ? ? ? arrayBlockingQueue.remove(ftp); ? ? } ? ? /** ? ? ?* 關(guān)閉連接 ? ? ?*/ ? ? public void close() { ? ? ? ? for (FtpConnection connection : arrayBlockingQueue) { ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? connection.disconnect(); ? ? ? ? ? ? } catch (IOException e) { ? ? ? ? ? ? ? ? e.printStackTrace(); ? ? ? ? ? ? } ? ? ? ? } ? ? } }
3、FTP配置
package cn.com.pingtech.common.ftp; /** ?* ftp 配置類 ?*/ public class FtpConfig { ? ? public static int defaultTimeoutSecond = 10; ? ? public static int connectTimeoutSecond = 10; ? ? public static int dataTimeoutSecond = 10; ? ? public static String host = "127.0.0.1"; ? ? public static int port =9999; ? ? public static String user = "Administrator"; ? ? public static String password ="Yp886611"; ? ? public static int threadPoolSize = 1; ? ? public static int ftpConnectionSize = 1; ? ?? }
4、構(gòu)建多線程FTP上傳任務(wù)
package cn.com.pingtech.common.ftp; import java.io.File; import java.io.IOException; import java.util.concurrent.Callable; /** ?* 上傳任務(wù) ?*/ public class UploadTask implements Callable{ ? ? private File file; ? ? private FtpConnection ftp; ? ? private String path; ? ? private String fileName; ? ? private FtpFactory factory; ? ? public UploadTask(FtpFactory factory,FtpConnection ftp, File file, String path, String fileName){ ? ? ? ? this.factory = factory; ? ? ? ? this.ftp = ftp; ? ? ? ? this.file = file; ? ? ? ? this.path = path; ? ? ? ? this.fileName = fileName; ? ? } ? ? @Override ? ? public UploadResult call() throws Exception { ? ? ? ? UploadResult result = null; ? ? ? ? try { ? ? ? ? ? ? if (ftp == null) { ? ? ? ? ? ? ? ? result = new UploadResult(file.getAbsolutePath(), false); ? ? ? ? ? ? ? ? return result; ? ? ? ? ? ? } ? ? ? ? ? ? //如果連接未開啟 重新獲取連接 ? ? ? ? ? ? if (!ftp.isConnected()) { ? ? ? ? ? ? ? ? factory.remove(ftp); ? ? ? ? ? ? ? ? ftp = new FtpConnection(); ? ? ? ? ? ? } ? ? ? ? ? ? //開始上傳 ? ? ? ? ? ? result = new UploadResult(file.getName(), ftp.upload(path, fileName, file)); ? ? ? ? } catch (IOException ex) { ? ? ? ? ? ? result = new UploadResult(file.getName(), false); ? ? ? ? ? ? ex.printStackTrace(); ? ? ? ? } finally { ? ? ? ? ? ? factory.relase(ftp);//釋放連接 ? ? ? ? } ? ? ? ? return result; ? ? } }
package cn.com.pingtech.common.ftp; /** ?* 上傳結(jié)果 ?*/ public class UploadResult { ? ? private String fileName; //文件名稱 ? ? private boolean result; //是否上傳成功 ? ? public UploadResult(String fileName, boolean result) { ? ? ? ? this.fileName = fileName; ? ? ? ? this.result = result; ? ? } ? ? public String getFileName() { ? ? ? ? return fileName; ? ? } ? ? public void setFileName(String fileName) { ? ? ? ? this.fileName = fileName; ? ? } ? ? public boolean isResult() { ? ? ? ? return result; ? ? } ? ? public void setResult(boolean result) { ? ? ? ? this.result = result; ? ? } ? ? public String toString() { ? ? ? ? return "[fileName=" + fileName + " , result=" + result + "]"; ? ? } }
注意:實(shí)現(xiàn)Callable接口的任務(wù)線程能返回執(zhí)行結(jié)果
Callable接口支持返回執(zhí)行結(jié)果,此時(shí)需要調(diào)用FutureTask.get()方法實(shí)現(xiàn),此方法會阻塞線程直到獲取“將來”的結(jié)果,當(dāng)不調(diào)用此方法時(shí),主線程不會阻塞
5、FTP上傳工具類
package cn.com.pingtech.common.ftp; import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** ?* ftp上傳工具包 ?*/ public class FtpUtil { ? ? /** ? ? ?* 上傳文件 ? ? ?* ? ? ?* @param ftpPath ? ? ?* @param listFiles ? ? ?* @return ? ? ?*/ ? ? public static synchronized List upload(String ftpPath, File[] listFiles) { ? ? ? ? //構(gòu)建線程池 ? ? ? ? ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(FtpConfig.threadPoolSize); ? ? ? ? List<Future> results = new ArrayList<>(); ? ? ? ? //創(chuàng)建n個(gè)ftp鏈接 ? ? ? ? FtpFactory factory = new FtpFactory(); ? ? ? ? for (File file : listFiles) { ? ? ? ? ? ? FtpConnection ftp = factory.getFtp();//獲取ftp con ? ? ? ? ? ? UploadTask upload = new UploadTask(factory,ftp, file, ftpPath, file.getName()); ? ? ? ? ? ? Future submit = newFixedThreadPool.submit(upload); ? ? ? ? ? ? results.add(submit); ? ? ? ? } ? ? ? ? List listResults = new ArrayList<>(); ? ? ? ? for (Future result : results) { ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? //獲取線程結(jié)果 ? ? ? ? ? ? ? ? UploadResult uploadResult = (UploadResult)result.get(30, TimeUnit.MINUTES); ? ? ? ? ? ? ? ? listResults.add(uploadResult); ? ? ? ? ? ? } catch (Exception e) { ? ? ? ? ? ? ? ? e.printStackTrace(); ? ? ? ? ? ? } ? ? ? ? } ? ? ? ? factory.close(); ? ? ? ? newFixedThreadPool.shutdown(); ? ? ? ? return listResults; ? ? } }
6、測試上傳
package cn.com.pingtech.common.ftp class Client { ? ? public static void main(String[] args) throws IOException { ? ? ? ? String loalPath = "C:\\Users\\Administrator\\Desktop\\test\\0"; ? ? ? ? String ftpPath = "/data/jcz/"; ? ? ? ? File parentFile = new File(loalPath); ? ? ? ? List <UploadResult> list = FtpUtil.upload(ftpPath,parentFile.listFiles()); ? ? ? ? for(UploadResult vo:list){ ? ? ? ? ? ? System.out.println(vo); ? ? ? ? } ? ? ? ?? ? ? } }
注意:FTP協(xié)議里面,規(guī)定文件名編碼為iso-8859-1,所以目錄名或文件名需要轉(zhuǎn)碼
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
QTabWidget標(biāo)簽實(shí)現(xiàn)雙擊關(guān)閉的方法(推薦)
這篇文章主要介紹了QTabWidget標(biāo)簽實(shí)現(xiàn)雙擊關(guān)閉的方法(推薦)的相關(guān)資料,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友可以參考下2016-06-06springboot實(shí)現(xiàn)rabbitmq消息確認(rèn)的示例代碼
RabbitMQ的消息確認(rèn)有兩種, 一種是消息發(fā)送確認(rèn),第二種是消費(fèi)接收確認(rèn),本文主要介紹了springboot實(shí)現(xiàn)rabbitmq消息確認(rèn)的示例代碼,具有一定的參考價(jià)值,感興趣的可以了解一下2023-09-09ArrayList和HashMap如何自己實(shí)現(xiàn)實(shí)例詳解
這篇文章主要介紹了 ArrayList和HashMap如何自己實(shí)現(xiàn)的相關(guān)資料,需要的朋友可以參考下2016-12-12Spring Boot實(shí)現(xiàn)熱部署的實(shí)例方法
在本篇文章里小編給大家整理的是關(guān)于Spring Boot實(shí)現(xiàn)熱部署的實(shí)例方法和實(shí)例,需要的朋友們可以參考下。2020-02-02