欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java?Zookeeper分布式分片算法超詳細講解流程

 更新時間:2023年03月01日 10:14:11   作者:Redick01  
ZooKeeper是一個分布式的,開放源碼的分布式應用程序協(xié)調(diào)服務,是Google的Chubby一個開源的實現(xiàn),是Hadoop和Hbase的重要組件。它是一個為分布式應用提供一致性的軟件,提供的功能包括:配置維護、域名服務、分布式同步、組服務等

背景

公司的一個服務需要做類似于分片的邏輯,一開始服務基于傳統(tǒng)部署方式通過本地配置文件配置的方式就可以指定該機器服務的分片內(nèi)容如:0,1,2,3,隨著系統(tǒng)的升級迭代,該服務進行了容器化部署,所以原來基于本地配置文件各自配置分片數(shù)據(jù)的方式就不適用了,原來的部署方式使得服務是有狀態(tài),是一種非云原生的方式,所以該服務要重新設計實現(xiàn)一套分布式服務分片邏輯。

技術方案

分布式協(xié)調(diào)中間件

要實現(xiàn)分布式服務分片的能力,需要有一個分布式中間件,如:RedisMysql,Zookeeper等等都可以,我們選用Zookeeper。

基于Zookeeper的技術方案

使用Zookeeper主要是基于Zookeeper的臨時節(jié)點和節(jié)點變化監(jiān)聽機制,具體的技術設計如下:

服務注冊目錄設計

Zookeeper的數(shù)據(jù)存儲結構類似于目錄,服務注冊后的目錄類似如下結構:

解釋下該目錄結構,首先/xxxx/xxxx/sharding是區(qū)別于其他業(yè)務的的目錄,該目錄節(jié)點是持久的,service是服務目錄,標識一個服務,該節(jié)點也是持久的,ip1,ip2是該服務注冊到Zookeeper的機器列表節(jié)點,該節(jié)點是臨時節(jié)點。

/xxxx/xxxx/sharding/service/ip1
-----|----|--------|-------/ip2

服務分片處理流程

  • 服務啟動,創(chuàng)建CuratorFramework客戶端,設置客戶端連接狀態(tài)監(jiān)聽;
  • Zookeeper注冊該機器的信息,這里設計簡單,機器信息就是ip地址;
  • 注冊機器信息后,從Zookeeper獲取所有注冊信息;
  • 根據(jù)Zookeeper獲取的所有注冊機器信息根據(jù)分片算法進行分片計算。

編碼實現(xiàn)

ZookeeperConfig

Zookeeper的配置信息

@Data
public class ZookeeperConfig {
    /**
     * zk集群地址
     */
    private String zkAddress;
    /**
     * 注冊服務目錄
     */
    private String nodePath;
    /**
     * 分片的服務名
     */
    private String serviceName;
    /**
     * 分片總數(shù)
     */
    private Integer shardingCount;
    public ZookeeperConfig(String zkAddress, String nodePath, String serviceName, Integer shardingCount) {
        this.zkAddress = zkAddress;
        this.nodePath = nodePath;
        this.serviceName = "/" + serviceName;
        this.shardingCount = shardingCount;
    }
    /**
     * 等待重試的間隔時間的初始值.
     * 單位毫秒.
     */
    private int baseSleepTimeMilliseconds = 1000;
    /**
     * 等待重試的間隔時間的最大值.
     * 單位毫秒.
     */
    private int maxSleepTimeMilliseconds = 3000;
    /**
     * 最大重試次數(shù).
     */
    private int maxRetries = 3;
    /**
     * 會話超時時間.
     * 單位毫秒.
     */
    private int sessionTimeoutMilliseconds;
    /**
     * 連接超時時間.
     * 單位毫秒.
     */
    private int connectionTimeoutMilliseconds;
}

InstanceInfo注冊機器

@AllArgsConstructor
@EqualsAndHashCode()
public class InstanceInfo {
    private String ip;
    public String getInstance() {
        return ip;
    }
}

ZookeeperShardingService分片服務

