Java?Zookeeper分布式分片算法超詳細講解流程
背景
公司的一個服務需要做類似于分片的邏輯,一開始服務基于傳統(tǒng)部署方式通過本地配置文件配置的方式就可以指定該機器服務的分片內容如:0,1,2,3,隨著系統(tǒng)的升級迭代,該服務進行了容器化部署,所以原來基于本地配置文件各自配置分片數(shù)據(jù)的方式就不適用了,原來的部署方式使得服務是有狀態(tài),是一種非云原生的方式,所以該服務要重新設計實現(xiàn)一套分布式服務分片邏輯。
技術方案
分布式協(xié)調中間件
要實現(xiàn)分布式服務分片的能力,需要有一個分布式中間件,如:Redis,Mysql,Zookeeper等等都可以,我們選用Zookeeper。
基于Zookeeper的技術方案
使用Zookeeper主要是基于Zookeeper的臨時節(jié)點和節(jié)點變化監(jiān)聽機制,具體的技術設計如下:
服務注冊目錄設計
Zookeeper的數(shù)據(jù)存儲結構類似于目錄,服務注冊后的目錄類似如下結構:
解釋下該目錄結構,首先/xxxx/xxxx/sharding是區(qū)別于其他業(yè)務的的目錄,該目錄節(jié)點是持久的,service是服務目錄,標識一個服務,該節(jié)點也是持久的,ip1,ip2是該服務注冊到Zookeeper的機器列表節(jié)點,該節(jié)點是臨時節(jié)點。
/xxxx/xxxx/sharding/service/ip1
-----|----|--------|-------/ip2
服務分片處理流程
- 服務啟動,創(chuàng)建
CuratorFramework客戶端,設置客戶端連接狀態(tài)監(jiān)聽; - 向
Zookeeper注冊該機器的信息,這里設計簡單,機器信息就是ip地址; - 注冊機器信息后,從
Zookeeper獲取所有注冊信息; - 根據(jù)
Zookeeper獲取的所有注冊機器信息根據(jù)分片算法進行分片計算。
編碼實現(xiàn)
ZookeeperConfig
Zookeeper的配置信息
@Data
public class ZookeeperConfig {
/**
* zk集群地址
*/
private String zkAddress;
/**
* 注冊服務目錄
*/
private String nodePath;
/**
* 分片的服務名
*/
private String serviceName;
/**
* 分片總數(shù)
*/
private Integer shardingCount;
public ZookeeperConfig(String zkAddress, String nodePath, String serviceName, Integer shardingCount) {
this.zkAddress = zkAddress;
this.nodePath = nodePath;
this.serviceName = "/" + serviceName;
this.shardingCount = shardingCount;
}
/**
* 等待重試的間隔時間的初始值.
* 單位毫秒.
*/
private int baseSleepTimeMilliseconds = 1000;
/**
* 等待重試的間隔時間的最大值.
* 單位毫秒.
*/
private int maxSleepTimeMilliseconds = 3000;
/**
* 最大重試次數(shù).
*/
private int maxRetries = 3;
/**
* 會話超時時間.
* 單位毫秒.
*/
private int sessionTimeoutMilliseconds;
/**
* 連接超時時間.
* 單位毫秒.
*/
private int connectionTimeoutMilliseconds;
}
InstanceInfo注冊機器
@AllArgsConstructor
@EqualsAndHashCode()
public class InstanceInfo {
private String ip;
public String getInstance() {
return ip;
}
}
ZookeeperShardingService分片服務
@Slf4j
public class ZookeeperShardingService {
public final Map<String, List<Integer>> caches = new HashMap<>(16);
private final CuratorFramework client;
private final ZookeeperConfig zkConfig;
private final ShardingStrategy shardingStrategy;
private final InstanceInfo instanceInfo;
private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1);
public ZookeeperShardingService(ZookeeperConfig zkConfig, ShardingStrategy shardingStrategy) {
this.zkConfig = zkConfig;
log.info("開始初始化zk, ip列表是: {}.", zkConfig.getZkAddress());
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(zkConfig.getZkAddress())
.retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()));
if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
}
if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
}
this.shardingStrategy = shardingStrategy;
HostInfo host = new HostInfo();
this.instanceInfo = new InstanceInfo(host.getAddress());
client = builder.build();
client.getConnectionStateListenable().addListener(new ConnectionListener());
client.start();
try {
COUNT_DOWN_LATCH.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 注冊服務節(jié)點監(jiān)聽
registerPathChildListener(zkConfig.getNodePath() + zkConfig.getServiceName(), new ChildrenPathListener());
try {
if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
client.close();
throw new KeeperException.OperationTimeoutException();
}
} catch (final Exception ex) {
ex.printStackTrace();
throw new RuntimeException(ex);
}
}
/**
* 子節(jié)點監(jiān)聽器
* @param nodePath 主節(jié)點
* @param listener 監(jiān)聽器
*/
private void registerPathChildListener(String nodePath, PathChildrenCacheListener listener) {
try {
// 1. 創(chuàng)建一個PathChildrenCache
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, nodePath, true);
// 2. 添加目錄監(jiān)聽器
pathChildrenCache.getListenable().addListener(listener);
// 3. 啟動監(jiān)聽器
pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
} catch (Exception e) {
log.error("注冊子目錄監(jiān)聽器出現(xiàn)異常,nodePath:{}",nodePath,e);
throw new RuntimeException(e);
}
}
/**
* 服務啟動,注冊zk節(jié)點
* @throws Exception 異常
*/
private void zkOp() throws Exception {
// 是否存在ruubypay-sharding主節(jié)點
if (null == client.checkExists().forPath(zkConfig.getNodePath())) {
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(zkConfig.getNodePath(), Hashing.sha1().hashString("sharding", Charsets.UTF_8).toString().getBytes());
}
// 是否存服務主節(jié)點
if (null == client.checkExists().forPath(zkConfig.getNodePath() + zkConfig.getServiceName())) {
// 創(chuàng)建服務主節(jié)點
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(zkConfig.getNodePath() + zkConfig.getServiceName());
}
// 檢查是否存在臨時節(jié)點
if (null == client.checkExists().forPath(zkConfig.getNodePath() + zkConfig.getServiceName() + "/" + instanceInfo.getInstance())) {
System.out.println(zkConfig.getNodePath() + zkConfig.getServiceName() + "/" + instanceInfo.getInstance());
// 創(chuàng)建臨時節(jié)點
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(zkConfig.getNodePath() + zkConfig.getServiceName() +
"/" + instanceInfo.getInstance(), zkConfig.getShardingCount().toString().getBytes(StandardCharsets.UTF_8));
}
shardingFromZk();
}
/**
* 從zk獲取機器列表并進行分片
* @throws Exception 異常
*/
private void shardingFromZk() throws Exception {
// 從 serviceName 節(jié)點下獲取所有Ip列表
final GetChildrenBuilder childrenBuilder = client.getChildren();
final List<String> instanceList = childrenBuilder.watched().forPath(zkConfig.getNodePath() + zkConfig.getServiceName());
List<InstanceInfo> res = new ArrayList<>();
instanceList.forEach(s -> {
res.add(new InstanceInfo(s));
});
Map<InstanceInfo, List<Integer>> shardingResult = shardingStrategy.sharding(res, zkConfig.getShardingCount());
// 先清一遍緩存
caches.clear();
shardingResult.forEach((k, v) -> {
caches.put(k.getInstance().split("-")[0], v);
});
}
/**
* zk連接監(jiān)聽
*/
private class ConnectionListener implements ConnectionStateListener {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if (newState == ConnectionState.CONNECTED || newState == ConnectionState.LOST || newState == ConnectionState.RECONNECTED) {
try {
zkOp();
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
} finally {
COUNT_DOWN_LATCH.countDown();
}
}
}
}
/**
* 子節(jié)點監(jiān)聽
*/
private class ChildrenPathListener implements PathChildrenCacheListener {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
PathChildrenCacheEvent.Type type = event.getType();
if (PathChildrenCacheEvent.Type.CHILD_ADDED == type || PathChildrenCacheEvent.Type.CHILD_REMOVED == type) {
try {
shardingFromZk();
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
}
}分片算法
采用平均分配的算法
public interface ShardingStrategy {
Map<InstanceInfo, List<Integer>> sharding(final List<InstanceInfo> list, Integer shardingCount);
}
public class AverageAllocationShardingStrategy implements ShardingStrategy {
@Override
public Map<InstanceInfo, List<Integer>> sharding(List<InstanceInfo> list, Integer shardingCount) {
if (list.isEmpty()) {
return null;
}
Map<InstanceInfo, List<Integer>> result = shardingAliquot(list, shardingCount);
addAliquant(list, shardingCount, result);
return result;
}
private Map<InstanceInfo, List<Integer>> shardingAliquot(final List<InstanceInfo> instanceInfos, final int shardingTotalCount) {
Map<InstanceInfo, List<Integer>> result = new LinkedHashMap<>(shardingTotalCount, 1);
int itemCountPerSharding = shardingTotalCount / instanceInfos.size();
int count = 0;
for (InstanceInfo each : instanceInfos) {
List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);
for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
shardingItems.add(i);
}
result.put(each, shardingItems);
count++;
}
return result;
}
private void addAliquant(final List<InstanceInfo> instanceInfos, final int shardingTotalCount, final Map<InstanceInfo, List<Integer>> shardingResults) {
int aliquant = shardingTotalCount % instanceInfos.size();
int count = 0;
for (Map.Entry<InstanceInfo, List<Integer>> entry : shardingResults.entrySet()) {
if (count < aliquant) {
entry.getValue().add(shardingTotalCount / instanceInfos.size() * instanceInfos.size() + count);
}
count++;
}
}
}總結
基于Zookeeper和簡單的平均分配算法實現(xiàn)了一個簡單的分布式分片服務,該分片服務目前滿足公司需求,因為其簡單,所以不一定滿足其他場景,針對其他場景還需考慮其他因素,該示例供參考。
到此這篇關于Java Zookeeper分布式分片算法超詳細講解流程的文章就介紹到這了,更多相關Java Zookeeper分布式分片算法內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
JavaWeb中struts2實現(xiàn)文件上傳下載功能實例解析
這篇文章主要介紹了JavaWeb中struts2文件上傳下載功能的實現(xiàn),在Web應用系統(tǒng)開發(fā)中,文件上傳和下載功能是非常常用的功能,需要的朋友可以參考下2016-05-05
springboot集成mybatis?plus和dynamic-datasource注意事項說明
這篇文章主要介紹了springboot集成mybatis?plus和dynamic-datasource注意事項說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-01-01
Spring中@Controller和@RestController的區(qū)別詳解
這篇文章主要介紹了Spring中@Controller和@RestController的區(qū)別詳解,@RestController?是?@Controller?和?@ResponseBody?的結合體,單獨使用?@RestController?的效果與?@Controller?和?@ResponseBody?二者同時使用的效果相同,需要的朋友可以參考下2023-10-10

