Java動態(tài)線程池插件dynamic-tp集成zookeeper
前言
dynamic-tp
是一個輕量級的動態(tài)線程池插件,它是一個基于配置中心的動態(tài)線程池,線程池的參數(shù)可以通過配置中心配置進行動態(tài)的修改,在配置中心的支持上最開始的時候支持Nacos
和Apollo
,由于筆者公司用的配置中心是Zookeeper
,所以就想著擴展支持Zookeeper
,在了解源碼支持發(fā)現(xiàn)dynamic-tp
的擴展能力做的很好,提供了擴展接口,只要我開發(fā)對應(yīng)的配置中心模塊即可,最終筆者實現(xiàn)了Zookeeper
的支持并貢獻到社區(qū)。接下來我通過源碼解析方式介紹下Zookeeper
配置中心的接入。
配置刷新
dynamic-tp
提供了一個刷新配置的接口Refresher
,抽象類AbstractRefresher
實現(xiàn)刷新配置接口的刷新配置方法refresh
,該方法能根據(jù)配置類型內(nèi)容和配置解析配置并刷新動態(tài)線程池的相關(guān)配置,由DtpRegistry
負責(zé)刷新線程池配置,事件發(fā)布訂閱模式操作Web容器參數(shù),代碼如下:
public interface Refresher { /** * Refresh with specify content. * @param content content * @param fileType file type */ void refresh(String content, ConfigFileTypeEnum fileType); }
@Slf4j public abstract class AbstractRefresher implements Refresher { @Resource private DtpProperties dtpProperties; @Resource private ApplicationEventMulticaster applicationEventMulticaster; @Override public void refresh(String content, ConfigFileTypeEnum fileTypeEnum) { if (StringUtils.isBlank(content) || Objects.isNull(fileTypeEnum)) { return; } try { // 根據(jù)配置內(nèi)容和配置類型將配置內(nèi)容轉(zhuǎn)成Map val prop = ConfigHandler.getInstance().parseConfig(content, fileTypeEnum); doRefresh(prop); } catch (IOException e) { log.error("DynamicTp refresh error, content: {}, fileType: {}", content, fileTypeEnum, e); } } private void doRefresh(Map<Object, Object> properties) { // 將Map中的配置轉(zhuǎn)換成DtpProperties ConfigurationPropertySource sources = new MapConfigurationPropertySource(properties); Binder binder = new Binder(sources); ResolvableType type = ResolvableType.forClass(DtpProperties.class); Bindable<?> target = Bindable.of(type).withExistingValue(dtpProperties); binder.bind(MAIN_PROPERTIES_PREFIX, target); // 刷新動態(tài)線程池配置 DtpRegistry.refresh(dtpProperties); // 發(fā)布刷新實現(xiàn),該事件用于控制Web容器線程池參數(shù)控制 publishEvent(); } private void publishEvent() { RefreshEvent event = new RefreshEvent(this, dtpProperties); applicationEventMulticaster.multicastEvent(event); } }
Zookeeper配置中心接入擴展實現(xiàn)
基于AbstractRefresher
就可以實現(xiàn)Zookeeper
配置中心的擴展了,Zookeeper
的擴展實現(xiàn)繼承AbstractRefresher
,Zookeeper
的擴展實現(xiàn)只需要監(jiān)聽配置中心的配置變更即可拿到配置內(nèi)容,然后通過refresh
刷新配置即可。代碼如下:
ZookeeperRefresher
繼承AbstractRefresher
,實現(xiàn)InitializingBean
,afterPropertiesSet
方法邏輯從配置DtpProperties
獲取Zookeeper
的配置信息,CuratorFrameworkFactory
創(chuàng)建客戶端,設(shè)置監(jiān)聽器,這里有兩種監(jiān)聽器,一個是連接監(jiān)聽ConnectionStateListener
,一個是節(jié)點變動監(jiān)聽CuratorListener
,出發(fā)監(jiān)聽后loadNode
負責(zé)從Zookeeper
獲取配置文件配置并組裝配置內(nèi)容,然后通過refresh
刷新配置,注意,Zookeeper
配置目前配置類型僅支持properties
。
@Slf4j public class ZookeeperRefresher extends AbstractRefresher implements InitializingBean { @Resource private DtpProperties dtpProperties; private CuratorFramework curatorFramework; @Override public void afterPropertiesSet() throws Exception { DtpProperties.Zookeeper zookeeper = dtpProperties.getZookeeper(); curatorFramework = CuratorFrameworkFactory.newClient(zookeeper.getZkConnectStr(), new ExponentialBackoffRetry(1000, 3)); String nodePath = ZKPaths.makePath(ZKPaths.makePath(zookeeper.getRootNode(), zookeeper.getConfigVersion()), zookeeper.getNode()); final ConnectionStateListener connectionStateListener = (client, newState) -> { if (newState == ConnectionState.CONNECTED || newState == ConnectionState.RECONNECTED) { loadNode(nodePath); }}; final CuratorListener curatorListener = (client, curatorEvent) -> { final WatchedEvent watchedEvent = curatorEvent.getWatchedEvent(); if (null != watchedEvent) { switch (watchedEvent.getType()) { case NodeChildrenChanged: case NodeDataChanged: loadNode(nodePath); break; default: break; } }}; curatorFramework.getConnectionStateListenable().addListener(connectionStateListener); curatorFramework.getCuratorListenable().addListener(curatorListener); curatorFramework.start(); log.info("DynamicTp refresher, add listener success, nodePath: {}", nodePath); } /** * load config and refresh * @param nodePath config path */ public void loadNode(String nodePath) { try { final GetChildrenBuilder childrenBuilder = curatorFramework.getChildren(); final List<String> children = childrenBuilder.watched().forPath(nodePath); StringBuilder content = new StringBuilder(); children.forEach(c -> { String n = ZKPaths.makePath(nodePath, c); final String nodeName = ZKPaths.getNodeFromPath(n); final GetDataBuilder data = curatorFramework.getData(); String value = ""; try { value = new String(data.watched().forPath(n), StandardCharsets.UTF_8); } catch (Exception e) { log.error("zk config value watched exception.", e); } content.append(nodeName).append("=").append(value).append("\n"); }); refresh(content.toString(), ConfigFileTypeEnum.PROPERTIES); } catch (Exception e) { log.error("load zk node error, nodePath is {}", nodePath, e); } } }
總結(jié)
dynamic-tp
對應(yīng)支持配置中心的擴展能力做的非常好,筆者通過Zookeeper
客戶端CuratorFramework
設(shè)置監(jiān)聽的方式進行接入,主要監(jiān)聽CuratorFramework
客戶端連接建立和斷開的事件和節(jié)點變動的事件實現(xiàn)了動態(tài)線程池參數(shù)的更新。
到此這篇關(guān)于Java動態(tài)線程池插件dynamic-tp集成zookeeper的文章就介紹到這了,更多相關(guān)Java dynamic-tp內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java中的FileReader和FileWriter讀寫流
這篇文章主要介紹了java中的FileReader和FileWriter讀寫流,在java中對數(shù)據(jù)輸入輸出的操作陳作為流我們對不同的文件進行操作,或者對操作文件進行輸入和輸出時所用的流都是不同的,因此在java.io的包下存在很多流的類或者接口提供給我們對應(yīng)的操作,需要的朋友可以參考下2023-10-10調(diào)用Process.waitfor導(dǎo)致的進程掛起問題及解決
這篇文章主要介紹了調(diào)用Process.waitfor導(dǎo)致的進程掛起問題及解決,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12MyBatis如何使用PageHelper實現(xiàn)分頁查詢
這篇文章主要介紹了MyBatis如何使用PageHelper實現(xiàn)分頁查詢,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-11-11