@Slf4j
public class ZookeeperShardingService {
    public final Map<String, List<Integer>> caches = new HashMap<>(16);
    private final CuratorFramework client;
    private final ZookeeperConfig zkConfig;
    private final ShardingStrategy shardingStrategy;
    private final InstanceInfo instanceInfo;
    private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1);
    public ZookeeperShardingService(ZookeeperConfig zkConfig, ShardingStrategy shardingStrategy) {
        this.zkConfig = zkConfig;
        log.info("開始初始化zk, ip列表是: {}.", zkConfig.getZkAddress());
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                .connectString(zkConfig.getZkAddress())
                .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()));
        if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
            builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
        }
        if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
            builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
        }
        this.shardingStrategy = shardingStrategy;
        HostInfo host = new HostInfo();
        this.instanceInfo = new InstanceInfo(host.getAddress());
        client = builder.build();
        client.getConnectionStateListenable().addListener(new ConnectionListener());
        client.start();
        try {
            COUNT_DOWN_LATCH.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 注冊服務節(jié)點監(jiān)聽
        registerPathChildListener(zkConfig.getNodePath() + zkConfig.getServiceName(), new ChildrenPathListener());
        try {
            if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
                client.close();
                throw new KeeperException.OperationTimeoutException();
            }
        } catch (final Exception ex) {
            ex.printStackTrace();
            throw new RuntimeException(ex);
        }
    }
    /**
     * 子節(jié)點監(jiān)聽器
     * @param nodePath 主節(jié)點
     * @param listener 監(jiān)聽器
     */
    private void registerPathChildListener(String nodePath, PathChildrenCacheListener listener) {
        try {
            // 1. 創(chuàng)建一個PathChildrenCache
            PathChildrenCache pathChildrenCache = new PathChildrenCache(client, nodePath, true);
            // 2. 添加目錄監(jiān)聽器
            pathChildrenCache.getListenable().addListener(listener);
            // 3. 啟動監(jiān)聽器
            pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        } catch (Exception e) {
            log.error("注冊子目錄監(jiān)聽器出現(xiàn)異常,nodePath:{}",nodePath,e);
            throw new RuntimeException(e);
        }
    }
    /**
     * 服務啟動,注冊zk節(jié)點
     * @throws Exception 異常
     */
    private void zkOp() throws Exception {
        // 是否存在ruubypay-sharding主節(jié)點
        if (null == client.checkExists().forPath(zkConfig.getNodePath())) {
            client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(zkConfig.getNodePath(), Hashing.sha1().hashString("sharding", Charsets.UTF_8).toString().getBytes());
        }
        // 是否存服務主節(jié)點
        if (null == client.checkExists().forPath(zkConfig.getNodePath() + zkConfig.getServiceName())) {
            // 創(chuàng)建服務主節(jié)點
            client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(zkConfig.getNodePath() + zkConfig.getServiceName());
        }
        // 檢查是否存在臨時節(jié)點
        if (null == client.checkExists().forPath(zkConfig.getNodePath() + zkConfig.getServiceName() + "/" + instanceInfo.getInstance())) {
            System.out.println(zkConfig.getNodePath() + zkConfig.getServiceName() +  "/" + instanceInfo.getInstance());
            // 創(chuàng)建臨時節(jié)點
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(zkConfig.getNodePath() + zkConfig.getServiceName() +
                    "/" + instanceInfo.getInstance(), zkConfig.getShardingCount().toString().getBytes(StandardCharsets.UTF_8));
        }
        shardingFromZk();
    }
    /**
     * 從zk獲取機器列表并進行分片
     * @throws Exception 異常
     */
    private void shardingFromZk() throws Exception {
        // 從 serviceName 節(jié)點下獲取所有Ip列表
        final GetChildrenBuilder childrenBuilder = client.getChildren();
        final List<String> instanceList = childrenBuilder.watched().forPath(zkConfig.getNodePath() + zkConfig.getServiceName());
        List<InstanceInfo> res = new ArrayList<>();
        instanceList.forEach(s -> {
            res.add(new InstanceInfo(s));
        });
        Map<InstanceInfo, List<Integer>> shardingResult = shardingStrategy.sharding(res, zkConfig.getShardingCount());
        // 先清一遍緩存
        caches.clear();
        shardingResult.forEach((k, v) -> {
            caches.put(k.getInstance().split("-")[0], v);
        });
    }
    /**
     * zk連接監(jiān)聽
     */
    private class ConnectionListener implements ConnectionStateListener {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            if (newState == ConnectionState.CONNECTED || newState == ConnectionState.LOST || newState == ConnectionState.RECONNECTED) {
                try {
                    zkOp();
                } catch (Exception e) {
                    e.printStackTrace();
                    throw new RuntimeException(e);
                } finally {
                    COUNT_DOWN_LATCH.countDown();
                }
            }
        }
    }
    /**
     * 子節(jié)點監(jiān)聽
     */
    private class ChildrenPathListener implements PathChildrenCacheListener {
        @Override
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
            PathChildrenCacheEvent.Type type = event.getType();
            if (PathChildrenCacheEvent.Type.CHILD_ADDED == type || PathChildrenCacheEvent.Type.CHILD_REMOVED == type) {
                try {
                    shardingFromZk();
                } catch (Exception e) {
                    e.printStackTrace();
                    throw new RuntimeException(e);
                }
            }
        }
    }
}

分片算法

采用平均分配的算法

