PowerJob LockService方法工作流程源碼解讀
序
本文主要研究一下PowerJob的LockService
LockService
tech/powerjob/server/extension/LockService.java
public interface LockService {
/**
* 上鎖(獲取鎖),立即返回,不會(huì)阻塞等待鎖
* @param name 鎖名稱
* @param maxLockTime 最長(zhǎng)持有鎖的時(shí)間,單位毫秒(ms)
* @return true -> 獲取到鎖,false -> 未獲取到鎖
*/
boolean tryLock(String name, long maxLockTime);
/**
* 釋放鎖
* @param name 鎖名稱
*/
void unlock(String name);
}LockService接口定義了tryLock、unlock方法
DatabaseLockService
tech/powerjob/server/extension/defaultimpl/DatabaseLockService.java
@Slf4j
@Service
public class DatabaseLockService implements LockService {
private final String ownerIp;
private final OmsLockRepository omsLockRepository;
@Autowired
public DatabaseLockService(OmsLockRepository omsLockRepository) {
this.ownerIp = NetUtils.getLocalHost();
this.omsLockRepository = omsLockRepository;
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
int num = omsLockRepository.deleteByOwnerIP(ownerIp);
log.info("[DatabaseLockService] execute shutdown hook, release all lock(owner={},num={})", ownerIp, num);
}));
}
@Override
public boolean tryLock(String name, long maxLockTime) {
OmsLockDO newLock = new OmsLockDO(name, ownerIp, maxLockTime);
try {
omsLockRepository.saveAndFlush(newLock);
return true;
} catch (DataIntegrityViolationException ignore) {
} catch (Exception e) {
log.warn("[DatabaseLockService] write lock to database failed, lockName = {}.", name, e);
}
OmsLockDO omsLockDO = omsLockRepository.findByLockName(name);
long lockedMillions = System.currentTimeMillis() - omsLockDO.getGmtCreate().getTime();
// 鎖超時(shí),強(qiáng)制釋放鎖并重新嘗試獲取
if (lockedMillions > omsLockDO.getMaxLockTime()) {
log.warn("[DatabaseLockService] The lock[{}] already timeout, will be unlocked now.", omsLockDO);
unlock(name);
return tryLock(name, maxLockTime);
}
return false;
}
@Override
public void unlock(String name) {
try {
CommonUtils.executeWithRetry0(() -> omsLockRepository.deleteByLockName(name));
}catch (Exception e) {
log.error("[DatabaseLockService] unlock {} failed.", name, e);
}
}
}DatabaseLockService基于數(shù)據(jù)庫(kù)實(shí)現(xiàn)了LockService,其構(gòu)造器依賴OmsLockRepository,同時(shí)注冊(cè)了ShutdownHook,在關(guān)閉的時(shí)候執(zhí)行omsLockRepository.deleteByOwnerIP(ownerIp);其tryLock方法創(chuàng)建OmsLockDO,然后執(zhí)行omsLockRepository.saveAndFlush,若成功則返回,若有異常則通過(guò)omsLockRepository.findByLockName找到omsLockDO,計(jì)算加鎖時(shí)間,若超過(guò)MaxLockTime則執(zhí)行unlock再重新tryLock;其unlock執(zhí)行omsLockRepository.deleteByLockName
NetUtils.getLocalHost
tech/powerjob/common/utils/NetUtils.java
public static String getLocalHost() {
if (HOST_ADDRESS != null) {
return HOST_ADDRESS;
}
String addressFromJVM = System.getProperty(PowerJobDKey.BIND_LOCAL_ADDRESS);
if (StringUtils.isNotEmpty(addressFromJVM)) {
log.info("[Net] use address from[{}]: {}", PowerJobDKey.BIND_LOCAL_ADDRESS, addressFromJVM);
return HOST_ADDRESS = addressFromJVM;
}
InetAddress address = getLocalAddress();
if (address != null) {
return HOST_ADDRESS = address.getHostAddress();
}
return LOCALHOST_VALUE;
}
public static InetAddress getLocalAddress() {
if (LOCAL_ADDRESS != null) {
return LOCAL_ADDRESS;
}
InetAddress localAddress = getLocalAddress0();
LOCAL_ADDRESS = localAddress;
return localAddress;
}
private static InetAddress getLocalAddress0() {
// @since 2.7.6, choose the {@link NetworkInterface} first
try {
InetAddress addressOp = getFirstReachableInetAddress( findNetworkInterface());
if (addressOp != null) {
return addressOp;
}
} catch (Throwable e) {
log.warn("[Net] getLocalAddress0 failed.", e);
}
InetAddress localAddress = null;
try {
localAddress = InetAddress.getLocalHost();
Optional<InetAddress> addressOp = toValidAddress(localAddress);
if (addressOp.isPresent()) {
return addressOp.get();
}
} catch (Throwable e) {
log.warn("[Net] getLocalAddress0 failed.", e);
}
return localAddress;
}NetUtils的getLocalHost先判斷HOST_ADDRESS是否有值,有則直接返回,否則先從系統(tǒng)屬性讀取powerjob.network.local.address,讀取不到則取LOCAL_ADDRESS,若LOCAL_ADDRESS為null則通過(guò)getLocalAddress0獲取
OmsLockDO
tech/powerjob/server/persistence/remote/model/OmsLockDO.java
@Data
@Entity
@NoArgsConstructor
@Table(uniqueConstraints = {@UniqueConstraint(name = "uidx01_oms_lock", columnNames = {"lockName"})})
public class OmsLockDO {
@Id
@GeneratedValue(strategy = GenerationType.AUTO, generator = "native")
@GenericGenerator(name = "native", strategy = "native")
private Long id;
private String lockName;
private String ownerIP;
/**
* 最長(zhǎng)持有鎖的時(shí)間
*/
private Long maxLockTime;
private Date gmtCreate;
private Date gmtModified;
public OmsLockDO(String lockName, String ownerIP, Long maxLockTime) {
this.lockName = lockName;
this.ownerIP = ownerIP;
this.maxLockTime = maxLockTime;
this.gmtCreate = new Date();
this.gmtModified = this.gmtCreate;
}
}OmsLockDO定義lockName為唯一索引,它還定義了ownerIP、maxLockTime
OmsLockRepository
tech/powerjob/server/persistence/remote/repository/OmsLockRepository.java
public interface OmsLockRepository extends JpaRepository<OmsLockDO, Long> {
@Modifying
@Transactional(rollbackOn = Exception.class)
@Query(value = "delete from OmsLockDO where lockName = ?1")
int deleteByLockName(String lockName);
OmsLockDO findByLockName(String lockName);
@Modifying
@Transactional(rollbackOn = Exception.class)
int deleteByOwnerIP(String ip);
}OmsLockRepository繼承了JpaRepository,它定義了deleteByLockName、findByLockName、deleteByOwnerIP方法
小結(jié)
LockService接口定義了tryLock、unlock方法;DatabaseLockService基于數(shù)據(jù)庫(kù)實(shí)現(xiàn)了LockService,其構(gòu)造器依賴OmsLockRepository,同時(shí)注冊(cè)了ShutdownHook,在關(guān)閉的時(shí)候執(zhí)行omsLockRepository.deleteByOwnerIP(ownerIp);其tryLock方法創(chuàng)建OmsLockDO,然后執(zhí)行omsLockRepository.saveAndFlush,若成功則返回,若有異常則通過(guò)omsLockRepository.findByLockName找到omsLockDO,計(jì)算加鎖時(shí)間,若超過(guò)MaxLockTime則執(zhí)行unlock再重新tryLock;其unlock執(zhí)行omsLockRepository.deleteByLockName。
以上就是PowerJob LockService方法工作流程源碼解讀的詳細(xì)內(nèi)容,更多關(guān)于PowerJob LockService的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java 獲取原始請(qǐng)求域名實(shí)現(xiàn)示例
這篇文章主要為大家介紹了Java 獲取原始請(qǐng)求域名實(shí)現(xiàn)示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-12-12
mybatisPlus自定義批量新增的實(shí)現(xiàn)代碼
這篇文章主要介紹了mybatisPlus自定義批量新增的實(shí)現(xiàn)代碼,代碼簡(jiǎn)單易懂,對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-11-11
詳解OpenAPI開發(fā)如何動(dòng)態(tài)的添加接口實(shí)現(xiàn)
這篇文章主要為大家介紹了OpenAPI開發(fā)如何動(dòng)態(tài)的添加接口實(shí)現(xiàn)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-04-04
Java Speech API實(shí)現(xiàn)語(yǔ)音識(shí)別
Java語(yǔ)音識(shí)別是一項(xiàng)非常有用的功能,它可以將語(yǔ)音轉(zhuǎn)換為文本,從而實(shí)現(xiàn)語(yǔ)音輸入和語(yǔ)音控制功能,在當(dāng)今數(shù)字化時(shí)代,語(yǔ)音識(shí)別技術(shù)逐漸成為人機(jī)交互的重要方式之一,語(yǔ)音識(shí)別技術(shù)可以幫助我們將語(yǔ)音數(shù)據(jù)轉(zhuǎn)化為文字,進(jìn)而進(jìn)行后續(xù)的處理和分析2023-10-10
Java使用Lettuce客戶端在Redis在主從復(fù)制模式下命令執(zhí)行的操作
這篇文章主要介紹了Java使用Lettuce客戶端在Redis在主從復(fù)制模式下命令執(zhí)行的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-04-04
java實(shí)現(xiàn)Runnable接口適合資源的共享
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)Runnable接口適合資源的共享,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-07-07
mybatis 自定義實(shí)現(xiàn)攔截器插件Interceptor示例
這篇文章主要介紹了mybatis 自定義實(shí)現(xiàn)攔截器插件Interceptor,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-10-10
Redis Java Lettuce驅(qū)動(dòng)框架原理解析
這篇文章主要介紹了Redis Java Lettuce驅(qū)動(dòng)框架原理解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-12-12
MyBatis?Generator?ORM層面的代碼自動(dòng)生成器(推薦)
Mybatis?Generator是一個(gè)專門為?MyBatis和?ibatis框架使用者提供的代碼生成器,也可以快速的根據(jù)數(shù)據(jù)表生成對(duì)應(yīng)的pojo類、Mapper接口、Mapper文件,甚至生成QBC風(fēng)格的查詢對(duì)象,這篇文章主要介紹了MyBatis?Generator?ORM層面的代碼自動(dòng)生成器,需要的朋友可以參考下2023-01-01
springboot集成mqtt超級(jí)詳細(xì)步驟
這篇文章主要介紹了springboot集成mqtt超級(jí)詳細(xì)步驟,本文分步驟結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-06-06

