PowerJob UseCacheLock工作流程源碼剖析
序
本文主要研究一下PowerJob的UseCacheLock
UseCacheLock
tech/powerjob/server/core/lock/UseCacheLock.java
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface UseCacheLock {
String type();
String key();
int concurrencyLevel();
}UseCacheLock注解定義了type、key、concurrencyLevel屬性
UseCacheLockAspect
tech/powerjob/server/core/lock/UseCacheLockAspect.java
@Slf4j
@Aspect
@Component
@Order(1)
@RequiredArgsConstructor
public class UseCacheLockAspect {
private final MonitorService monitorService;
private final Map<String, Cache<String, ReentrantLock>> lockContainer = Maps.newConcurrentMap();
private static final long SLOW_THRESHOLD = 100;
@Around(value = "@annotation(useCacheLock))")
public Object execute(ProceedingJoinPoint point, UseCacheLock useCacheLock) throws Throwable {
Cache<String, ReentrantLock> lockCache = lockContainer.computeIfAbsent(useCacheLock.type(), ignore -> {
int concurrencyLevel = useCacheLock.concurrencyLevel();
log.info("[UseSegmentLockAspect] create Lock Cache for [{}] with concurrencyLevel: {}", useCacheLock.type(), concurrencyLevel);
return CacheBuilder.newBuilder()
.initialCapacity(300000)
.maximumSize(500000)
.concurrencyLevel(concurrencyLevel)
.expireAfterWrite(30, TimeUnit.MINUTES)
.build();
});
final Method method = AOPUtils.parseMethod(point);
Long key = AOPUtils.parseSpEl(method, point.getArgs(), useCacheLock.key(), Long.class, 1L);
final ReentrantLock reentrantLock = lockCache.get(String.valueOf(key), ReentrantLock::new);
long start = System.currentTimeMillis();
reentrantLock.lockInterruptibly();
try {
long timeCost = System.currentTimeMillis() - start;
if (timeCost > SLOW_THRESHOLD) {
final SlowLockEvent slowLockEvent = new SlowLockEvent()
.setType(SlowLockEvent.Type.LOCAL)
.setLockType(useCacheLock.type())
.setLockKey(String.valueOf(key))
.setCallerService(method.getDeclaringClass().getSimpleName())
.setCallerMethod(method.getName())
.setCost(timeCost);
monitorService.monitor(slowLockEvent);
log.warn("[UseSegmentLockAspect] wait lock for method({}#{}) cost {} ms! key = '{}', args = {}, ", method.getDeclaringClass().getSimpleName(), method.getName(), timeCost,
key,
JSON.toJSONString(point.getArgs()));
}
return point.proceed();
} finally {
reentrantLock.unlock();
}
}
}UseCacheLockAspect攔截@UseCacheLock注解,它定義了lockContainer維護(hù)了useCacheLock.type()與Cache的關(guān)系;Cache采用的guava的Cache,其initialCapacity為300000,maximumSize為500000,expireAfterWrite為30分鐘;Cache的key為lock key,value為ReentrantLock;其execute方法主要是先執(zhí)行reentrantLock.lockInterruptibly(),然后執(zhí)行point.proceed(),最后reentrantLock.unlock();執(zhí)行point.proceed()之前還判斷了一下加鎖耗時(shí),若超過(guò)SLOW_THRESHOLD(100ms)則通過(guò)monitorService.monitor上報(bào)SlowLockEvent
示例
@UseCacheLock(type = "processJobInstance", key = "#instanceId", concurrencyLevel = 1024)
public void redispatchAsync(Long instanceId, int originStatus) {
// 將狀態(tài)重置為等待派發(fā)
instanceInfoRepository.updateStatusAndGmtModifiedByInstanceIdAndOriginStatus(instanceId, originStatus, InstanceStatus.WAITING_DISPATCH.getV(), new Date());
}key支持SpEl
小結(jié)
PowerJob的UseCacheLock注解定義了type、key、concurrencyLevel屬性;UseCacheLockAspect攔截@UseCacheLock注解,它定義了lockContainer維護(hù)了useCacheLock.type()與Cache的關(guān)系;而Cache的key為lock key,value為ReentrantLock,最后是通過(guò)reentrantLock.lockInterruptibly()加鎖。
以上就是PowerJob UseCacheLock工作流程源碼剖析的詳細(xì)內(nèi)容,更多關(guān)于PowerJob UseCacheLock的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java使用PDFRenderer實(shí)現(xiàn)預(yù)覽PDF功能
這篇文章主要為大家詳細(xì)介紹了java使用PDFRenderer實(shí)現(xiàn)預(yù)覽PDF功能,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-12-12
Java經(jīng)典算法匯總之順序查找(Sequential Search)
Java查找算法之順序查找說(shuō)明:順序查找適合于存儲(chǔ)結(jié)構(gòu)為順序存儲(chǔ)或鏈接存儲(chǔ)的線性表。 下面我們來(lái)詳細(xì)說(shuō)明下2016-04-04
簡(jiǎn)述Java中進(jìn)程與線程的關(guān)系_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
在 Java 語(yǔ)言中,對(duì)進(jìn)程和線程的封裝,分別提供了 Process 和 Thread 相關(guān)的一些類。本文首先簡(jiǎn)單的介紹如何使用這些類來(lái)創(chuàng)建進(jìn)程和線程2017-05-05
Java實(shí)現(xiàn)根據(jù)前端所要格式返回樹(shù)形3級(jí)層級(jí)數(shù)據(jù)
這篇文章主要為大家詳細(xì)介紹了Java如何實(shí)現(xiàn)根據(jù)前端所要格式返回樹(shù)形3級(jí)層級(jí)數(shù)據(jù),文中的示例代碼講解詳細(xì),有需要的小伙伴可以了解下2024-02-02
Java后端學(xué)習(xí)精華之TCP通信傳輸協(xié)議詳解
TCP/IP是一種面向連接的、可靠的、基于字節(jié)流的傳輸層通信協(xié)議,它會(huì)保證數(shù)據(jù)不丟包、不亂序。TCP全名是Transmission Control Protocol,它是位于網(wǎng)絡(luò)OSI模型中的第四層2021-09-09

