欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java動態(tài)線程池插件dynamic-tp集成zookeeper

 更新時間:2023年03月02日 09:48:27   作者:Redick01  
ZooKeeper是一個分布式的,開放源碼的分布式應(yīng)用程序協(xié)調(diào)服務(wù),是Google的Chubby一個開源的實現(xiàn),是Hadoop和Hbase的重要組件。它是一個為分布式應(yīng)用提供一致性的軟件,提供的功能包括:配置維護、域名服務(wù)、分布式同步、組服務(wù)等

前言

dynamic-tp是一個輕量級的動態(tài)線程池插件,它是一個基于配置中心的動態(tài)線程池,線程池的參數(shù)可以通過配置中心配置進行動態(tài)的修改,在配置中心的支持上最開始的時候支持NacosApollo,由于筆者公司用的配置中心是Zookeeper,所以就想著擴展支持Zookeeper,在了解源碼支持發(fā)現(xiàn)dynamic-tp的擴展能力做的很好,提供了擴展接口,只要我開發(fā)對應(yīng)的配置中心模塊即可,最終筆者實現(xiàn)了Zookeeper的支持并貢獻到社區(qū)。接下來我通過源碼解析方式介紹下Zookeeper配置中心的接入。

配置刷新

dynamic-tp提供了一個刷新配置的接口Refresher,抽象類AbstractRefresher實現(xiàn)刷新配置接口的刷新配置方法refresh,該方法能根據(jù)配置類型內(nèi)容和配置解析配置并刷新動態(tài)線程池的相關(guān)配置,由DtpRegistry負責(zé)刷新線程池配置,事件發(fā)布訂閱模式操作Web容器參數(shù),代碼如下:

public interface Refresher {
    /**
     * Refresh with specify content.
     * @param content content
     * @param fileType file type
     */
    void refresh(String content, ConfigFileTypeEnum fileType);
}
@Slf4j
public abstract class AbstractRefresher implements Refresher {
    @Resource
    private DtpProperties dtpProperties;
    @Resource
    private ApplicationEventMulticaster applicationEventMulticaster;
    @Override
    public void refresh(String content, ConfigFileTypeEnum fileTypeEnum) {
        if (StringUtils.isBlank(content) || Objects.isNull(fileTypeEnum)) {
            return;
        }
        try {
            // 根據(jù)配置內(nèi)容和配置類型將配置內(nèi)容轉(zhuǎn)成Map
            val prop = ConfigHandler.getInstance().parseConfig(content, fileTypeEnum);
            doRefresh(prop);
        } catch (IOException e) {
            log.error("DynamicTp refresh error, content: {}, fileType: {}",
                    content, fileTypeEnum, e);
        }
    }
    private void doRefresh(Map<Object, Object> properties) {
        // 將Map中的配置轉(zhuǎn)換成DtpProperties
        ConfigurationPropertySource sources = new MapConfigurationPropertySource(properties);
        Binder binder = new Binder(sources);
        ResolvableType type = ResolvableType.forClass(DtpProperties.class);
        Bindable<?> target = Bindable.of(type).withExistingValue(dtpProperties);
        binder.bind(MAIN_PROPERTIES_PREFIX, target);
        // 刷新動態(tài)線程池配置
        DtpRegistry.refresh(dtpProperties);
        // 發(fā)布刷新實現(xiàn),該事件用于控制Web容器線程池參數(shù)控制
        publishEvent();
    }
    private void publishEvent() {
        RefreshEvent event = new RefreshEvent(this, dtpProperties);
        applicationEventMulticaster.multicastEvent(event);
    }
}

Zookeeper配置中心接入擴展實現(xiàn)

基于AbstractRefresher就可以實現(xiàn)Zookeeper配置中心的擴展了,Zookeeper的擴展實現(xiàn)繼承AbstractRefresher,Zookeeper的擴展實現(xiàn)只需要監(jiān)聽配置中心的配置變更即可拿到配置內(nèi)容,然后通過refresh刷新配置即可。代碼如下:

ZookeeperRefresher繼承AbstractRefresher,實現(xiàn)InitializingBean,afterPropertiesSet方法邏輯從配置DtpProperties獲取Zookeeper的配置信息,CuratorFrameworkFactory創(chuàng)建客戶端,設(shè)置監(jiān)聽器,這里有兩種監(jiān)聽器,一個是連接監(jiān)聽ConnectionStateListener,一個是節(jié)點變動監(jiān)聽CuratorListener,出發(fā)監(jiān)聽后loadNode負責(zé)從Zookeeper獲取配置文件配置并組裝配置內(nèi)容,然后通過refresh刷新配置,注意,Zookeeper配置目前配置類型僅支持properties。

