關(guān)于JSCH使用自定義連接池的說(shuō)明
1. JSCH使用方法
2. JSCH工具類
3. 創(chuàng)建連接池
ConnectionPool.java
@Slf4j
public class ConnectionPool {
private String strictHostKeyChecking;
private Integer timeout;
/**
* ip地址
*/
private String ip = "";
/**
* 端口號(hào)
*/
private Integer port = 22;
/**
* 用戶名
*/
private String username = "";
/**
* 密碼
*/
private String password = "";
/**
* 每次擴(kuò)容增加幾個(gè)連接
*/
private int incrementalConnections = 2;
/**
* 最大連接數(shù)
*/
private int maxConnections = 10;
/**
* 最大空閑連接
*/
private int maxIdle = 4;
/**
* 最小空閑連接
*/
private int minIdel = 2;
private Vector<PooledConnection> connections = null;
@PostConstruct
private void init() {
createPool();
}
/**
* 構(gòu)造方法
*
* @param strictHostKeyChecking 連接模式
* @param timeout 超時(shí)時(shí)間
*/
public ConnectionPool(String strictHostKeyChecking, Integer timeout) {
this.strictHostKeyChecking = strictHostKeyChecking;
this.timeout = timeout;
}
/**
* 構(gòu)造方法
*
* @param strictHostKeyChecking 連接模式
* @param timeout 超時(shí)時(shí)間
* @param incrementalConnections 增量大小
*/
public ConnectionPool(String strictHostKeyChecking,
Integer timeout,
int incrementalConnections) {
this.strictHostKeyChecking = strictHostKeyChecking;
this.timeout = timeout;
this.incrementalConnections = incrementalConnections;
}
/**
* 構(gòu)造方法
*
* @param strictHostKeyChecking 連接模式
* @param timeout 超時(shí)時(shí)間
* @param incrementalConnections 增量大小
* @param maxConnections 連接池最大連接數(shù)
*/
public ConnectionPool(String strictHostKeyChecking,
Integer timeout,
int incrementalConnections,
int maxConnections) {
this.strictHostKeyChecking = strictHostKeyChecking;
this.timeout = timeout;
this.incrementalConnections = incrementalConnections;
this.maxConnections = maxConnections;
}
/**
* 創(chuàng)建連接池,判斷連接池是否創(chuàng)建,如果連接池沒(méi)有創(chuàng)建則創(chuàng)建連接池
*/
public synchronized void createPool() {
if (Objects.nonNull(connections)) {
return;
}
connections = new Vector<>();
log.info("create shell connectionPool success!");
}
/**
* 創(chuàng)建指定數(shù)量的連接放入連接池中
*
* @param numConnections 創(chuàng)建數(shù)量
* @throws JSchException 建立遠(yuǎn)程連接異常
*/
private void createConnections(int numConnections) throws JSchException {
for (int x = 0; x < numConnections; x++) {
// 判斷是否已達(dá)連接池最大連接,如果到達(dá)最大連接數(shù)據(jù)則不再創(chuàng)建連接
if (this.maxConnections > 0 && this.connections.size() >= this.maxConnections) {
break;
}
//在連接池中新增一個(gè)連接
try {
connections.addElement(new PooledConnection(newConnection(), ip));
} catch (JSchException e) {
log.error("create shell connection failed {}", e.getMessage());
throw new JSchException();
}
log.info("Session connected!");
}
}
/**
* 新一個(gè)連接session
*
* @return 創(chuàng)建的session
* @throws JSchException 遠(yuǎn)程連接異常
*/
private Session newConnection() throws JSchException {
// 創(chuàng)建一個(gè)session
JSch jsch = new JSch();
Session session = jsch.getSession(username, ip, port);
session.setPassword(password);
Properties sshConfig = new Properties();
sshConfig.put("StrictHostKeyChecking", strictHostKeyChecking);
session.setConfig(sshConfig);
session.connect(timeout);
session.setServerAliveInterval(1800);
return session;
}
/**
* 獲取一個(gè)可用session
*
* @param ip ip地址
* @param port 端口號(hào)
* @param username 用戶名
* @param password 密碼
* @return 可用的session
* @throws JSchException 遠(yuǎn)程連接異常
*/
public synchronized Session getConnection(String ip, Integer port, String username, String password) throws JSchException {
this.ip = ip;
this.port = port;
this.username = username;
this.password = password;
// 連接池還沒(méi)創(chuàng)建,則返回 null
if (Objects.isNull(connections)) {
return null;
}
// 獲得一個(gè)可用的數(shù)據(jù)庫(kù)連接
Session session = getFreeConnection();
// 假如目前沒(méi)有可以使用的連接,即所有的連接都在使用中,等一會(huì)重試
while (Objects.isNull(session)) {
wait(250);
session = getFreeConnection();
}
return session;
}
/**
* 獲取一個(gè)可用session
*
* @return 返回可用session
* @throws JSchException 遠(yuǎn)程連接異常
*/
private Session getFreeConnection() throws JSchException {
Session session = findFreeConnection();
// 如果沒(méi)有可用連接,則創(chuàng)建連接,
if (Objects.isNull(session)) {
createConnections(incrementalConnections);
session = findFreeConnection();
if (Objects.isNull(session)) {
return null;
}
}
return session;
}
/**
* 查找可用連接
*
* @return 返回可用連接
*/
private Session findFreeConnection() {
Session session = null;
PooledConnection conn;
Enumeration<PooledConnection> enumerate = connections.elements();
// 遍歷所有的對(duì)象,看是否有可用的連接
while (enumerate.hasMoreElements()) {
conn = enumerate.nextElement();
if (!ip.equals(conn.getTag())) {
continue;
}
if (!conn.isBusy()) {
session = conn.getSession();
conn.setBusy(true);
if (!testConnection(session)) {
try {
session = newConnection();
} catch (JSchException e) {
log.error("create shell connection failed {}", e.getMessage());
return null;
}
conn.setSession(session);
}
break;
}
}
return session;
}
/**
* 測(cè)試連接是否可用
*
* @param session 需要測(cè)試的session
* @return 是否可用
*/
private boolean testConnection(Session session) {
boolean connected = session.isConnected();
if (!connected) {
closeConnection(session);
return false;
}
return true;
}
/**
* 將session放回連接池中
*
* @param session 需要放回連接池中的session
*/
public synchronized void returnConnection(Session session) {
// 確保連接池存在,假如連接沒(méi)有創(chuàng)建(不存在),直接返回
if (Objects.isNull(connections)) {
log.error("ConnectionPool does not exist");
return;
}
PooledConnection conn;
Enumeration<PooledConnection> enumerate = connections.elements();
// 遍歷連接池中的所有連接,找到這個(gè)要返回的連接對(duì)象,將狀態(tài)設(shè)置為空閑
while (enumerate.hasMoreElements()) {
conn = enumerate.nextElement();
if (session.equals(conn.getSession())) {
conn.setBusy(false);
}
}
}
/**
* 刷新連接池
*
* @throws JSchException 遠(yuǎn)程連接異常
*/
public synchronized void refreshConnections() throws JSchException {
// 確保連接池己創(chuàng)新存在
if (Objects.isNull(connections)) {
log.error("ConnectionPool does not exist");
return;
}
PooledConnection conn;
Enumeration<PooledConnection> enumerate = connections.elements();
while (enumerate.hasMoreElements()) {
conn = enumerate.nextElement();
if (conn.isBusy()) {
wait(5000);
}
closeConnection(conn.getSession());
conn.setSession(newConnection());
conn.setBusy(false);
}
}
/**
* 關(guān)閉連接池
*/
public synchronized void closeConnectionPool() {
// 確保連接池存在,假如不存在,返回
if (Objects.isNull(connections)) {
log.info("Connection pool does not exist");
return;
}
PooledConnection conn;
Enumeration<PooledConnection> enumerate = connections.elements();
while (enumerate.hasMoreElements()) {
conn = enumerate.nextElement();
if (conn.isBusy()) {
wait(5000);
}
closeConnection(conn.getSession());
connections.removeElement(conn);
}
connections = null;
}
/**
* 關(guān)閉session會(huì)話
*
* @param session 需要關(guān)閉的session
*/
private void closeConnection(Session session) {
session.disconnect();
}
/**
* 線程暫停
*
* @param mSeconds 暫停時(shí)間
*/
private void wait(int mSeconds) {
try {
Thread.sleep(mSeconds);
} catch (InterruptedException e) {
log.error("{} 線程暫停失敗 -> {}", Thread.currentThread().getName(), e.getMessage());
}
}
/**
* 對(duì)象連接狀態(tài)
*/
class PooledConnection {
/**
* 遠(yuǎn)程連接
*/
Session session;
/**
* 此連接是否正在使用的標(biāo)志,默認(rèn)沒(méi)有正在使用
*/
boolean busy = false;
/**
* 連接標(biāo)記
*/
String tag;
/**
* 構(gòu)造函數(shù),根據(jù)一個(gè) Session 構(gòu)造一個(gè) PooledConnection 對(duì)象
*
* @param session 連接
* @param tag 連接標(biāo)記
*/
public PooledConnection(Session session, String tag) {
this.session = session;
this.tag = tag;
}
public Session getSession() {
return session;
}
public void setSession(Session session) {
this.session = session;
}
public boolean isBusy() {
return busy;
}
public void setBusy(boolean busy) {
this.busy = busy;
}
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
}
public int getIncrementalConnections() {
return this.incrementalConnections;
}
public void setIncrementalConnections(int incrementalConnections) {
this.incrementalConnections = incrementalConnections;
}
public int getMaxConnections() {
return this.maxConnections;
}
public void setMaxConnections(int maxConnections) {
this.maxConnections = maxConnections;
}
}
4. 改造shellUtil
ShellUtil.java
@Slf4j
@Component
@Scope(value = "prototype")
public class ShellUtil {
/**
* ip地址
*/
private String ip = "";
/**
* 端口號(hào)
*/
private Integer port = 22;
/**
* 用戶名
*/
private String username = "";
/**
* 密碼
*/
private String password = "";
private Session session;
private Channel channel;
private ChannelExec channelExec;
private ChannelSftp channelSftp;
private ChannelShell channelShell;
private ConnectionPool connectionPool;
public ShellUtil(ConnectionPool connectionPool) {
this.connectionPool = connectionPool;
}
/**
* 初始化
*
* @param ip 遠(yuǎn)程主機(jī)IP地址
* @param port 遠(yuǎn)程主機(jī)端口
* @param username 遠(yuǎn)程主機(jī)登陸用戶名
* @param password 遠(yuǎn)程主機(jī)登陸密碼
* @throws JSchException JSch異常
*/
public void init(String ip, Integer port, String username, String password) throws JSchException {
this.ip = ip;
this.port = port;
this.username = username;
this.password = password;
}
public void init(String ip, String username, String password) throws JSchException {
this.ip = ip;
this.username = username;
this.password = password;
}
private void getSession() throws JSchException {
session = connectionPool.getConnection(ip, port, username, password);
if (Objects.isNull(session)) {
connectionPool.refreshConnections();
session = connectionPool.getConnection(ip, port, username, password);
if (Objects.isNull(session)){
throw new RuntimeException("無(wú)可用連接");
}
}
}
/**
* 連接多次執(zhí)行命令,執(zhí)行命令完畢后需要執(zhí)行close()方法
*
* @param command 需要執(zhí)行的指令
* @return 執(zhí)行結(jié)果
* @throws Exception 沒(méi)有執(zhí)行初始化
*/
public String execCmd(String command) throws Exception {
initChannelExec();
log.info("execCmd command - > {}", command);
channelExec.setCommand(command);
channel.setInputStream(null);
channelExec.setErrStream(System.err);
channel.connect();
StringBuilder sb = new StringBuilder(16);
try (InputStream in = channelExec.getInputStream();
InputStreamReader isr = new InputStreamReader(in, StandardCharsets.UTF_8);
BufferedReader reader = new BufferedReader(isr)) {
String buffer;
while ((buffer = reader.readLine()) != null) {
sb.append("\n").append(buffer);
}
log.info("execCmd result - > {}", sb);
return sb.toString();
}
}
/**
* 執(zhí)行命令關(guān)閉連接
*
* @param command 需要執(zhí)行的指令
* @return 執(zhí)行結(jié)果
* @throws Exception 沒(méi)有執(zhí)行初始化
*/
public String execCmdAndClose(String command) throws Exception {
String result = execCmd(command);
close();
return result;
}
/**
* 執(zhí)行復(fù)雜shell命令
*
* @param cmds 多條命令
* @return 執(zhí)行結(jié)果
* @throws Exception 連接異常
*/
public String execCmdByShell(String... cmds) throws Exception {
return execCmdByShell(Arrays.asList(cmds));
}
/**
* 執(zhí)行復(fù)雜shell命令
*
* @param cmds 多條命令
* @return 執(zhí)行結(jié)果
* @throws Exception 連接異常
*/
public String execCmdByShell(List<String> cmds) throws Exception {
String result = "";
initChannelShell();
InputStream inputStream = channelShell.getInputStream();
channelShell.setPty(true);
channelShell.connect();
OutputStream outputStream = channelShell.getOutputStream();
PrintWriter printWriter = new PrintWriter(outputStream);
for (String cmd : cmds) {
printWriter.println(cmd);
}
printWriter.flush();
byte[] tmp = new byte[1024];
while (true) {
while (inputStream.available() > 0) {
int i = inputStream.read(tmp, 0, 1024);
if (i < 0) {
break;
}
String s = new String(tmp, 0, i);
if (s.contains("--More--")) {
outputStream.write((" ").getBytes());
outputStream.flush();
}
System.out.println(s);
}
if (channelShell.isClosed()) {
System.out.println("exit-status:" + channelShell.getExitStatus());
break;
}
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
outputStream.close();
inputStream.close();
close();
return result;
}
/**
* SFTP文件上傳
*
* @param src 源地址
* @param dst 目的地址
* @throws Exception 上傳文件失敗
*/
public void put(String src, String dst) throws Exception {
put(src, dst, ChannelSftp.OVERWRITE);
}
/**
* SFTP文件上傳
*
* @param src 源地址
* @param dst 目的地址
* @param mode 上傳模式 默認(rèn)為ChannelSftp.OVERWRITE
* @throws Exception 上傳文件失敗
*/
public void put(String src, String dst, int mode) throws Exception {
initChannelSftp();
log.info("Upload File {} -> {}", src, dst);
channelSftp.put(src, dst, mode);
log.info("Upload File Success!");
close();
}
/**
* SFTP文件上傳并監(jiān)控上傳進(jìn)度
*
* @param src 源地址
* @param dst 目的地址
* @throws Exception 上傳文件失敗
*/
public void putMonitorAndClose(String src, String dst) throws Exception {
putMonitorAndClose(src, dst, ChannelSftp.OVERWRITE);
}
/**
* SFTP文件上傳并監(jiān)控上傳進(jìn)度
*
* @param src 源地址
* @param dst 目的地址
* @param mode 上傳模式 默認(rèn)為ChannelSftp.OVERWRITE
* @throws Exception 上傳文件失敗
*/
public void putMonitorAndClose(String src, String dst, int mode) throws Exception {
initChannelSftp();
FileProgressMonitor monitor = new FileProgressMonitor(new File(src).length());
log.info("Upload File {} -> {}", src, dst);
channelSftp.put(src, dst, monitor, mode);
log.info("Upload File Success!");
close();
}
/**
* SFTP文件下載
*
* @param src 源文件地址
* @param dst 目的地址
* @throws Exception 下載文件失敗
*/
public void get(String src, String dst) throws Exception {
initChannelSftp();
log.info("Download File {} -> {}", src, dst);
channelSftp.get(src, dst);
log.info("Download File Success!");
close();
}
/**
* SFTP文件下載并監(jiān)控下載進(jìn)度
*
* @param src 源文件地址
* @param dst 目的地址
* @throws Exception 下載文件失敗
*/
public void getMonitorAndClose(String src, String dst) throws Exception {
initChannelSftp();
FileProgressMonitor monitor = new FileProgressMonitor(new File(src).length());
log.info("Download File {} -> {}", src, dst);
channelSftp.get(src, dst, monitor);
log.info("Download File Success!");
close();
}
/**
* 刪除指定目錄文件
*
* @param path 刪除路徑
* @throws Exception 遠(yuǎn)程主機(jī)連接異常
*/
public void deleteFile(String path) throws Exception {
initChannelSftp();
channelSftp.rm(path);
log.info("Delete File {}", path);
close();
}
/**
* 刪除指定目錄
*
* @param path 刪除路徑
* @throws Exception 遠(yuǎn)程主機(jī)連接異常
*/
public void deleteDir(String path) throws Exception {
initChannelSftp();
channelSftp.rmdir(path);
log.info("Delete Dir {} ", path);
close();
}
/**
* 釋放資源
*/
public void close() {
connectionPool.returnConnection(session);
}
private void initChannelSftp() throws Exception {
getSession();
channel = session.openChannel("sftp");
channel.connect(); // 建立SFTP通道的連接
channelSftp = (ChannelSftp) channel;
if (session == null || channel == null || channelSftp == null) {
log.error("請(qǐng)先執(zhí)行init()");
throw new Exception("請(qǐng)先執(zhí)行init()");
}
}
private void initChannelExec() throws Exception {
getSession();
// 打開(kāi)執(zhí)行shell指令的通道
channel = session.openChannel("exec");
channelExec = (ChannelExec) channel;
if (session == null || channel == null || channelExec == null) {
log.error("請(qǐng)先執(zhí)行init()");
throw new Exception("請(qǐng)先執(zhí)行init()");
}
}
private void initChannelShell() throws Exception {
getSession();
// 打開(kāi)執(zhí)行shell指令的通道
channel = session.openChannel("shell");
channelShell = (ChannelShell) channel;
if (session == null || channel == null || channelShell == null) {
log.error("請(qǐng)先執(zhí)行init()");
throw new Exception("請(qǐng)先執(zhí)行init()");
}
}
}
5. 添加配置
ConnectionPoolConfig.java
@Configuration
public class PoolConfiguration {
@Value("${ssh.strictHostKeyChecking:no}")
private String strictHostKeyChecking;
@Value("${ssh.timeout:30000}")
private Integer timeout;
@Value("${ssh.incrementalConnections:2}")
private Integer incrementalConnections;
@Value("${ssh.maxConnections:10}")
private Integer maxConnections;
@Bean
public ConnectionPool connectionPool(){
return new ConnectionPool(strictHostKeyChecking, timeout,incrementalConnections,maxConnections);
}
}
6. 線程安全問(wèn)題解決
6.1
public class SessionThreadLocal {
private static ThreadLocal<Session> threadLocal = new ThreadLocal<>();
public static synchronized void set(Session session) {
threadLocal.set(session);
}
public static synchronized Session get( ) {
return threadLocal.get();
}
public static synchronized void remove( ) {
threadLocal.remove();
}
}
6.2 使用springboot中bean的作用域prototype
使用@Lookup注入方式
@Lookup
public ShellUtil getshellUtil(){
return null;
};
@GetMapping("/test")
public void Test() throws Exception {
int i = getshellUtil().hashCode();
System.out.println(i);
}
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Java Stream 流實(shí)現(xiàn)合并操作示例
這篇文章主要介紹了Java Stream 流實(shí)現(xiàn)合并操作,結(jié)合實(shí)例形式詳細(xì)分析了Java Stream 流實(shí)現(xiàn)合并操作原理與相關(guān)注意事項(xiàng),需要的朋友可以參考下2020-05-05
SpringBoot服務(wù)設(shè)置禁止server.point端口的使用
本文主要介紹了SpringBoot服務(wù)設(shè)置禁止server.point端口的使用,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-01-01
java 注解實(shí)現(xiàn)一個(gè)可配置線程池的方法示例
這篇文章主要介紹了java 注解實(shí)現(xiàn)一個(gè)可配置線程池的方法示例,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2019-01-01
詳解Spring DeferredResult異步操作使用場(chǎng)景
本文主要介紹了Spring DeferredResult異步操作使用場(chǎng)景,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-10-10
SpringBoot中l(wèi)ogback日志保存到mongoDB的方法
這篇文章主要介紹了SpringBoot中l(wèi)ogback日志保存到mongoDB的方法,2017-11-11
Spring框架web項(xiàng)目實(shí)戰(zhàn)全代碼分享
這篇文章主要介紹了Spring框架web項(xiàng)目實(shí)戰(zhàn)全代碼分享,具有一定參考價(jià)值,需要的朋友可以了解下。2017-11-11
mybatis一對(duì)多方式實(shí)現(xiàn)批量插入
這篇文章主要介紹了mybatis一對(duì)多方式實(shí)現(xiàn)批量插入,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-11-11

