PowerJob的IdGenerateService工作流程源碼解讀
序
本文主要研究一下PowerJob的IdGenerateService
IdGenerateService
tech/powerjob/server/core/uid/IdGenerateService.java
@Slf4j
@Service
public class IdGenerateService {
private final SnowFlakeIdGenerator snowFlakeIdGenerator;
private static final int DATA_CENTER_ID = 0;
public IdGenerateService(ServerInfoService serverInfoService) {
long id = serverInfoService.fetchServiceInfo().getId();
snowFlakeIdGenerator = new SnowFlakeIdGenerator(DATA_CENTER_ID, id);
log.info("[IdGenerateService] initialize IdGenerateService successfully, ID:{}", id);
}
/**
* 分配分布式唯一ID
* @return 分布式唯一ID
*/
public long allocate() {
return snowFlakeIdGenerator.nextId();
}
}IdGenerateService的構(gòu)造器接收ServerInfoService,然后通過serverInfoService.fetchServiceInfo().getId()獲取machineId,最后創(chuàng)建SnowFlakeIdGenerator,其DATA_CENTER_ID為0;其allocate返回的是snowFlakeIdGenerator.nextId()
ServerInfoService
tech/powerjob/server/remote/server/self/ServerInfoService.java
public interface ServerInfoService {
/**
* fetch current server info
* @return ServerInfo
*/
ServerInfo fetchServiceInfo();
}ServerInfoService定義了fetchServiceInfo方法,返回ServerInfo
ServerInfoServiceImpl
tech/powerjob/server/remote/server/self/ServerInfoServiceImpl.java
@Slf4j
@Service
public class ServerInfoServiceImpl implements ServerInfoService {
private final ServerInfo serverInfo;
private final ServerInfoRepository serverInfoRepository;
private static final long MAX_SERVER_CLUSTER_SIZE = 10000;
private static final String SERVER_INIT_LOCK = "server_init_lock";
private static final int SERVER_INIT_LOCK_MAX_TIME = 15000;
@Autowired
public ServerInfoServiceImpl(LockService lockService, ServerInfoRepository serverInfoRepository) {
this.serverInfo = new ServerInfo();
String ip = NetUtils.getLocalHost();
serverInfo.setIp(ip);
serverInfo.setBornTime(System.currentTimeMillis());
this.serverInfoRepository = serverInfoRepository;
Stopwatch sw = Stopwatch.createStarted();
while (!lockService.tryLock(SERVER_INIT_LOCK, SERVER_INIT_LOCK_MAX_TIME)) {
log.info("[ServerInfoService] waiting for lock: {}", SERVER_INIT_LOCK);
CommonUtils.easySleep(100);
}
try {
// register server then get server_id
ServerInfoDO server = serverInfoRepository.findByIp(ip);
if (server == null) {
ServerInfoDO newServerInfo = new ServerInfoDO(ip);
server = serverInfoRepository.saveAndFlush(newServerInfo);
} else {
serverInfoRepository.updateGmtModifiedByIp(ip, new Date());
}
if (server.getId() < MAX_SERVER_CLUSTER_SIZE) {
serverInfo.setId(server.getId());
} else {
long retryServerId = retryServerId();
serverInfo.setId(retryServerId);
serverInfoRepository.updateIdByIp(retryServerId, ip);
}
} catch (Exception e) {
log.error("[ServerInfoService] init server failed", e);
throw e;
} finally {
lockService.unlock(SERVER_INIT_LOCK);
}
log.info("[ServerInfoService] ip:{}, id:{}, cost:{}", ip, serverInfo.getId(), sw);
}
@Scheduled(fixedRate = 15000, initialDelay = 15000)
public void heartbeat() {
serverInfoRepository.updateGmtModifiedByIp(serverInfo.getIp(), new Date());
}
private long retryServerId() {
List<ServerInfoDO> serverInfoList = serverInfoRepository.findAll();
log.info("[ServerInfoService] current server record num in database: {}", serverInfoList.size());
// clean inactive server record first
if (serverInfoList.size() > MAX_SERVER_CLUSTER_SIZE) {
// use a large time interval to prevent valid records from being deleted when the local time is inaccurate
Date oneDayAgo = DateUtils.addDays(new Date(), -1);
int delNum =serverInfoRepository.deleteByGmtModifiedBefore(oneDayAgo);
log.warn("[ServerInfoService] delete invalid {} server info record before {}", delNum, oneDayAgo);
serverInfoList = serverInfoRepository.findAll();
}
if (serverInfoList.size() > MAX_SERVER_CLUSTER_SIZE) {
throw new PowerJobException(String.format("The powerjob-server cluster cannot accommodate %d machines, please rebuild another cluster", serverInfoList.size()));
}
Set<Long> uedServerIds = serverInfoList.stream().map(ServerInfoDO::getId).collect(Collectors.toSet());
for (long i = 1; i <= MAX_SERVER_CLUSTER_SIZE; i++) {
if (uedServerIds.contains(i)) {
continue;
}
log.info("[ServerInfoService] ID[{}] is not used yet, try as new server id", i);
return i;
}
throw new PowerJobException("impossible");
}
@Autowired(required = false)
public void setBuildProperties(BuildProperties buildProperties) {
if (buildProperties == null) {
return;
}
String pomVersion = buildProperties.getVersion();
if (StringUtils.isNotBlank(pomVersion)) {
serverInfo.setVersion(pomVersion);
}
}
@Override
public ServerInfo fetchServiceInfo() {
return serverInfo;
}
}ServerInfoServiceImpl實現(xiàn)了ServerInfoService接口,其構(gòu)造器注入lockService和serverInfoRepository,先通過lockService.tryLock搶到server_init_lock,然后serverInfoRepository.findByIp找到ServerInfoDO執(zhí)行saveAndFlush或者updateGmtModifiedByIp;其fetchServiceInfo返回的是serverInfo信息;它還以fixedRate為15s調(diào)度了heartbeat,主要是更新gmtModifed
SnowFlakeIdGenerator
tech/powerjob/server/core/uid/SnowFlakeIdGenerator.java
public class SnowFlakeIdGenerator {
/**
* 起始的時間戳(a special day for me)
*/
private final static long START_STAMP = 1555776000000L;
/**
* 序列號占用的位數(shù)
*/
private final static long SEQUENCE_BIT = 6;
/**
* 機器標識占用的位數(shù)
*/
private final static long MACHINE_BIT = 14;
/**
* 數(shù)據(jù)中心占用的位數(shù)
*/
private final static long DATA_CENTER_BIT = 2;
/**
* 每一部分的最大值
*/
private final static long MAX_DATA_CENTER_NUM = ~(-1L << DATA_CENTER_BIT);
private final static long MAX_MACHINE_NUM = ~(-1L << MACHINE_BIT);
private final static long MAX_SEQUENCE = ~(-1L << SEQUENCE_BIT);
/**
* 每一部分向左的位移
*/
private final static long MACHINE_LEFT = SEQUENCE_BIT;
private final static long DATA_CENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT;
private final static long TIMESTAMP_LEFT = DATA_CENTER_LEFT + DATA_CENTER_BIT;
/**
* 數(shù)據(jù)中心
*/
private final long dataCenterId;
/**
* 機器標識
*/
private final long machineId;
/**
* 序列號
*/
private long sequence = 0L;
/**
* 上一次時間戳
*/
private long lastTimestamp = -1L;
public SnowFlakeIdGenerator(long dataCenterId, long machineId) {
if (dataCenterId > MAX_DATA_CENTER_NUM || dataCenterId < 0) {
throw new IllegalArgumentException("dataCenterId can't be greater than MAX_DATA_CENTER_NUM or less than 0");
}
if (machineId > MAX_MACHINE_NUM || machineId < 0) {
throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0");
}
this.dataCenterId = dataCenterId;
this.machineId = machineId;
}
/**
* 產(chǎn)生下一個ID
*/
public synchronized long nextId() {
long currStamp = getNewStamp();
if (currStamp < lastTimestamp) {
return futureId();
}
if (currStamp == lastTimestamp) {
//相同毫秒內(nèi),序列號自增
sequence = (sequence + 1) & MAX_SEQUENCE;
//同一毫秒的序列數(shù)已經(jīng)達到最大
if (sequence == 0L) {
currStamp = getNextMill();
}
} else {
//不同毫秒內(nèi),序列號置為0
sequence = 0L;
}
lastTimestamp = currStamp;
return (currStamp - START_STAMP) << TIMESTAMP_LEFT //時間戳部分
| dataCenterId << DATA_CENTER_LEFT //數(shù)據(jù)中心部分
| machineId << MACHINE_LEFT //機器標識部分
| sequence; //序列號部分
}
/**
* 發(fā)生時鐘回撥時借用未來時間生成Id,避免運行過程中任務(wù)調(diào)度和工作流直接進入不可用狀態(tài)
* 注:該方式不可解決原算法中停服狀態(tài)下時鐘回撥導(dǎo)致的重復(fù)id問題
*/
private long futureId() {
sequence = (sequence + 1) & MAX_SEQUENCE;
if (sequence == 0L) {
lastTimestamp = lastTimestamp + 1;
}
return (lastTimestamp - START_STAMP) << TIMESTAMP_LEFT //時間戳部分
| dataCenterId << DATA_CENTER_LEFT //數(shù)據(jù)中心部分
| machineId << MACHINE_LEFT //機器標識部分
| sequence; //序列號部分
}
private long getNextMill() {
long mill = getNewStamp();
while (mill <= lastTimestamp) {
mill = getNewStamp();
}
return mill;
}
private long getNewStamp() {
return System.currentTimeMillis();
}
}SnowFlakeIdGenerator的dataCenterId(最大值為3)和machineId(最大值為16383),sequence最大值為63
小結(jié)
PowerJob的IdGenerateService通過serverInfoService.fetchServiceInfo().getId()獲取machineId,最后創(chuàng)建SnowFlakeIdGenerator,其DATA_CENTER_ID為0;其allocate返回的是snowFlakeIdGenerator.nextId();其InstanceInfoDO的instanceId就是idGenerateService.allocate()生成的。
以上就是PowerJob的IdGenerateService工作流程源碼解讀的詳細內(nèi)容,更多關(guān)于PowerJob IdGenerateService的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java中static和static?final的區(qū)別詳解
這篇文章主要介紹了Java中static和static?final的區(qū)別詳解,開發(fā)時我們經(jīng)常用到static以及static?final來修飾我們的字段變量,那么他們到底有什么區(qū)別呢?其實他們的區(qū)別可以用使用字節(jié)碼文件來解析,需要的朋友可以參考下2023-10-10
java通過模擬post方式提交表單實現(xiàn)圖片上傳功能實例
這篇文章主要介紹了java通過模擬post方式提交表單實現(xiàn)圖片上傳功能實例,涉及Java針對表單的提交操作響應(yīng)及文件傳輸?shù)南嚓P(guān)技巧,具有一定參考借鑒價值,需要的朋友可以參考下2015-11-11
Java8中List轉(zhuǎn)換String字符串幾種方式
這篇文章主要給大家介紹了關(guān)于Java8中List轉(zhuǎn)換String字符串的幾種方式,在實際開發(fā)中經(jīng)常遇到List轉(zhuǎn)為String字符串的情況,文中給出了幾種方法的示例代碼,需要的朋友可以參考下2023-07-07
java中如何實現(xiàn) zip rar 7z 壓縮包解壓
這篇文章主要介紹了java中如何實現(xiàn) zip rar 7z 壓縮包解壓問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-07-07
獲取Java加載器和類完整結(jié)構(gòu)的方法分享
這篇文章主要為大家詳細介紹了獲取Java加載器和類完整結(jié)構(gòu)的方法,文中的示例代碼講解詳細,對我們學(xué)習(xí)Java有一定的幫助,需要的可以參考一下2022-12-12