@Slf4j
public class ZookeeperRefresher extends AbstractRefresher implements InitializingBean {
    @Resource
    private DtpProperties dtpProperties;
    private CuratorFramework curatorFramework;
    @Override
    public void afterPropertiesSet() throws Exception {
        DtpProperties.Zookeeper zookeeper = dtpProperties.getZookeeper();
        curatorFramework = CuratorFrameworkFactory.newClient(zookeeper.getZkConnectStr(),
                new ExponentialBackoffRetry(1000, 3));
        String nodePath = ZKPaths.makePath(ZKPaths.makePath(zookeeper.getRootNode(),
                zookeeper.getConfigVersion()), zookeeper.getNode());
        final ConnectionStateListener connectionStateListener = (client, newState) -> {
            if (newState == ConnectionState.CONNECTED || newState == ConnectionState.RECONNECTED) {
                loadNode(nodePath);
            }};
        final CuratorListener curatorListener = (client, curatorEvent) -> {
            final WatchedEvent watchedEvent = curatorEvent.getWatchedEvent();
            if (null != watchedEvent) {
                switch (watchedEvent.getType()) {
                    case NodeChildrenChanged:
                    case NodeDataChanged:
                        loadNode(nodePath);
                        break;
                    default:
                        break;
                }
            }};
        curatorFramework.getConnectionStateListenable().addListener(connectionStateListener);
        curatorFramework.getCuratorListenable().addListener(curatorListener);
        curatorFramework.start();
        log.info("DynamicTp refresher, add listener success, nodePath: {}", nodePath);
    }
    /**
     * load config and refresh
     * @param nodePath config path
     */
    public void loadNode(String nodePath) {
        try {
            final GetChildrenBuilder childrenBuilder = curatorFramework.getChildren();
            final List<String> children = childrenBuilder.watched().forPath(nodePath);
            StringBuilder content = new StringBuilder();
            children.forEach(c -> {
                String n = ZKPaths.makePath(nodePath, c);
                final String nodeName = ZKPaths.getNodeFromPath(n);
                final GetDataBuilder data = curatorFramework.getData();
                String value = "";
                try {
                    value = new String(data.watched().forPath(n), StandardCharsets.UTF_8);
                } catch (Exception e) {
                    log.error("zk config value watched exception.", e);
                }
                content.append(nodeName).append("=").append(value).append("\n");
            });
            refresh(content.toString(), ConfigFileTypeEnum.PROPERTIES);
        } catch (Exception e) {
            log.error("load zk node error, nodePath is {}", nodePath, e);
        }
    }
}

總結(jié)

dynamic-tp對應(yīng)支持配置中心的擴展能力做的非常好,筆者通過Zookeeper客戶端CuratorFramework設(shè)置監(jiān)聽的方式進行接入,主要監(jiān)聽CuratorFramework客戶端連接建立和斷開的事件和節(jié)點變動的事件實現(xiàn)了動態(tài)線程池參數(shù)的更新。

到此這篇關(guān)于Java動態(tài)線程池插件dynamic-tp集成zookeeper的文章就介紹到這了,更多相關(guān)Java dynamic-tp內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Maven+SSM框架實現(xiàn)簡單的增刪改查

    Maven+SSM框架實現(xiàn)簡單的增刪改查

    這篇文章主要介紹了Maven+SSM框架實現(xiàn)簡單的增刪改查,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-03-03
  • java中的FileReader和FileWriter讀寫流

    java中的FileReader和FileWriter讀寫流

    這篇文章主要介紹了java中的FileReader和FileWriter讀寫流,在java中對數(shù)據(jù)輸入輸出的操作陳作為流我們對不同的文件進行操作,或者對操作文件進行輸入和輸出時所用的流都是不同的,因此在java.io的包下存在很多流的類或者接口提供給我們對應(yīng)的操作,需要的朋友可以參考下
    2023-10-10
  • Java編程多線程之共享數(shù)據(jù)代碼詳解

    Java編程多線程之共享數(shù)據(jù)代碼詳解

    這篇文章主要介紹了Java編程多線程之共享數(shù)據(jù)代碼詳解,分享了相關(guān)代碼示例,小編覺得還是挺不錯的,具有一定借鑒價值,需要的朋友可以參考下
    2018-02-02
  • 調(diào)用Process.waitfor導(dǎo)致的進程掛起問題及解決

    調(diào)用Process.waitfor導(dǎo)致的進程掛起問題及解決

    這篇文章主要介紹了調(diào)用Process.waitfor導(dǎo)致的進程掛起問題及解決,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • Java8 HashMap擴容算法實例解析

    Java8 HashMap擴容算法實例解析

    這篇文章主要介紹了Java8 HashMap擴容算法實例解析,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2019-12-12
  • JVM虛擬機查找類文件的順序方法

    JVM虛擬機查找類文件的順序方法

    下面小編就為大家分享一篇JVM虛擬機查找類文件的順序方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-01-01
  • MyBatis如何使用PageHelper實現(xiàn)分頁查詢

    MyBatis如何使用PageHelper實現(xiàn)分頁查詢

    這篇文章主要介紹了MyBatis如何使用PageHelper實現(xiàn)分頁查詢,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • Java中的Cookie和Session詳細解析

    Java中的Cookie和Session詳細解析

    這篇文章主要介紹了Java中的Cookie和Session詳細解析,客戶端會話技術(shù),服務(wù)端給客戶端的數(shù)據(jù),存儲于客戶端(瀏覽器),由于是保存在客戶端上的,所以存在安全問題,需要的朋友可以參考下
    2024-01-01
  • 網(wǎng)絡(luò)爬蟲案例解析

    網(wǎng)絡(luò)爬蟲案例解析

    本文主要介紹了網(wǎng)絡(luò)爬蟲的小案例。具有很好的參考價值。下面跟著小編一起來看下吧
    2017-03-03
  • Java并發(fā)編程之原子操作類詳情

    Java并發(fā)編程之原子操作類詳情

    這篇文章主要介紹了Java并發(fā)編程之原子操作類詳情,文章基于Java并發(fā)編程展開相關(guān)內(nèi)容,具有一定的參考價值,需要的小伙伴可以參考一下
    2022-04-04

最新評論