PowerJob的IdGenerateService工作流程源碼解讀
序
本文主要研究一下PowerJob的IdGenerateService
IdGenerateService
tech/powerjob/server/core/uid/IdGenerateService.java
@Slf4j @Service public class IdGenerateService { private final SnowFlakeIdGenerator snowFlakeIdGenerator; private static final int DATA_CENTER_ID = 0; public IdGenerateService(ServerInfoService serverInfoService) { long id = serverInfoService.fetchServiceInfo().getId(); snowFlakeIdGenerator = new SnowFlakeIdGenerator(DATA_CENTER_ID, id); log.info("[IdGenerateService] initialize IdGenerateService successfully, ID:{}", id); } /** * 分配分布式唯一ID * @return 分布式唯一ID */ public long allocate() { return snowFlakeIdGenerator.nextId(); } }
IdGenerateService的構(gòu)造器接收ServerInfoService,然后通過(guò)serverInfoService.fetchServiceInfo().getId()獲取machineId,最后創(chuàng)建SnowFlakeIdGenerator,其DATA_CENTER_ID為0;其allocate返回的是snowFlakeIdGenerator.nextId()
ServerInfoService
tech/powerjob/server/remote/server/self/ServerInfoService.java
public interface ServerInfoService { /** * fetch current server info * @return ServerInfo */ ServerInfo fetchServiceInfo(); }
ServerInfoService定義了fetchServiceInfo方法,返回ServerInfo
ServerInfoServiceImpl
tech/powerjob/server/remote/server/self/ServerInfoServiceImpl.java
@Slf4j @Service public class ServerInfoServiceImpl implements ServerInfoService { private final ServerInfo serverInfo; private final ServerInfoRepository serverInfoRepository; private static final long MAX_SERVER_CLUSTER_SIZE = 10000; private static final String SERVER_INIT_LOCK = "server_init_lock"; private static final int SERVER_INIT_LOCK_MAX_TIME = 15000; @Autowired public ServerInfoServiceImpl(LockService lockService, ServerInfoRepository serverInfoRepository) { this.serverInfo = new ServerInfo(); String ip = NetUtils.getLocalHost(); serverInfo.setIp(ip); serverInfo.setBornTime(System.currentTimeMillis()); this.serverInfoRepository = serverInfoRepository; Stopwatch sw = Stopwatch.createStarted(); while (!lockService.tryLock(SERVER_INIT_LOCK, SERVER_INIT_LOCK_MAX_TIME)) { log.info("[ServerInfoService] waiting for lock: {}", SERVER_INIT_LOCK); CommonUtils.easySleep(100); } try { // register server then get server_id ServerInfoDO server = serverInfoRepository.findByIp(ip); if (server == null) { ServerInfoDO newServerInfo = new ServerInfoDO(ip); server = serverInfoRepository.saveAndFlush(newServerInfo); } else { serverInfoRepository.updateGmtModifiedByIp(ip, new Date()); } if (server.getId() < MAX_SERVER_CLUSTER_SIZE) { serverInfo.setId(server.getId()); } else { long retryServerId = retryServerId(); serverInfo.setId(retryServerId); serverInfoRepository.updateIdByIp(retryServerId, ip); } } catch (Exception e) { log.error("[ServerInfoService] init server failed", e); throw e; } finally { lockService.unlock(SERVER_INIT_LOCK); } log.info("[ServerInfoService] ip:{}, id:{}, cost:{}", ip, serverInfo.getId(), sw); } @Scheduled(fixedRate = 15000, initialDelay = 15000) public void heartbeat() { serverInfoRepository.updateGmtModifiedByIp(serverInfo.getIp(), new Date()); } private long retryServerId() { List<ServerInfoDO> serverInfoList = serverInfoRepository.findAll(); log.info("[ServerInfoService] current server record num in database: {}", serverInfoList.size()); // clean inactive server record first if (serverInfoList.size() > MAX_SERVER_CLUSTER_SIZE) { // use a large time interval to prevent valid records from being deleted when the local time is inaccurate Date oneDayAgo = DateUtils.addDays(new Date(), -1); int delNum =serverInfoRepository.deleteByGmtModifiedBefore(oneDayAgo); log.warn("[ServerInfoService] delete invalid {} server info record before {}", delNum, oneDayAgo); serverInfoList = serverInfoRepository.findAll(); } if (serverInfoList.size() > MAX_SERVER_CLUSTER_SIZE) { throw new PowerJobException(String.format("The powerjob-server cluster cannot accommodate %d machines, please rebuild another cluster", serverInfoList.size())); } Set<Long> uedServerIds = serverInfoList.stream().map(ServerInfoDO::getId).collect(Collectors.toSet()); for (long i = 1; i <= MAX_SERVER_CLUSTER_SIZE; i++) { if (uedServerIds.contains(i)) { continue; } log.info("[ServerInfoService] ID[{}] is not used yet, try as new server id", i); return i; } throw new PowerJobException("impossible"); } @Autowired(required = false) public void setBuildProperties(BuildProperties buildProperties) { if (buildProperties == null) { return; } String pomVersion = buildProperties.getVersion(); if (StringUtils.isNotBlank(pomVersion)) { serverInfo.setVersion(pomVersion); } } @Override public ServerInfo fetchServiceInfo() { return serverInfo; } }
ServerInfoServiceImpl實(shí)現(xiàn)了ServerInfoService接口,其構(gòu)造器注入lockService和serverInfoRepository,先通過(guò)lockService.tryLock搶到server_init_lock,然后serverInfoRepository.findByIp找到ServerInfoDO執(zhí)行saveAndFlush或者updateGmtModifiedByIp;其fetchServiceInfo返回的是serverInfo信息;它還以fixedRate為15s調(diào)度了heartbeat,主要是更新gmtModifed
SnowFlakeIdGenerator
tech/powerjob/server/core/uid/SnowFlakeIdGenerator.java
public class SnowFlakeIdGenerator { /** * 起始的時(shí)間戳(a special day for me) */ private final static long START_STAMP = 1555776000000L; /** * 序列號(hào)占用的位數(shù) */ private final static long SEQUENCE_BIT = 6; /** * 機(jī)器標(biāo)識(shí)占用的位數(shù) */ private final static long MACHINE_BIT = 14; /** * 數(shù)據(jù)中心占用的位數(shù) */ private final static long DATA_CENTER_BIT = 2; /** * 每一部分的最大值 */ private final static long MAX_DATA_CENTER_NUM = ~(-1L << DATA_CENTER_BIT); private final static long MAX_MACHINE_NUM = ~(-1L << MACHINE_BIT); private final static long MAX_SEQUENCE = ~(-1L << SEQUENCE_BIT); /** * 每一部分向左的位移 */ private final static long MACHINE_LEFT = SEQUENCE_BIT; private final static long DATA_CENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT; private final static long TIMESTAMP_LEFT = DATA_CENTER_LEFT + DATA_CENTER_BIT; /** * 數(shù)據(jù)中心 */ private final long dataCenterId; /** * 機(jī)器標(biāo)識(shí) */ private final long machineId; /** * 序列號(hào) */ private long sequence = 0L; /** * 上一次時(shí)間戳 */ private long lastTimestamp = -1L; public SnowFlakeIdGenerator(long dataCenterId, long machineId) { if (dataCenterId > MAX_DATA_CENTER_NUM || dataCenterId < 0) { throw new IllegalArgumentException("dataCenterId can't be greater than MAX_DATA_CENTER_NUM or less than 0"); } if (machineId > MAX_MACHINE_NUM || machineId < 0) { throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0"); } this.dataCenterId = dataCenterId; this.machineId = machineId; } /** * 產(chǎn)生下一個(gè)ID */ public synchronized long nextId() { long currStamp = getNewStamp(); if (currStamp < lastTimestamp) { return futureId(); } if (currStamp == lastTimestamp) { //相同毫秒內(nèi),序列號(hào)自增 sequence = (sequence + 1) & MAX_SEQUENCE; //同一毫秒的序列數(shù)已經(jīng)達(dá)到最大 if (sequence == 0L) { currStamp = getNextMill(); } } else { //不同毫秒內(nèi),序列號(hào)置為0 sequence = 0L; } lastTimestamp = currStamp; return (currStamp - START_STAMP) << TIMESTAMP_LEFT //時(shí)間戳部分 | dataCenterId << DATA_CENTER_LEFT //數(shù)據(jù)中心部分 | machineId << MACHINE_LEFT //機(jī)器標(biāo)識(shí)部分 | sequence; //序列號(hào)部分 } /** * 發(fā)生時(shí)鐘回?fù)軙r(shí)借用未來(lái)時(shí)間生成Id,避免運(yùn)行過(guò)程中任務(wù)調(diào)度和工作流直接進(jìn)入不可用狀態(tài) * 注:該方式不可解決原算法中停服狀態(tài)下時(shí)鐘回?fù)軐?dǎo)致的重復(fù)id問(wèn)題 */ private long futureId() { sequence = (sequence + 1) & MAX_SEQUENCE; if (sequence == 0L) { lastTimestamp = lastTimestamp + 1; } return (lastTimestamp - START_STAMP) << TIMESTAMP_LEFT //時(shí)間戳部分 | dataCenterId << DATA_CENTER_LEFT //數(shù)據(jù)中心部分 | machineId << MACHINE_LEFT //機(jī)器標(biāo)識(shí)部分 | sequence; //序列號(hào)部分 } private long getNextMill() { long mill = getNewStamp(); while (mill <= lastTimestamp) { mill = getNewStamp(); } return mill; } private long getNewStamp() { return System.currentTimeMillis(); } }
SnowFlakeIdGenerator的dataCenterId(最大值為3)和machineId(最大值為16383),sequence最大值為63
小結(jié)
PowerJob的IdGenerateService通過(guò)serverInfoService.fetchServiceInfo().getId()獲取machineId,最后創(chuàng)建SnowFlakeIdGenerator,其DATA_CENTER_ID為0;其allocate返回的是snowFlakeIdGenerator.nextId();其InstanceInfoDO的instanceId就是idGenerateService.allocate()生成的。
以上就是PowerJob的IdGenerateService工作流程源碼解讀的詳細(xì)內(nèi)容,更多關(guān)于PowerJob IdGenerateService的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java中static和static?final的區(qū)別詳解
這篇文章主要介紹了Java中static和static?final的區(qū)別詳解,開(kāi)發(fā)時(shí)我們經(jīng)常用到static以及static?final來(lái)修飾我們的字段變量,那么他們到底有什么區(qū)別呢?其實(shí)他們的區(qū)別可以用使用字節(jié)碼文件來(lái)解析,需要的朋友可以參考下2023-10-10java通過(guò)模擬post方式提交表單實(shí)現(xiàn)圖片上傳功能實(shí)例
這篇文章主要介紹了java通過(guò)模擬post方式提交表單實(shí)現(xiàn)圖片上傳功能實(shí)例,涉及Java針對(duì)表單的提交操作響應(yīng)及文件傳輸?shù)南嚓P(guān)技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-11-11Java8中List轉(zhuǎn)換String字符串幾種方式
這篇文章主要給大家介紹了關(guān)于Java8中List轉(zhuǎn)換String字符串的幾種方式,在實(shí)際開(kāi)發(fā)中經(jīng)常遇到List轉(zhuǎn)為String字符串的情況,文中給出了幾種方法的示例代碼,需要的朋友可以參考下2023-07-07java中如何實(shí)現(xiàn) zip rar 7z 壓縮包解壓
這篇文章主要介紹了java中如何實(shí)現(xiàn) zip rar 7z 壓縮包解壓?jiǎn)栴},具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-07-07獲取Java加載器和類完整結(jié)構(gòu)的方法分享
這篇文章主要為大家詳細(xì)介紹了獲取Java加載器和類完整結(jié)構(gòu)的方法,文中的示例代碼講解詳細(xì),對(duì)我們學(xué)習(xí)Java有一定的幫助,需要的可以參考一下2022-12-12