Java 如何快速實(shí)現(xiàn)一個連接池
什么是 ACP?
ACP 庫提供了一整套用于實(shí)現(xiàn)對象池化的 API,以及若干種各具特色的對象池實(shí)現(xiàn)。目前最常用的版本是 2.0 版本,相對于 1.x 版本而言,并不是簡單升級。2.0 版本是對象池實(shí)現(xiàn)的完全重寫,顯著的提升了性能和可伸縮性,并且包含可靠的實(shí)例跟蹤和池監(jiān)控。
Apache Commons Pool 的官網(wǎng)地址為:Pool – Overview,想翻找相關(guān)文檔資料,到這里去是最權(quán)威、最全面的。
如何使用 ACP?
要使用 ACP 實(shí)現(xiàn)一個線程池,首先需要先引入 ACP 的依賴包,這里以 Maven 為例。
<dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.0</version> </dependency>
要使用 ACP 實(shí)現(xiàn)一個對象池,大致可以分為三個步驟:
- 創(chuàng)建對象工廠:告訴 ACP 如何創(chuàng)建你要的對象。
- 創(chuàng)建對象池:告訴 ACP 你想創(chuàng)建一個怎樣的對象池。
- 使用對象池:ACP 告訴你如何使用你的對象。
創(chuàng)建對象工廠
對象工廠告訴 ACP,它應(yīng)該如何去創(chuàng)建、激活、鈍化、銷毀你的對象。創(chuàng)建對象工廠非常簡單,只需要實(shí)現(xiàn) ACP 的 PooledObjectFactory 接口即可。PooledObjectFactory 接口的定義如下:
public interface PooledObjectFactory<T> {
PooledObject<T> makeObject() throws Exception;
void destroyObject(PooledObject<T> p) throws Exception;
boolean validateObject(PooledObject<T> p);
void activateObject(PooledObject<T> p) throws Exception;
void passivateObject(PooledObject<T> p) throws Exception;
}
但更多情況下,我們會繼承 BasePooledObjectFactory 類來實(shí)現(xiàn)對象工廠。因?yàn)?BasePooledObjectFactory 類是 PooledObjectFactory 的基礎(chǔ)實(shí)現(xiàn)類,使用它可以幫我們省了很多麻煩。通過繼承這個抽象類,我們只需要實(shí)現(xiàn)兩個方法:create() 和 wrap() 方法。
// 告訴 ACP 如何創(chuàng)建對象 public abstract T create() throws Exception; // 定義你要返回的對象 public abstract PooledObject<T> wrap(T obj);
create() 方法定義你的對象初始化過程,最后將初始化完成的對象返回。例如你想定義一個 SFTP 的連接,那么你首先需要定義一個 JSch 對象,之后設(shè)置賬號密碼,之后連接服務(wù)器,最后返回一個 ChannelSftp 對象。
public ChannelSftp create() {
// SFTP 連接的創(chuàng)建過程
}
wrap() 方法定義你要返回的對象,對于一個 SFTP 的連接池來說,其實(shí)就是一個 ChannelSftp 對象。一般情況下可以使用類 DefaultPooledObject 替代,參考實(shí)現(xiàn)如下:
@Override
public PooledObject<Foo> wrap(Foo foo) {
return new DefaultPooledObject<Foo>(foo);
}
創(chuàng)建對象池
創(chuàng)建好對象工廠之后,ACP 已經(jīng)知道你需要的對象如何創(chuàng)建了。那么接下來,你需要根據(jù)你的實(shí)際需要,去創(chuàng)建一個對象池。在 ACP 中,我們通過 GenericObjectPool 以及 GenericObjectPoolConfig 來創(chuàng)建一個對象池。
// 聲明一個對象池
private GenericObjectPool<ChannelSftp> sftpConnectPool;
// 設(shè)置連接池配置
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setEvictionPolicyClassName("tech.shuyi.javacodechip.acp.SftpEvictionPolicy");
poolConfig.setBlockWhenExhausted(true);
poolConfig.setJmxEnabled(false);
poolConfig.setMaxWaitMillis(1000 * 10);
poolConfig.setTimeBetweenEvictionRunsMillis(60 * 1000);
poolConfig.setMinEvictableIdleTimeMillis(20 * 1000);
poolConfig.setTestWhileIdle(true);
poolConfig.setTestOnReturn(true);
poolConfig.setTestOnBorrow(true);
poolConfig.setMaxTotal(3);
// 設(shè)置拋棄策略
AbandonedConfig abandonedConfig = new AbandonedConfig();
abandonedConfig.setRemoveAbandonedOnMaintenance(true);
abandonedConfig.setRemoveAbandonedOnBorrow(true);
this.sftpConnectPool = new GenericObjectPool<>(sftpConnectFactory, poolConfig, abandonedConfig);
在上面創(chuàng)建 SFTP 連接池的代碼中,我們配置了一些線程池的參數(shù)以及設(shè)置了拋棄策略。拋棄策略是非常重要的,如果沒有設(shè)置拋棄策略,那么會拿到失效的連接從而導(dǎo)致獲取文件失敗。拋棄策略是通過 poolConfig.setEvictionPolicyClassName 來設(shè)置的,我們這里設(shè)置的是 SftpEvictionPolicy 類,其代碼內(nèi)容如下:
@Slf4j
@Component
public class SftpEvictionPolicy implements EvictionPolicy<com.jcraft.jsch.ChannelSftp> {
@Override
public boolean evict(EvictionConfig config, PooledObject<com.jcraft.jsch.ChannelSftp> underTest, int idleCount) {
try {
// 連接失效時進(jìn)行驅(qū)逐
if (!underTest.getObject().isConnected()) {
log.warn("connect time out, evict the connection. time={}",System.currentTimeMillis() - underTest.getLastReturnTime());
return true;
}
}catch (Exception e){
return true;
}
return false;
}
}
看到這里,創(chuàng)建線程池的代碼就結(jié)束了,SftpConnectPool 文件的全部內(nèi)容如下:
@Slf4j
public class SftpConnectPool {
private GenericObjectPool<ChannelSftp> sftpConnectPool;
public SftpConnectPool(SftpConnectFactory sftpConnectFactory) {
// 設(shè)置連接池配置
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setEvictionPolicyClassName("tech.shuyi.javacodechip.acp.SftpEvictionPolicy");
poolConfig.setBlockWhenExhausted(true);
poolConfig.setJmxEnabled(false);
poolConfig.setMaxWaitMillis(1000 * 10);
poolConfig.setTimeBetweenEvictionRunsMillis(60 * 1000);
poolConfig.setMinEvictableIdleTimeMillis(20 * 1000);
poolConfig.setTestWhileIdle(true);
poolConfig.setTestOnReturn(true);
poolConfig.setTestOnBorrow(true);
poolConfig.setMaxTotal(3);
// 設(shè)置拋棄策略
AbandonedConfig abandonedConfig = new AbandonedConfig();
abandonedConfig.setRemoveAbandonedOnMaintenance(true);
abandonedConfig.setRemoveAbandonedOnBorrow(true);
this.sftpConnectPool = new GenericObjectPool<>(sftpConnectFactory, poolConfig, abandonedConfig);
}
public ChannelSftp borrowObject() {
try {
return sftpConnectPool.borrowObject();
} catch (Exception e) {
log.error("borrowObject error", e);
return null;
}
}
public void returnObject(ChannelSftp channelSftp) {
if (channelSftp!=null) {
sftpConnectPool.returnObject(channelSftp);
}
}
}
為了方便使用,我還增加了 borrowObject 和 returnObject 方法,但這兩個并不是必須的。在這兩個方法中,我們分別調(diào)用了 GenericObjectPool 類的 borrowObject 方法和 returnObject 方法。這正是 ACP 提供的、使用線程池對象的方法,先借一個對象,之后歸還對象。
注:其實(shí)在這一步,已經(jīng)包含了對象池的使用了。但實(shí)際使用的時候,我們經(jīng)常是將對象池的聲明與使用放在同一個類中,因此為了講解方便,這里沒有分開。因此下文的使用對象池,本質(zhì)上是對對象池做進(jìn)一步封裝。
使用對象池
到這里我們的 SFTP 對象池就已經(jīng)創(chuàng)建完畢了,是不是非常簡單呢!但在實(shí)際的工作中,我們通常會在這基礎(chǔ)上,做一些封裝。對于我們這次的 SFTP 連接池來說,我們會對外直接提供下載文件的服務(wù),將 SFTP 對象池進(jìn)一步封裝起來,不需要關(guān)心怎么獲取文件。
public class SftpFileHelper {
@Autowired
private SftpConnectPool sftpConnectPool;
public void download(String dir, String file, String saveUrl)throws IOException {
ChannelSftp sftp = sftpConnectPool.borrowObject();
log.info("begin to download file, dir={}, file={}, saveUrl={}", dir, file, saveUrl);
try {
if (!StringUtils.isEmpty(dir)) {
sftp.cd(dir);
}
File downloadFile = new File(saveUrl);
sftp.get(file, new FileOutputStream(downloadFile));
}catch (Exception e){
log.warn("下載文件失敗", e);
}finally {
sftpConnectPool.returnObject(sftp);
}
log.info("file:{} is download successful", file);
}
}
最后我們寫一個測試用例來試一試,是否能正常下載文件。
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class SftpFileHelperTest {
@Autowired
private SftpFileHelper sftpFileHelper;
@Test
public void testDownloadFtpFile() throws Exception {
sftpFileHelper.download("dir", "fileName", "fileName");
}
}
總結(jié)
本文針對 Apache Commons Pool 庫最常用的對象池功能做了演示。看完這篇文章,我們知道創(chuàng)建一個線程池需要三個步驟,分別是:
- 創(chuàng)建對象工廠:告訴 ACP 如何創(chuàng)建你要的對象。
- 創(chuàng)建對象池:告訴 ACP 你想創(chuàng)建一個怎樣的對象池、設(shè)置驅(qū)逐策略。
- 使用對象池:ACP 告訴你如何使用你的對象。
本文相關(guān)代碼存放在博主 Github 項(xiàng)目:java-code-chip 中,可以點(diǎn)擊地址獲?。?a target="_blank" rel="external nofollow" >java-code-chip/src/main/java/tech/shuyi/javacodechip/acp at master · chenyurong/java-code-chip
ACP 庫能夠讓讀者朋友們快速地創(chuàng)建一個對象池,更加專注于業(yè)務(wù)內(nèi)容。但事實(shí)上,ACP 提供的內(nèi)容遠(yuǎn)不止如此,它還有更多更高級的功能。
例如當(dāng)我們連接的 SFTP 服務(wù)器有多個時,我們需要通過不同地址來獲得不同的連接對象。此時最笨的辦法是每個不同的地址,都復(fù)制多一份代碼,然后通過不同類的不同方法來實(shí)現(xiàn)。但這樣的情況工作量相當(dāng)可觀,并且也會有很多重復(fù)代碼。這種時候就可以使用BaseKeyedPooledObjectFactory 來替代 BasePooledObjectFactory,從而實(shí)現(xiàn)通過 key 來實(shí)現(xiàn)不同地址的連接對象管理。
更多關(guān)于 ACP 的內(nèi)容,感興趣的同學(xué)可以自行探索,這里就不深入講解了。
另一種實(shí)現(xiàn)方式:
泛型接口ConnectionPool.java
public interface ConnectionPool<T> {
/**
* 初始化池資源
* @param maxActive 池中最大活動連接數(shù)
* @param maxWait 最大等待時間
*/
void init(Integer maxActive, Long maxWait);
/**
* 從池中獲取資源
* @return 連接資源
*/
T getResource() throws Exception;
/**
* 釋放連接
* @param connection 正在使用的連接
*/
void release(T connection) throws Exception;
/**
* 釋放連接池資源
*/
void close();
}
以zookeeper為例,實(shí)現(xiàn)zookeeper連接池,ZookeeperConnectionPool.java
public class ZookeeperConnectionPool implements ConnectionPool<ZooKeeper> {
//最大活動連接數(shù)
private Integer maxActive;
//最大等待時間
private Long maxWait;
//空閑隊(duì)列
private LinkedBlockingQueue<ZooKeeper> idle = new LinkedBlockingQueue<>();
//繁忙隊(duì)列
private LinkedBlockingQueue<ZooKeeper> busy = new LinkedBlockingQueue<>();
//連接池活動連接數(shù)
private AtomicInteger activeSize = new AtomicInteger(0);
//連接池關(guān)閉標(biāo)記
private AtomicBoolean isClosed = new AtomicBoolean(false);
//總共獲取的連接記數(shù)
private AtomicInteger createCount = new AtomicInteger(0);
//等待zookeeper客戶端創(chuàng)建完成的計(jì)數(shù)器
private static ThreadLocal<CountDownLatch> latchThreadLocal = ThreadLocal.withInitial(() -> new CountDownLatch(1));
public ZookeeperConnectionPool(Integer maxActive, Long maxWait) {
this.init(maxActive, maxWait);
}
@Override
public void init(Integer maxActive, Long maxWait) {
this.maxActive = maxActive;
this.maxWait = maxWait;
}
@Override
public ZooKeeper getResource() throws Exception {
ZooKeeper zooKeeper;
Long nowTime = System.currentTimeMillis();
final CountDownLatch countDownLatch = latchThreadLocal.get();
//空閑隊(duì)列idle是否有連接
if ((zooKeeper = idle.poll()) == null) {
//判斷池中連接數(shù)是否小于maxActive
if (activeSize.get() < maxActive) {
//先增加池中連接數(shù)后判斷是否小于等于maxActive
if (activeSize.incrementAndGet() <= maxActive) {
//創(chuàng)建zookeeper連接
zooKeeper = new ZooKeeper("localhost", 5000, (watch) -> {
if (watch.getState() == Watcher.Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
});
countDownLatch.await();
System.out.println("Thread:" + Thread.currentThread().getId() + "獲取連接:" + createCount.incrementAndGet() + "條");
busy.offer(zooKeeper);
return zooKeeper;
} else {
//如增加后發(fā)現(xiàn)大于maxActive則減去增加的
activeSize.decrementAndGet();
}
}
//若活動線程已滿則等待busy隊(duì)列釋放連接
try {
System.out.println("Thread:" + Thread.currentThread().getId() + "等待獲取空閑資源");
Long waitTime = maxWait - (System.currentTimeMillis() - nowTime);
zooKeeper = idle.poll(waitTime, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new Exception("等待異常");
}
//判斷是否超時
if (zooKeeper != null) {
System.out.println("Thread:" + Thread.currentThread().getId() + "獲取連接:" + createCount.incrementAndGet() + "條");
busy.offer(zooKeeper);
return zooKeeper;
} else {
System.out.println("Thread:" + Thread.currentThread().getId() + "獲取連接超時,請重試!");
throw new Exception("Thread:" + Thread.currentThread().getId() + "獲取連接超時,請重試!");
}
}
//空閑隊(duì)列有連接,直接返回
busy.offer(zooKeeper);
return zooKeeper;
}
@Override
public void release(ZooKeeper connection) throws Exception {
if (connection == null) {
System.out.println("connection 為空");
return;
}
if (busy.remove(connection)){
idle.offer(connection);
} else {
activeSize.decrementAndGet();
throw new Exception("釋放失敗");
}
}
@Override
public void close() {
if (isClosed.compareAndSet(false, true)) {
idle.forEach((zooKeeper) -> {
try {
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
busy.forEach((zooKeeper) -> {
try {
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
測試用例
這里創(chuàng)建20個線程并發(fā)測試連接池,Test.java
public class Test {
public static void main(String[] args) throws Exception {
int threadCount = 20;
Integer maxActive = 10;
Long maxWait = 10000L;
ZookeeperConnectionPool pool = new ZookeeperConnectionPool(maxActive, maxWait);
CountDownLatch countDownLatch = new CountDownLatch(20);
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
countDownLatch.countDown();
try {
countDownLatch.await();
ZooKeeper zooKeeper = pool.getResource();
Thread.sleep(2000);
pool.release(zooKeeper);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
while (true){
}
}
}
以上就是Java 如何快速實(shí)現(xiàn)一個連接池的詳細(xì)內(nèi)容,更多關(guān)于Java 連接池的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
springboot?ConfigurationProperties的綁定源碼示例解析
這篇文章主要為大家介紹了springboot?ConfigurationProperties的綁定源碼示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-09-09
java中Spring Security的實(shí)例詳解
這篇文章主要介紹了java中Spring Security的實(shí)例詳解的相關(guān)資料,spring security是一個多方面的安全認(rèn)證框架,提供了基于JavaEE規(guī)范的完整的安全認(rèn)證解決方案,需要的朋友可以參考下2017-09-09
Springboot日志配置的實(shí)現(xiàn)示例
本文主要介紹了Springboot日志配置的實(shí)現(xiàn)示例,使用slf4j和logback的方式記錄日志,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-08-08
詳解SpringBoot中@NotNull,@NotBlank注解使用
這篇文章主要為大家詳細(xì)介紹了Spring?Boot中集成Validation與@NotNull,@NotBlank等注解的簡單使用,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2022-08-08
Hadoop源碼分析四遠(yuǎn)程debug調(diào)試
本篇是Hadoop源碼分析系列文章第四篇,主要介紹一下Hadoop的遠(yuǎn)程debug調(diào)試步驟,后續(xù)本系列文章會持續(xù)更新,有需要的朋友可以借鑒參考下2021-09-09
Java?DelayQueue實(shí)現(xiàn)任務(wù)延時示例講解
DelayQueue是一個無界的BlockingQueue的實(shí)現(xiàn)類,用于放置實(shí)現(xiàn)了Delayed接口的對象,其中的對象只能在其到期時才能從隊(duì)列中取走。本文就來利用DelayQueue實(shí)現(xiàn)延時任務(wù),感興趣的可以了解一下2022-09-09
Spring Boot結(jié)合IDEA自帶Maven插件如何快速切換profile
IDEA是目前 Java 開發(fā)者中使用最多的開發(fā)工具,它有著簡約的設(shè)計(jì)風(fēng)格,強(qiáng)大的集成工具,便利的快捷鍵,這篇文章主要介紹了Spring Boot結(jié)合IDEA自帶Maven插件快速切換profile,需要的朋友可以參考下2023-03-03