public interface ShardingStrategy {
    Map<InstanceInfo, List<Integer>> sharding(final List<InstanceInfo> list, Integer shardingCount);
}
public class AverageAllocationShardingStrategy implements ShardingStrategy {
    @Override
    public Map<InstanceInfo, List<Integer>> sharding(List<InstanceInfo> list, Integer shardingCount) {
        if (list.isEmpty()) {
            return null;
        }
        Map<InstanceInfo, List<Integer>> result = shardingAliquot(list, shardingCount);
        addAliquant(list, shardingCount, result);
        return result;
    }
    private Map<InstanceInfo, List<Integer>> shardingAliquot(final List<InstanceInfo> instanceInfos, final int shardingTotalCount) {
        Map<InstanceInfo, List<Integer>> result = new LinkedHashMap<>(shardingTotalCount, 1);
        int itemCountPerSharding = shardingTotalCount / instanceInfos.size();
        int count = 0;
        for (InstanceInfo each : instanceInfos) {
            List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);
            for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
                shardingItems.add(i);
            }
            result.put(each, shardingItems);
            count++;
        }
        return result;
    }
    private void addAliquant(final List<InstanceInfo> instanceInfos, final int shardingTotalCount, final Map<InstanceInfo, List<Integer>> shardingResults) {
        int aliquant = shardingTotalCount % instanceInfos.size();
        int count = 0;
        for (Map.Entry<InstanceInfo, List<Integer>> entry : shardingResults.entrySet()) {
            if (count < aliquant) {
                entry.getValue().add(shardingTotalCount / instanceInfos.size() * instanceInfos.size() + count);
            }
            count++;
        }
    }
}

總結

基于Zookeeper和簡單的平均分配算法實現(xiàn)了一個簡單的分布式分片服務,該分片服務目前滿足公司需求,因為其簡單,所以不一定滿足其他場景,針對其他場景還需考慮其他因素,該示例供參考。

到此這篇關于Java Zookeeper分布式分片算法超詳細講解流程的文章就介紹到這了,更多相關Java Zookeeper分布式分片算法內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • java實現(xiàn)的AES加密算法完整實例

    java實現(xiàn)的AES加密算法完整實例

    這篇文章主要介紹了java實現(xiàn)的AES加密算法,結合完整實例形式分析了AES加密類的實現(xiàn)技巧,具有一定參考借鑒價值,需要的朋友可以參考下
    2016-07-07
  • JavaWeb中struts2實現(xiàn)文件上傳下載功能實例解析

    JavaWeb中struts2實現(xiàn)文件上傳下載功能實例解析

    這篇文章主要介紹了JavaWeb中struts2文件上傳下載功能的實現(xiàn),在Web應用系統(tǒng)開發(fā)中,文件上傳和下載功能是非常常用的功能,需要的朋友可以參考下
    2016-05-05
  • idea進程結束但是項目頁面正常運行怎么辦

    idea進程結束但是項目頁面正常運行怎么辦

    這篇文章主要介紹了idea進程結束但是項目頁面正常運行怎么辦,很多朋友遇到這樣的情況不知道該如何解決了,下面小編給大家?guī)砹薸dea進程結束但是項目頁面正常運行的解決方法,需要的朋友可以參考下
    2023-03-03
  • Java集合之CopyOnWriteArrayList詳解

    Java集合之CopyOnWriteArrayList詳解

    這篇文章主要介紹了Java集合之CopyOnWriteArrayList詳解,CopyOnWriteArrayList是ArrayList的線程安全版本,內(nèi)部也是通過數(shù)組實現(xiàn),每次對數(shù)組的修改都完全拷貝一份新的數(shù)組來修改,修改完了再替換掉老數(shù)組,這樣保證了只阻塞寫操作,需要的朋友可以參考下
    2023-12-12
  • SpringBoot實現(xiàn)國際化的操作步驟

    SpringBoot實現(xiàn)國際化的操作步驟

    國際化(Internationalization) 是指為了適應不同語言、文化和地區(qū)的用戶,使軟件能夠方便地進行本地化修改的過程,本文介紹了SpringBoot 國際化功能的簡單使用,感興趣的朋友可以參考下
    2024-02-02
  • springboot集成mybatis?plus和dynamic-datasource注意事項說明

    springboot集成mybatis?plus和dynamic-datasource注意事項說明

    這篇文章主要介紹了springboot集成mybatis?plus和dynamic-datasource注意事項說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-01-01
  • jackson序列化和反序列化的應用實踐指南

    jackson序列化和反序列化的應用實踐指南

    這篇文章主要給大家介紹了關于jackson序列化和反序列化的應用實踐指南,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-09-09
  • 分析JAVA中幾種常用的RPC框架

    分析JAVA中幾種常用的RPC框架

    這篇文章主要介紹了JAVA中幾種常用的RPC框架的相關知識點,對此有興趣的朋友參考學習下吧。
    2018-03-03
  • Spring中@Controller和@RestController的區(qū)別詳解

    Spring中@Controller和@RestController的區(qū)別詳解

    這篇文章主要介紹了Spring中@Controller和@RestController的區(qū)別詳解,@RestController?是?@Controller?和?@ResponseBody?的結合體,單獨使用?@RestController?的效果與?@Controller?和?@ResponseBody?二者同時使用的效果相同,需要的朋友可以參考下
    2023-10-10
  • spring初始化源碼之關鍵類和擴展接口詳解

    spring初始化源碼之關鍵類和擴展接口詳解

    Spring就是一個大工廠,可以將所有對象的創(chuàng)建和依賴關系的維護交給Spring管理,下面這篇文章主要給大家介紹了關于spring初始化源碼之關鍵類和擴展接口的相關資料,需要的朋友可以參考下
    2023-04-04

最新評論