關(guān)于JSCH使用自定義連接池的說明
1. JSCH使用方法
2. JSCH工具類
3. 創(chuàng)建連接池
ConnectionPool.java
@Slf4j public class ConnectionPool { private String strictHostKeyChecking; private Integer timeout; /** * ip地址 */ private String ip = ""; /** * 端口號 */ private Integer port = 22; /** * 用戶名 */ private String username = ""; /** * 密碼 */ private String password = ""; /** * 每次擴(kuò)容增加幾個(gè)連接 */ private int incrementalConnections = 2; /** * 最大連接數(shù) */ private int maxConnections = 10; /** * 最大空閑連接 */ private int maxIdle = 4; /** * 最小空閑連接 */ private int minIdel = 2; private Vector<PooledConnection> connections = null; @PostConstruct private void init() { createPool(); } /** * 構(gòu)造方法 * * @param strictHostKeyChecking 連接模式 * @param timeout 超時(shí)時(shí)間 */ public ConnectionPool(String strictHostKeyChecking, Integer timeout) { this.strictHostKeyChecking = strictHostKeyChecking; this.timeout = timeout; } /** * 構(gòu)造方法 * * @param strictHostKeyChecking 連接模式 * @param timeout 超時(shí)時(shí)間 * @param incrementalConnections 增量大小 */ public ConnectionPool(String strictHostKeyChecking, Integer timeout, int incrementalConnections) { this.strictHostKeyChecking = strictHostKeyChecking; this.timeout = timeout; this.incrementalConnections = incrementalConnections; } /** * 構(gòu)造方法 * * @param strictHostKeyChecking 連接模式 * @param timeout 超時(shí)時(shí)間 * @param incrementalConnections 增量大小 * @param maxConnections 連接池最大連接數(shù) */ public ConnectionPool(String strictHostKeyChecking, Integer timeout, int incrementalConnections, int maxConnections) { this.strictHostKeyChecking = strictHostKeyChecking; this.timeout = timeout; this.incrementalConnections = incrementalConnections; this.maxConnections = maxConnections; } /** * 創(chuàng)建連接池,判斷連接池是否創(chuàng)建,如果連接池沒有創(chuàng)建則創(chuàng)建連接池 */ public synchronized void createPool() { if (Objects.nonNull(connections)) { return; } connections = new Vector<>(); log.info("create shell connectionPool success!"); } /** * 創(chuàng)建指定數(shù)量的連接放入連接池中 * * @param numConnections 創(chuàng)建數(shù)量 * @throws JSchException 建立遠(yuǎn)程連接異常 */ private void createConnections(int numConnections) throws JSchException { for (int x = 0; x < numConnections; x++) { // 判斷是否已達(dá)連接池最大連接,如果到達(dá)最大連接數(shù)據(jù)則不再創(chuàng)建連接 if (this.maxConnections > 0 && this.connections.size() >= this.maxConnections) { break; } //在連接池中新增一個(gè)連接 try { connections.addElement(new PooledConnection(newConnection(), ip)); } catch (JSchException e) { log.error("create shell connection failed {}", e.getMessage()); throw new JSchException(); } log.info("Session connected!"); } } /** * 新一個(gè)連接session * * @return 創(chuàng)建的session * @throws JSchException 遠(yuǎn)程連接異常 */ private Session newConnection() throws JSchException { // 創(chuàng)建一個(gè)session JSch jsch = new JSch(); Session session = jsch.getSession(username, ip, port); session.setPassword(password); Properties sshConfig = new Properties(); sshConfig.put("StrictHostKeyChecking", strictHostKeyChecking); session.setConfig(sshConfig); session.connect(timeout); session.setServerAliveInterval(1800); return session; } /** * 獲取一個(gè)可用session * * @param ip ip地址 * @param port 端口號 * @param username 用戶名 * @param password 密碼 * @return 可用的session * @throws JSchException 遠(yuǎn)程連接異常 */ public synchronized Session getConnection(String ip, Integer port, String username, String password) throws JSchException { this.ip = ip; this.port = port; this.username = username; this.password = password; // 連接池還沒創(chuàng)建,則返回 null if (Objects.isNull(connections)) { return null; } // 獲得一個(gè)可用的數(shù)據(jù)庫連接 Session session = getFreeConnection(); // 假如目前沒有可以使用的連接,即所有的連接都在使用中,等一會重試 while (Objects.isNull(session)) { wait(250); session = getFreeConnection(); } return session; } /** * 獲取一個(gè)可用session * * @return 返回可用session * @throws JSchException 遠(yuǎn)程連接異常 */ private Session getFreeConnection() throws JSchException { Session session = findFreeConnection(); // 如果沒有可用連接,則創(chuàng)建連接, if (Objects.isNull(session)) { createConnections(incrementalConnections); session = findFreeConnection(); if (Objects.isNull(session)) { return null; } } return session; } /** * 查找可用連接 * * @return 返回可用連接 */ private Session findFreeConnection() { Session session = null; PooledConnection conn; Enumeration<PooledConnection> enumerate = connections.elements(); // 遍歷所有的對象,看是否有可用的連接 while (enumerate.hasMoreElements()) { conn = enumerate.nextElement(); if (!ip.equals(conn.getTag())) { continue; } if (!conn.isBusy()) { session = conn.getSession(); conn.setBusy(true); if (!testConnection(session)) { try { session = newConnection(); } catch (JSchException e) { log.error("create shell connection failed {}", e.getMessage()); return null; } conn.setSession(session); } break; } } return session; } /** * 測試連接是否可用 * * @param session 需要測試的session * @return 是否可用 */ private boolean testConnection(Session session) { boolean connected = session.isConnected(); if (!connected) { closeConnection(session); return false; } return true; } /** * 將session放回連接池中 * * @param session 需要放回連接池中的session */ public synchronized void returnConnection(Session session) { // 確保連接池存在,假如連接沒有創(chuàng)建(不存在),直接返回 if (Objects.isNull(connections)) { log.error("ConnectionPool does not exist"); return; } PooledConnection conn; Enumeration<PooledConnection> enumerate = connections.elements(); // 遍歷連接池中的所有連接,找到這個(gè)要返回的連接對象,將狀態(tài)設(shè)置為空閑 while (enumerate.hasMoreElements()) { conn = enumerate.nextElement(); if (session.equals(conn.getSession())) { conn.setBusy(false); } } } /** * 刷新連接池 * * @throws JSchException 遠(yuǎn)程連接異常 */ public synchronized void refreshConnections() throws JSchException { // 確保連接池己創(chuàng)新存在 if (Objects.isNull(connections)) { log.error("ConnectionPool does not exist"); return; } PooledConnection conn; Enumeration<PooledConnection> enumerate = connections.elements(); while (enumerate.hasMoreElements()) { conn = enumerate.nextElement(); if (conn.isBusy()) { wait(5000); } closeConnection(conn.getSession()); conn.setSession(newConnection()); conn.setBusy(false); } } /** * 關(guān)閉連接池 */ public synchronized void closeConnectionPool() { // 確保連接池存在,假如不存在,返回 if (Objects.isNull(connections)) { log.info("Connection pool does not exist"); return; } PooledConnection conn; Enumeration<PooledConnection> enumerate = connections.elements(); while (enumerate.hasMoreElements()) { conn = enumerate.nextElement(); if (conn.isBusy()) { wait(5000); } closeConnection(conn.getSession()); connections.removeElement(conn); } connections = null; } /** * 關(guān)閉session會話 * * @param session 需要關(guān)閉的session */ private void closeConnection(Session session) { session.disconnect(); } /** * 線程暫停 * * @param mSeconds 暫停時(shí)間 */ private void wait(int mSeconds) { try { Thread.sleep(mSeconds); } catch (InterruptedException e) { log.error("{} 線程暫停失敗 -> {}", Thread.currentThread().getName(), e.getMessage()); } } /** * 對象連接狀態(tài) */ class PooledConnection { /** * 遠(yuǎn)程連接 */ Session session; /** * 此連接是否正在使用的標(biāo)志,默認(rèn)沒有正在使用 */ boolean busy = false; /** * 連接標(biāo)記 */ String tag; /** * 構(gòu)造函數(shù),根據(jù)一個(gè) Session 構(gòu)造一個(gè) PooledConnection 對象 * * @param session 連接 * @param tag 連接標(biāo)記 */ public PooledConnection(Session session, String tag) { this.session = session; this.tag = tag; } public Session getSession() { return session; } public void setSession(Session session) { this.session = session; } public boolean isBusy() { return busy; } public void setBusy(boolean busy) { this.busy = busy; } public String getTag() { return tag; } public void setTag(String tag) { this.tag = tag; } } public int getIncrementalConnections() { return this.incrementalConnections; } public void setIncrementalConnections(int incrementalConnections) { this.incrementalConnections = incrementalConnections; } public int getMaxConnections() { return this.maxConnections; } public void setMaxConnections(int maxConnections) { this.maxConnections = maxConnections; } }
4. 改造shellUtil
ShellUtil.java
@Slf4j @Component @Scope(value = "prototype") public class ShellUtil { /** * ip地址 */ private String ip = ""; /** * 端口號 */ private Integer port = 22; /** * 用戶名 */ private String username = ""; /** * 密碼 */ private String password = ""; private Session session; private Channel channel; private ChannelExec channelExec; private ChannelSftp channelSftp; private ChannelShell channelShell; private ConnectionPool connectionPool; public ShellUtil(ConnectionPool connectionPool) { this.connectionPool = connectionPool; } /** * 初始化 * * @param ip 遠(yuǎn)程主機(jī)IP地址 * @param port 遠(yuǎn)程主機(jī)端口 * @param username 遠(yuǎn)程主機(jī)登陸用戶名 * @param password 遠(yuǎn)程主機(jī)登陸密碼 * @throws JSchException JSch異常 */ public void init(String ip, Integer port, String username, String password) throws JSchException { this.ip = ip; this.port = port; this.username = username; this.password = password; } public void init(String ip, String username, String password) throws JSchException { this.ip = ip; this.username = username; this.password = password; } private void getSession() throws JSchException { session = connectionPool.getConnection(ip, port, username, password); if (Objects.isNull(session)) { connectionPool.refreshConnections(); session = connectionPool.getConnection(ip, port, username, password); if (Objects.isNull(session)){ throw new RuntimeException("無可用連接"); } } } /** * 連接多次執(zhí)行命令,執(zhí)行命令完畢后需要執(zhí)行close()方法 * * @param command 需要執(zhí)行的指令 * @return 執(zhí)行結(jié)果 * @throws Exception 沒有執(zhí)行初始化 */ public String execCmd(String command) throws Exception { initChannelExec(); log.info("execCmd command - > {}", command); channelExec.setCommand(command); channel.setInputStream(null); channelExec.setErrStream(System.err); channel.connect(); StringBuilder sb = new StringBuilder(16); try (InputStream in = channelExec.getInputStream(); InputStreamReader isr = new InputStreamReader(in, StandardCharsets.UTF_8); BufferedReader reader = new BufferedReader(isr)) { String buffer; while ((buffer = reader.readLine()) != null) { sb.append("\n").append(buffer); } log.info("execCmd result - > {}", sb); return sb.toString(); } } /** * 執(zhí)行命令關(guān)閉連接 * * @param command 需要執(zhí)行的指令 * @return 執(zhí)行結(jié)果 * @throws Exception 沒有執(zhí)行初始化 */ public String execCmdAndClose(String command) throws Exception { String result = execCmd(command); close(); return result; } /** * 執(zhí)行復(fù)雜shell命令 * * @param cmds 多條命令 * @return 執(zhí)行結(jié)果 * @throws Exception 連接異常 */ public String execCmdByShell(String... cmds) throws Exception { return execCmdByShell(Arrays.asList(cmds)); } /** * 執(zhí)行復(fù)雜shell命令 * * @param cmds 多條命令 * @return 執(zhí)行結(jié)果 * @throws Exception 連接異常 */ public String execCmdByShell(List<String> cmds) throws Exception { String result = ""; initChannelShell(); InputStream inputStream = channelShell.getInputStream(); channelShell.setPty(true); channelShell.connect(); OutputStream outputStream = channelShell.getOutputStream(); PrintWriter printWriter = new PrintWriter(outputStream); for (String cmd : cmds) { printWriter.println(cmd); } printWriter.flush(); byte[] tmp = new byte[1024]; while (true) { while (inputStream.available() > 0) { int i = inputStream.read(tmp, 0, 1024); if (i < 0) { break; } String s = new String(tmp, 0, i); if (s.contains("--More--")) { outputStream.write((" ").getBytes()); outputStream.flush(); } System.out.println(s); } if (channelShell.isClosed()) { System.out.println("exit-status:" + channelShell.getExitStatus()); break; } try { Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } } outputStream.close(); inputStream.close(); close(); return result; } /** * SFTP文件上傳 * * @param src 源地址 * @param dst 目的地址 * @throws Exception 上傳文件失敗 */ public void put(String src, String dst) throws Exception { put(src, dst, ChannelSftp.OVERWRITE); } /** * SFTP文件上傳 * * @param src 源地址 * @param dst 目的地址 * @param mode 上傳模式 默認(rèn)為ChannelSftp.OVERWRITE * @throws Exception 上傳文件失敗 */ public void put(String src, String dst, int mode) throws Exception { initChannelSftp(); log.info("Upload File {} -> {}", src, dst); channelSftp.put(src, dst, mode); log.info("Upload File Success!"); close(); } /** * SFTP文件上傳并監(jiān)控上傳進(jìn)度 * * @param src 源地址 * @param dst 目的地址 * @throws Exception 上傳文件失敗 */ public void putMonitorAndClose(String src, String dst) throws Exception { putMonitorAndClose(src, dst, ChannelSftp.OVERWRITE); } /** * SFTP文件上傳并監(jiān)控上傳進(jìn)度 * * @param src 源地址 * @param dst 目的地址 * @param mode 上傳模式 默認(rèn)為ChannelSftp.OVERWRITE * @throws Exception 上傳文件失敗 */ public void putMonitorAndClose(String src, String dst, int mode) throws Exception { initChannelSftp(); FileProgressMonitor monitor = new FileProgressMonitor(new File(src).length()); log.info("Upload File {} -> {}", src, dst); channelSftp.put(src, dst, monitor, mode); log.info("Upload File Success!"); close(); } /** * SFTP文件下載 * * @param src 源文件地址 * @param dst 目的地址 * @throws Exception 下載文件失敗 */ public void get(String src, String dst) throws Exception { initChannelSftp(); log.info("Download File {} -> {}", src, dst); channelSftp.get(src, dst); log.info("Download File Success!"); close(); } /** * SFTP文件下載并監(jiān)控下載進(jìn)度 * * @param src 源文件地址 * @param dst 目的地址 * @throws Exception 下載文件失敗 */ public void getMonitorAndClose(String src, String dst) throws Exception { initChannelSftp(); FileProgressMonitor monitor = new FileProgressMonitor(new File(src).length()); log.info("Download File {} -> {}", src, dst); channelSftp.get(src, dst, monitor); log.info("Download File Success!"); close(); } /** * 刪除指定目錄文件 * * @param path 刪除路徑 * @throws Exception 遠(yuǎn)程主機(jī)連接異常 */ public void deleteFile(String path) throws Exception { initChannelSftp(); channelSftp.rm(path); log.info("Delete File {}", path); close(); } /** * 刪除指定目錄 * * @param path 刪除路徑 * @throws Exception 遠(yuǎn)程主機(jī)連接異常 */ public void deleteDir(String path) throws Exception { initChannelSftp(); channelSftp.rmdir(path); log.info("Delete Dir {} ", path); close(); } /** * 釋放資源 */ public void close() { connectionPool.returnConnection(session); } private void initChannelSftp() throws Exception { getSession(); channel = session.openChannel("sftp"); channel.connect(); // 建立SFTP通道的連接 channelSftp = (ChannelSftp) channel; if (session == null || channel == null || channelSftp == null) { log.error("請先執(zhí)行init()"); throw new Exception("請先執(zhí)行init()"); } } private void initChannelExec() throws Exception { getSession(); // 打開執(zhí)行shell指令的通道 channel = session.openChannel("exec"); channelExec = (ChannelExec) channel; if (session == null || channel == null || channelExec == null) { log.error("請先執(zhí)行init()"); throw new Exception("請先執(zhí)行init()"); } } private void initChannelShell() throws Exception { getSession(); // 打開執(zhí)行shell指令的通道 channel = session.openChannel("shell"); channelShell = (ChannelShell) channel; if (session == null || channel == null || channelShell == null) { log.error("請先執(zhí)行init()"); throw new Exception("請先執(zhí)行init()"); } } }
5. 添加配置
ConnectionPoolConfig.java
@Configuration public class PoolConfiguration { @Value("${ssh.strictHostKeyChecking:no}") private String strictHostKeyChecking; @Value("${ssh.timeout:30000}") private Integer timeout; @Value("${ssh.incrementalConnections:2}") private Integer incrementalConnections; @Value("${ssh.maxConnections:10}") private Integer maxConnections; @Bean public ConnectionPool connectionPool(){ return new ConnectionPool(strictHostKeyChecking, timeout,incrementalConnections,maxConnections); } }
6. 線程安全問題解決
6.1
public class SessionThreadLocal { private static ThreadLocal<Session> threadLocal = new ThreadLocal<>(); public static synchronized void set(Session session) { threadLocal.set(session); } public static synchronized Session get( ) { return threadLocal.get(); } public static synchronized void remove( ) { threadLocal.remove(); } }
6.2 使用springboot中bean的作用域prototype
使用@Lookup注入方式
@Lookup public ShellUtil getshellUtil(){ return null; }; @GetMapping("/test") public void Test() throws Exception { int i = getshellUtil().hashCode(); System.out.println(i); }
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Java Stream 流實(shí)現(xiàn)合并操作示例
這篇文章主要介紹了Java Stream 流實(shí)現(xiàn)合并操作,結(jié)合實(shí)例形式詳細(xì)分析了Java Stream 流實(shí)現(xiàn)合并操作原理與相關(guān)注意事項(xiàng),需要的朋友可以參考下2020-05-05SpringBoot服務(wù)設(shè)置禁止server.point端口的使用
本文主要介紹了SpringBoot服務(wù)設(shè)置禁止server.point端口的使用,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-01-01java 注解實(shí)現(xiàn)一個(gè)可配置線程池的方法示例
這篇文章主要介紹了java 注解實(shí)現(xiàn)一個(gè)可配置線程池的方法示例,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2019-01-01詳解Spring DeferredResult異步操作使用場景
本文主要介紹了Spring DeferredResult異步操作使用場景,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-10-10SpringBoot中l(wèi)ogback日志保存到mongoDB的方法
這篇文章主要介紹了SpringBoot中l(wèi)ogback日志保存到mongoDB的方法,2017-11-11Spring框架web項(xiàng)目實(shí)戰(zhàn)全代碼分享
這篇文章主要介紹了Spring框架web項(xiàng)目實(shí)戰(zhàn)全代碼分享,具有一定參考價(jià)值,需要的朋友可以了解下。2017-11-11