分布式鎖redisson實(shí)現(xiàn)原理源碼詳解
一、簡(jiǎn)介
? 現(xiàn)在項(xiàng)目一般都是使用分布式集群部署,對(duì)后臺(tái)業(yè)務(wù)數(shù)據(jù)的某些操作需要考慮加鎖的問題,而jdk的synchronize加鎖機(jī)制已經(jīng)不適合做集群部署的操作,因?yàn)閟ynchronize關(guān)鍵字只是針對(duì)于單體部署的單臺(tái)虛擬機(jī)有用。考慮到現(xiàn)在系統(tǒng)使用redis做緩存比較高效,此處推薦使用redis下的分布式鎖redisson進(jìn)行加鎖操作。官網(wǎng)參考:https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95。
二、工程引入配置
1.工程中需要引入redis、redisson依賴,pom.xml中引入:
<!--redis依賴--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>2.3.2.RELEASE</version> </dependency> <!--redisson依賴--> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.16.4</version> </dependency> <!--使用redis時(shí)需要此jar包--> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency>
2.配置文件yml中添加redis連接信息。
spring: redis: database: 0 host: xx.xx.xx.xx port: 1316 password: xxxx timeout: 3000 lettuce: pool: max-active: 20 max-idle: 10 max-wait: -1 min-idle: 0
3.操作redis的客戶端選擇RedisTemplate,需要配置下存儲(chǔ)在redis的序列化值,使用@Bean注解,當(dāng)程序啟動(dòng)時(shí)加載到spring容器中供后期使用,redis的相關(guān)操作,不在此處進(jìn)行,有需求的可以參考筆者的這篇博文:http://www.dbjr.com.cn/article/220998.htm。
@Configuration public class RedisConfig { Logger logger = LoggerFactory.getLogger(RedisConfig.class); @Bean public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory factory) { logger.debug("redisTemplate實(shí)例化 {}"); RedisTemplate<String,Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(factory); FastJsonRedisSerializer fastJsonRedisSerializer = new FastJsonRedisSerializer<Object>(Object.class); // key的序列化采用StringRedisSerializer redisTemplate.setKeySerializer(new StringRedisSerializer()); // value值的序列化采用fastJsonRedisSerializer redisTemplate.setValueSerializer(fastJsonRedisSerializer); // hash的key也采用String的序列化方式 redisTemplate.setHashKeySerializer(new StringRedisSerializer()); // hash的value序列化方式采用fastJsonRedisSerializer redisTemplate.setHashValueSerializer(fastJsonRedisSerializer); redisTemplate.afterPropertiesSet(); return redisTemplate; } }
4.配置RedissonClient客戶端,用于加鎖操作,使用@Bean注解,當(dāng)程序啟動(dòng)時(shí)加載到spring容器中供后期使用,配置客戶端需要根據(jù)redis服務(wù)的模式配置,有集群、主從、哨兵等模式,具體配置參考官網(wǎng):https://github.com/redisson/redisson/wiki/2.-%E9%85%8D%E7%BD%AE%E6%96%B9%E6%B3%95;此處使用的單節(jié)點(diǎn)模式配置。
@Configuration public class RedissonConfig { //redis相關(guān)配置 @Value("${spring.redis.host}") private String redisHost; @Value("${spring.redis.port}") private String redisPort; @Value("${spring.redis.database}") private int database; @Value("${spring.redis.password}") private String password; @Value("${spring.redis.timeout}") private int timeout; //創(chuàng)建redisson客戶端,此時(shí)默認(rèn)使用單節(jié)點(diǎn) @Bean public RedissonClient redissonClient(){ Config config = new Config(); config.useSingleServer().setAddress("redis://"+redisHost+":"+redisPort); config.useSingleServer().setDatabase(database); config.useSingleServer().setPassword(password); config.useSingleServer().setTimeout(timeout); RedissonClient redisson = Redisson.create(config); return redisson; } }
三、加鎖操作
? 操作特別簡(jiǎn)單,通過RedissonClient獲取鎖,然后調(diào)用lock即可加鎖,解鎖使用unlock即可。
//在需要使用分布式鎖的類里面注入RedissonClient客戶端 @Autowired RedissonClient redissonClient; //根據(jù)鎖名稱獲取鎖 RLock lock = redissonClient.getLock("anyLock"); //加鎖 // 最常見的使用方法 lock.lock(); // 加鎖以后10秒鐘自動(dòng)解鎖 lock.lock(10, TimeUnit.SECONDS); // 嘗試加鎖,最多等待100秒,上鎖以后10秒自動(dòng)解鎖 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); if (res) { try { ... } finally { //解鎖 lock.unlock(); } }
四、原理分析
1.程序啟動(dòng)創(chuàng)建RedissonClient時(shí)做了啥?
//創(chuàng)建客戶端 RedissonClient redisson = Redisson.create(config)
根據(jù)配置的config信息創(chuàng)建RedissonClient客戶端,創(chuàng)建連接redis的管理器、執(zhí)行redis命令的執(zhí)行器,并生成一個(gè)uuid值作為此客戶端的id,此id將會(huì)貫穿程序的一生,后面加鎖時(shí)需要使用此客戶端id+進(jìn)程號(hào)作為鎖hash值的key。此處創(chuàng)建的執(zhí)行器,會(huì)在后面創(chuàng)建鎖時(shí)復(fù)用。
protected Redisson(Config config) { this.config = config; //復(fù)制一份配置信息 Config configCopy = new Config(config); //根據(jù)配置信息連接redis的方式創(chuàng)建連接管理器,分為單節(jié)點(diǎn)、集群、哨兵模式等 //此處會(huì)出創(chuàng)建UUID id = UUID.randomUUID()作為客戶端的唯一id this.connectionManager = ConfigSupport.createConnectionManager(configCopy); RedissonObjectBuilder objectBuilder = null; if (config.isReferenceEnabled()) { objectBuilder = new RedissonObjectBuilder(this); } //創(chuàng)建操作redis的執(zhí)行器 this.commandExecutor = new CommandSyncService(this.connectionManager, objectBuilder); this.evictionScheduler = new EvictionScheduler(this.commandExecutor); this.writeBehindService = new WriteBehindService(this.commandExecutor); }
2.創(chuàng)建鎖的時(shí)候做了啥?
//創(chuàng)建鎖 RLock lock = redissonClient.getLock("mylock");
根據(jù)一個(gè)key值,進(jìn)行鎖的創(chuàng)建,最終的創(chuàng)建會(huì)在Redisson類中實(shí)現(xiàn),因?yàn)镽edisson實(shí)現(xiàn)了RedissonClient接口。
public RLock getLock(String name) { //this.commandExecutor執(zhí)行器為程序啟動(dòng)時(shí),創(chuàng)建RedissonClient客戶端時(shí)已經(jīng)生成 return new RedissonLock(this.commandExecutor, name); }
創(chuàng)建RedissonLock的時(shí)候,會(huì)連帶創(chuàng)建它的父類RedissonBaseLock、RedissonExpirable、RedissonObject,并把執(zhí)行器和鎖key一并傳遞過去,供后面程序調(diào)用使用。
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) { //創(chuàng)建父類RedissonBaseLock super(commandExecutor, name); //程序啟動(dòng)時(shí)創(chuàng)建的執(zhí)行器復(fù)制一份給此類變量 this.commandExecutor = commandExecutor; //WatchDog 鎖續(xù)期的時(shí)間,默認(rèn)是30秒 this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub(); }
RedissonLock類的父級(jí)關(guān)系:
3.嘗試獲取鎖都做了啥?
//嘗試獲取鎖 boolean b = lock.tryLock(30, TimeUnit.SECONDS)
首先會(huì)調(diào)用到j(luò)dk包java.util.concurrent.locks下的嘗試獲取鎖方法:
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
由于繼承和實(shí)現(xiàn)接口,最終調(diào)用到RedissonLock的tryLock方法:
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException { //waitTime為獲取鎖等待的時(shí)間,超過此時(shí)間獲取不到鎖則獲取鎖失敗,-1L表示沒有設(shè)置加鎖時(shí)間,默認(rèn)的加鎖30秒,為后續(xù)判斷是否加延期watch dog做標(biāo)識(shí),unit為時(shí)間單位 return this.tryLock(waitTime, -1L, unit); }
具體看下獲取鎖的方法:
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { //獲取鎖等待時(shí)間 long time = unit.toMillis(waitTime); //當(dāng)前時(shí)間,用于后面計(jì)算使用 long current = System.currentTimeMillis(); //當(dāng)前線程的id,用于后面加鎖、訂閱信息等使用 long threadId = Thread.currentThread().getId(); //嘗試獲取鎖,若是此key已經(jīng)加鎖,且不是當(dāng)前線程加的鎖,則返回此鎖還有多久過期,若是返回的是null則標(biāo)識(shí)加鎖成功 Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId); if (ttl == null) { //加鎖成功,直接返回獲取鎖成功 return true; } else { //計(jì)算鎖等待時(shí)間 time -= System.currentTimeMillis() - current; if (time <= 0L) { //鎖等待時(shí)間小于0,則加鎖失敗,直接返回false this.acquireFailed(waitTime, unit, threadId); return false; } else { //再次獲取當(dāng)前時(shí)間 current = System.currentTimeMillis(); //根據(jù)線程id,訂閱鎖釋放事件,添加監(jiān)聽,當(dāng)鎖釋放了,通知等待的線程爭(zhēng)搶鎖資源 RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId); //當(dāng)await返回的為false,表示等待時(shí)間已經(jīng)超出獲取鎖最大等待時(shí)間,取消訂閱并返回獲取鎖失敗 if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.onComplete((res, e) -> { if (e == null) { this.unsubscribe(subscribeFuture, threadId); } }); } this.acquireFailed(waitTime, unit, threadId); return false; } else { try { //重新計(jì)算剩余等待時(shí)間 time -= System.currentTimeMillis() - current; //等待時(shí)間為負(fù)數(shù),直接返回false if (time <= 0L) { this.acquireFailed(waitTime, unit, threadId); boolean var20 = false; return var20; } else { boolean var16; do { long currentTime = System.currentTimeMillis(); //再次嘗試獲取鎖 ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId); //獲取鎖成功直接返回 if (ttl == null) { var16 = true; return var16; } //計(jì)算剩余等待時(shí)間 time -= System.currentTimeMillis() - currentTime; if (time <= 0L) { //獲取鎖失敗 this.acquireFailed(waitTime, unit, threadId); var16 = false; return var16; } currentTime = System.currentTimeMillis(); //當(dāng)鎖釋放的時(shí)間ttl小于等待獲取鎖的時(shí)間time,則讓線程掛起ttl的時(shí)間再進(jìn)行鎖的獲取,避免了無效的鎖申請(qǐng)浪費(fèi)資源,使用LockSupport類的UNSAFE.park讓線程掛起一段時(shí)間 if (ttl >= 0L && ttl < time) { ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { //當(dāng)?shù)却@取鎖的時(shí)間time小于鎖釋放的時(shí)間ttl,則讓線程掛起time的時(shí)間,再去獲取鎖,避免了無效的鎖申請(qǐng)浪費(fèi)資源,使用LockSupport類的UNSAFE.park讓線程掛起一段時(shí)間,此時(shí)肯定是獲取鎖失敗,因?yàn)殒i還沒到釋放的時(shí)間 ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } //經(jīng)過時(shí)間的消耗,重新計(jì)算鎖的等待時(shí)間 time -= System.currentTimeMillis() - currentTime; } while(time > 0L); //執(zhí)行循環(huán)獲取鎖的條件是:獲取鎖的剩余時(shí)間還大于0,否則跳出循環(huán),執(zhí)行后面的獲取鎖失敗程序 this.acquireFailed(waitTime, unit, threadId); var16 = false; return var16; } } finally { //不管是否獲取到鎖,都需要取消訂閱鎖釋放事件 this.unsubscribe(subscribeFuture, threadId); } } } } }
首先根據(jù)進(jìn)程id獲取鎖,若是ttl返回的為null則表示獲取鎖成功,直接返回true;若是獲取到的ttl是一個(gè)時(shí)間值,則表示此鎖被其他線程占用,此值表示鎖過期的時(shí)間,則進(jìn)行后續(xù)的鎖釋放訂閱事件,通過redis的channel信道,異步信號(hào)量來監(jiān)聽鎖釋放機(jī)制,當(dāng)鎖釋放,繼續(xù)嘗試獲取鎖;每一步操作花費(fèi)一定的時(shí)候后,都需要根據(jù)當(dāng)前時(shí)間減去進(jìn)行操作前的時(shí)間,結(jié)果值與獲取鎖等待的剩余時(shí)間做計(jì)算,若是等待時(shí)間小于0,則直接返回獲取鎖失敗。為了減少do while里面重復(fù)無效獲取鎖浪費(fèi)資源,使用了LockSupport類的UNSAFE.park讓線程掛起一段時(shí)間,直到獲取鎖等待時(shí)間小于0則退出while循環(huán)。
嘗試獲取鎖的方法this.tryAcquire執(zhí)行的redis語句是一個(gè)lua腳本,之所以使用lua腳本是為了保證執(zhí)行的原子性,如下:
if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);
KEYS[1]代表加鎖的key上面的"mylock",ARGV[1]代表鎖的生存時(shí)間,默認(rèn)是30秒,ARGV[2]代表加鎖的hash值的key,由客戶端id+當(dāng)前線程id組成,客戶端id是程序啟動(dòng)創(chuàng)建RedissonClient客戶端是生成的uuid。
上面redis含義為:
redis.call('exists', KEYS[1]) == 0:當(dāng)前key的鎖不存在 redis.call('hincrby', KEYS[1], ARGV[2], 1):則進(jìn)行加鎖,加鎖次數(shù)加1,類似于redis執(zhí)行HINCRBY myLock we65768xs-6752-4c23-278a-67ee2f1986jhf:43 1 redis.call('pexpire', KEYS[1], ARGV[1]):設(shè)置鎖過期時(shí)間,類似于pexpire myLock 30000 redis.call('hexists', KEYS[1], ARGV[2]) == 1:key已經(jīng)被當(dāng)前客戶端當(dāng)前線程加鎖了 redis.call('hincrby', KEYS[1], ARGV[2], 1):重新加鎖,加鎖次數(shù)加1 redis.call('pexpire', KEYS[1], ARGV[1]):設(shè)置加鎖時(shí)間,類似于pexpire myLock 30000 return redis.call('pttl', KEYS[1]):上面兩個(gè)if條件都不滿足,表示此key已經(jīng)加鎖了,且不是此線程加的鎖,返回此鎖還有多久過期
語句連貫解釋:(1)當(dāng)此key鎖不存在,則進(jìn)行加鎖,存儲(chǔ)類型為hash,hash值的key為客戶端id+線程id,value為1,設(shè)置鎖的過期時(shí)間,默認(rèn)是30秒,語句結(jié)束返回(2)1不滿足,判斷是否為此客戶端的此線程加的鎖,若是,則加鎖次數(shù)value值加1,重新賦值鎖的過期時(shí)間,語句結(jié)束返回(3)前面兩個(gè)都不滿足,說明此key的鎖已經(jīng)被其他客戶端或者相同客戶端不同線程加上了,此時(shí)查詢此鎖的過期時(shí)間返回。
加鎖key的結(jié)構(gòu)說明:
同一客戶端+同一線程多次去獲取鎖,獲取到的話值value加1,redisson是可重入鎖,下面這樣重復(fù)加鎖的方式是允許的,幾次加鎖,需要使用幾個(gè)unlock解鎖。
//加鎖 lock.lock(); //加鎖 lock.lock(); //釋放鎖 lock.unlock(); lock.unlock();
測(cè)試重復(fù)加鎖可以使用debug得到當(dāng)前客戶端id+進(jìn)程號(hào)作為鎖hash值key,value給1,手動(dòng)向redis中添加一行記錄,再執(zhí)行嘗試獲取鎖,此時(shí)redis中hash值value會(huì)加1變?yōu)?。
4.獲取不到鎖,訂閱鎖釋放機(jī)制是如何實(shí)現(xiàn)的?
? 當(dāng)獲取不到鎖,返回的ttl為鎖的過期時(shí)間時(shí),往下繼續(xù)執(zhí)行獲取鎖的實(shí)現(xiàn),此時(shí)會(huì)執(zhí)行redis的訂閱鎖機(jī)制。
RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
具體訂閱機(jī)制實(shí)現(xiàn):
//參數(shù)entryName為創(chuàng)建鎖時(shí)創(chuàng)建RedissonLock的時(shí)候,連帶創(chuàng)建它的父類RedissonBaseLock,值為客戶端id加key值:this.entryName = this.id + ":" + name; //參數(shù)channelName信道名稱為創(chuàng)建鎖時(shí)創(chuàng)建RedissonLock的時(shí)候,連帶創(chuàng)建它的父類RedissonObject,值為固定字符加鎖key:redisson_lock__channel+":{" + name + "}" public RFuture<E> subscribe(String entryName, String channelName) { //根據(jù)信道名稱創(chuàng)建信道,再獲取異步信號(hào)量 AsyncSemaphore semaphore = this.service.getSemaphore(new ChannelName(channelName)); //創(chuàng)建redisson信號(hào)量 RPromise<E> newPromise = new RedissonPromise(); //semaphore.acquire:Lambda表達(dá)式,把當(dāng)前創(chuàng)建的信號(hào)量添加到listeners線程集合中進(jìn)行監(jiān)聽 semaphore.acquire(() -> { //redisson信號(hào)量取消 if (!newPromise.setUncancellable()) { //釋放信號(hào)量 semaphore.release(); } else { //從private final ConcurrentMap<String, E> entries = new ConcurrentHashMap();中獲取是否有此entry的信號(hào)量,此處使用ConcurrentHashMap是為了線程安全,并且提高效率,因?yàn)镃oncurrentHashMap是線程安全的分段鎖 E entry = (PubSubEntry)this.entries.get(entryName); //存在此信號(hào)量,則釋放信號(hào)量 if (entry != null) { entry.acquire(); semaphore.release(); entry.getPromise().onComplete(new TransferListener(newPromise)); } else { //不存在此entry,則創(chuàng)建 E value = this.createEntry(newPromise); value.acquire(); //此處使用ConcurrentHashMap的putIfAbsent校驗(yàn)是否已經(jīng)存在此entry,存在則不添加 E oldValue = (PubSubEntry)this.entries.putIfAbsent(entryName, value); if (oldValue != null) { //存在則不添加,釋放信號(hào)量 oldValue.acquire(); semaphore.release(); oldValue.getPromise().onComplete(new TransferListener(newPromise)); } else { //不存在此entry的信號(hào)量 //創(chuàng)建一個(gè)信道的監(jiān)聽 RedisPubSubListener<Object> listener = this.createListener(channelName, value); //訂閱信道的監(jiān)聽事件,當(dāng)鎖釋放時(shí),信號(hào)量的release會(huì)被調(diào)用 this.service.subscribe(LongCodec.INSTANCE, channelName, semaphore, new RedisPubSubListener[]{listener}); } } } }); return newPromise; }
通過redis的channel信道訂閱鎖釋放的事件,創(chuàng)建異步信號(hào)量AsyncSemaphore監(jiān)聽鎖釋放的機(jī)制,當(dāng)鎖釋放時(shí)調(diào)用信號(hào)量的release釋放方法,此時(shí)被信號(hào)量阻塞的線程就可以繼續(xù)嘗試獲取鎖,釋放鎖的方法如下:
//定義一個(gè)AtomicInteger類型的counter變量,記錄線程數(shù),AtomicInteger能保證在多線程下的安全性,其特性是加和減的時(shí)候先用當(dāng)前需要變化后的值和舊的值進(jìn)行比較,例如當(dāng)前需要加1,則用加后的結(jié)果值減去1,再和舊的值比較,一致了才進(jìn)行覆蓋的操作,保證多線程下的安全性 private final AtomicInteger counter; //嘗試執(zhí)行線程 private void tryRun() { if (this.counter.decrementAndGet() >= 0) { Runnable listener = (Runnable)this.listeners.poll(); if (listener == null) { this.counter.incrementAndGet(); return; } listener.run(); } else if (this.counter.incrementAndGet() > 0) { this.tryRun(); } } //釋放信號(hào)量的方法 public void release() { //信號(hào)量加1 this.counter.incrementAndGet(); //啟動(dòng)監(jiān)聽 this.tryRun(); }
5.避免無效申請(qǐng)鎖浪費(fèi)資源是怎么實(shí)現(xiàn)的?
//當(dāng)鎖釋放的時(shí)間ttl小于等待獲取鎖的時(shí)間time,則讓線程掛起ttl的時(shí)間再進(jìn)行鎖的獲取,避免了無效的鎖申請(qǐng)浪費(fèi)資源,使用LockSupport類的UNSAFE.park讓線程掛起一段時(shí)間 if (ttl >= 0L && ttl < time) { ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { //當(dāng)?shù)却@取鎖的時(shí)間time小于鎖釋放的時(shí)間ttl,則讓線程掛起time的時(shí)間,再去獲取鎖,避免了無效的鎖申請(qǐng)浪費(fèi)資源,,使用LockSupport類的UNSAFE.park讓線程掛起一段時(shí)間,此時(shí)肯定是獲取鎖失敗,因?yàn)殒i還沒到釋放的時(shí)間 ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); }
ttl是鎖釋放的時(shí)間,time是獲取鎖剩余的等待時(shí)間,此方法是放在do while里面的,為了不讓程序無效申請(qǐng)鎖浪費(fèi)資源,在此處做的優(yōu)化。讓線程掛起一定的時(shí)間后再執(zhí)行獲取鎖,掛起的時(shí)間根據(jù)ttl和time的大小來定,若是滿足if條件,代表獲取鎖等待的時(shí)間比鎖釋放的時(shí)間要長(zhǎng),則讓程序掛起ttl的時(shí)間,這樣鎖已經(jīng)釋放了,再去獲取;若是不滿足if條件,代表釋放鎖的時(shí)間比鎖等待的時(shí)間要長(zhǎng),則讓程序掛起time的時(shí)間,此時(shí)鎖還沒有釋放,但是獲取鎖的等待時(shí)間已經(jīng)到達(dá),繼續(xù)執(zhí)行while循環(huán),此時(shí)會(huì)跳出while,表示獲取鎖失敗。
LockSupport類掛起線程的方法:
public static void parkNanos(Object blocker, long nanos) { //掛起時(shí)間大于0 if (nanos > 0) { //獲取當(dāng)前線程 Thread t = Thread.currentThread(); //設(shè)置掛起的線程 setBlocker(t, blocker); //public native void park(boolean isAbsolute, long time);第一個(gè)參數(shù)是是否是絕對(duì)時(shí)間,第二個(gè)參數(shù)是等待時(shí)間值 UNSAFE.park(false, nanos); //移除掛起的線程 setBlocker(t, null); } } //設(shè)置掛起的線程 private static void setBlocker(Thread t, Object arg) { UNSAFE.putObject(t, parkBlockerOffset, arg); }
6.當(dāng)加鎖時(shí)間內(nèi)處理不完業(yè)務(wù),鎖續(xù)時(shí)是怎么處理的?
? 當(dāng)在加鎖時(shí)間范圍內(nèi),處理不完業(yè)務(wù),需要更新此鎖的過期時(shí)間,此處就需要redisson的一個(gè)watch dog機(jī)制進(jìn)行處理。注意watch dog機(jī)制只適用于鎖過期時(shí)間為默認(rèn)30秒的方式,自己配置的過期時(shí)間,盡管是配的30秒過期也不會(huì)啟用watch dog機(jī)制。
//嘗試獲取鎖,等待獲取時(shí)間30秒,此種方式會(huì)啟用watch dog boolean b = lock.tryLock(30, TimeUnit.SECONDS); //嘗試獲取鎖,等待獲取時(shí)間20秒,鎖的過期時(shí)間是30秒或者配置為其他任意值,都不會(huì)啟用watch dog boolean b = lock.tryLock(20, 30,TimeUnit.SECONDS);
源碼中當(dāng)沒有配置過期時(shí)間,默認(rèn)傳遞一個(gè)-1L,此-1L作為后面判斷是否要?jiǎng)?chuàng)建定時(shí)任務(wù)啟動(dòng)watch dog的標(biāo)識(shí):
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException { return this.tryLock(waitTime, -1L, unit); }
若是配置了鎖的過期時(shí)間,盡管是配的30秒,也不會(huì)啟動(dòng)watch dog,源碼中是直接把過期時(shí)間傳遞過去的,沒有做30秒轉(zhuǎn)成-1L的操作:
boolean tryLock(long var1, long var3, TimeUnit var5) throws InterruptedException;
源碼中獲取鎖,判斷是否啟用定時(shí)器的邏輯代碼:
//waitTime:獲取鎖等待的時(shí)間 //leaseTime:加鎖時(shí)間,沒有配置的情況傳遞的是-1L,有配置過期時(shí)間則直接傳遞的是過期時(shí)間 //unit:時(shí)間單位 //threadId:當(dāng)前線程號(hào) private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture ttlRemainingFuture; if (leaseTime != -1L) { //不等于-1L表示用戶自己配置了過期時(shí)間,加鎖時(shí)傳遞用戶配置的過期時(shí)間 ttlRemainingFuture = this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { //沒有配置鎖的過期時(shí)間,使用默認(rèn)的時(shí)間;internalLockLeaseTime為程序啟動(dòng)時(shí)創(chuàng)建RedissonClient客戶端時(shí)設(shè)置的默認(rèn)值30秒,在創(chuàng)建鎖時(shí),創(chuàng)建RedissonLock時(shí)已經(jīng)給此值賦上值 ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } ttlRemainingFuture.onComplete((ttlRemaining, e) -> { //e==null表示獲取到鎖,獲取不到鎖會(huì)有鎖過期的時(shí)間 if (e == null) { //ttlRemaining == null表示獲取到鎖 if (ttlRemaining == null) { if (leaseTime != -1L) { //當(dāng)用戶配置了過期時(shí)間,則把過期時(shí)間賦值給internalLockLeaseTime變量,記錄鎖的過期時(shí)間 this.internalLockLeaseTime = unit.toMillis(leaseTime); } else { //使用默認(rèn)的30秒過期時(shí)間,則創(chuàng)建定時(shí)器啟動(dòng)watch dog續(xù)時(shí) this.scheduleExpirationRenewal(threadId); } } } }); return ttlRemainingFuture; }
當(dāng)用戶配置了過期時(shí)間(leaseTime != -1L)時(shí),獲取到鎖,在redis中存放的過期時(shí)間為用戶配置的時(shí)間;當(dāng)使用默認(rèn)的過期時(shí)間30秒,則在redis中存放的過期時(shí)間為程序啟動(dòng)時(shí)默認(rèn)配置的30秒。當(dāng)獲取鎖的結(jié)果為null即成功時(shí),進(jìn)行判斷是否要啟動(dòng)watch dog 續(xù)時(shí)機(jī)制,若是用戶自己配置的過期時(shí)間,則給類中記錄此鎖過期的變量賦值上用戶設(shè)置的數(shù)據(jù),若是默認(rèn)30秒過期時(shí)間,則添加定時(shí)器啟動(dòng)watch dog。
看下設(shè)置定時(shí)器的源碼:
//根據(jù)進(jìn)程id設(shè)置定時(shí)器啟動(dòng)watch dog protected void scheduleExpirationRenewal(long threadId) { //創(chuàng)建一個(gè)entry對(duì)象 RedissonBaseLock.ExpirationEntry entry = new RedissonBaseLock.ExpirationEntry(); //從ConcurrentHashMap類型的EXPIRATION_RENEWAL_MAP變量中判斷此entry是否已經(jīng)存在 RedissonBaseLock.ExpirationEntry oldEntry = (RedissonBaseLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry); if (oldEntry != null) { //已經(jīng)存在,更新進(jìn)程號(hào) oldEntry.addThreadId(threadId); } else { //不存在則添加,值引用直接添加進(jìn)程號(hào) entry.addThreadId(threadId); try { //檢查表達(dá)式 this.renewExpiration(); } finally { if (Thread.currentThread().isInterrupted()) { //進(jìn)程中斷了,EXPIRATION_RENEWAL_MAP中移除entry,停止定時(shí)器 this.cancelExpirationRenewal(threadId); } } } }
根據(jù)進(jìn)程號(hào),從全局變量EXPIRATION_RENEWAL_MAP中看是否有此進(jìn)程的entry存在,存在則更新進(jìn)程號(hào),不存在則添加進(jìn)去;當(dāng)線程終止了,需要從EXPIRATION_RENEWAL_MAP中移除entry,停止定時(shí)器。添加完成后開始檢驗(yàn)鎖的過期時(shí)間,源碼為:
//檢查表達(dá)式 private void renewExpiration() { //從全局變量中查詢線程的entry對(duì)象 RedissonBaseLock.ExpirationEntry ee = (RedissonBaseLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName()); if (ee != null) { //entry對(duì)象存在 //創(chuàng)建一個(gè)定時(shí)器,定時(shí)器執(zhí)行的時(shí)間this.internalLockLeaseTime / 3L,10秒鐘執(zhí)行一次 Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() { public void run(Timeout timeout) throws Exception { //從全局變量中查詢線程的entry對(duì)象 RedissonBaseLock.ExpirationEntry ent = (RedissonBaseLock.ExpirationEntry)RedissonBaseLock.EXPIRATION_RENEWAL_MAP.get(RedissonBaseLock.this.getEntryName()); if (ent != null) {//entry對(duì)象存在 //獲取線程號(hào) Long threadId = ent.getFirstThreadId(); if (threadId != null) { //異步執(zhí)行判斷是否還持有鎖,持有鎖的話,再把鎖的過期時(shí)間更新為30秒,也是一個(gè)lua執(zhí)行腳本 RFuture<Boolean> future = RedissonBaseLock.this.renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { //執(zhí)行更新鎖過期時(shí)間失敗 RedissonBaseLock.log.error("Can't update lock " + RedissonBaseLock.this.getRawName() + " expiration", e); //從全局變量中移除entry RedissonBaseLock.EXPIRATION_RENEWAL_MAP.remove(RedissonBaseLock.this.getEntryName()); } else { if (res) { //更新鎖為30秒過期成功,則重新調(diào)用renewExpiration方法,再次添加定時(shí)器檢查 RedissonBaseLock.this.renewExpiration(); } else {//已經(jīng)不存在此鎖,任務(wù)已經(jīng)完成,則EXPIRATION_RENEWAL_MAP中移除entry,停止定時(shí)器 RedissonBaseLock.this.cancelExpirationRenewal((Long)null); } } }); } } } //設(shè)置task的定時(shí)時(shí)間,指定時(shí)間單位 }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); //添加到定時(shí)器中 ee.setTimeout(task); } }
從全局變量EXPIRATION_RENEWAL_MAP中獲取線程的entry,當(dāng)entry存在,則執(zhí)行一段檢查鎖是否存在,存在則更新過期時(shí)間為30秒的lua腳本,當(dāng)lua腳本的執(zhí)行結(jié)果返回成功,則重新調(diào)用renewExpiration方法,重新添加定時(shí)器任務(wù);當(dāng)lua腳本執(zhí)行失敗,則從EXPIRATION_RENEWAL_MAP中移除entry,停止定時(shí)器。
檢查鎖key是否存在,并更新鎖過期時(shí)間的方法renewExpirationAsync中l(wèi)ua腳本:
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;
redis腳本含義:
redis.call('hexists', KEYS[1], ARGV[2]) == 1:存在此key的鎖,并且是當(dāng)前客戶端下當(dāng)前線程所擁有 redis.call('pexpire', KEYS[1], ARGV[1]):重新給此key設(shè)置過期時(shí)間,更新為30秒
每次執(zhí)行這段lua腳本滿足if條件,并且執(zhí)行成功,則此key的過期時(shí)間被重置為30秒,業(yè)務(wù)一直沒有處理完的話,會(huì)每隔十秒過期時(shí)間被重置為30秒。
五、方案優(yōu)缺點(diǎn)
1.優(yōu)點(diǎn)
(1)通過watch dog機(jī)制實(shí)現(xiàn)了鎖的續(xù)期問題。
(2)結(jié)合著redis一塊使用,系統(tǒng)性能更高。
(3)操作redis使用lua腳本,保證執(zhí)行的原子性。
(4)支持可重入鎖。
(5)使用了LockSupport的Unsafe.park使線程掛起,避免了重復(fù)無效獲取鎖浪費(fèi)資源。
2.缺點(diǎn)
(1)在redis主從模式或者集群模式下,當(dāng)客戶端1在master節(jié)點(diǎn)加鎖成功,但是master節(jié)點(diǎn)還沒有異步復(fù)制數(shù)據(jù)給其他slave節(jié)點(diǎn)時(shí),master節(jié)點(diǎn)宕機(jī)了,此時(shí)客戶端2來申請(qǐng)加鎖,會(huì)在新的master節(jié)點(diǎn)上加鎖成功,此時(shí)會(huì)存在多個(gè)客戶端加鎖成功的情況,可能會(huì)產(chǎn)生不必要的臟數(shù)據(jù)。
(2)watch dog 機(jī)制10秒觸發(fā)一次會(huì)消耗一定的服務(wù)器資源。
到此這篇關(guān)于源碼詳解分布式鎖redisson實(shí)現(xiàn)原理的文章就介紹到這了,更多相關(guān)分布式鎖redisson實(shí)現(xiàn)原理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
@RequestMapping對(duì)不同參數(shù)的接收方式示例詳解
Spring?MVC框架中,@RequestMapping注解用于映射URL到控制器方法,不同的參數(shù)類型如簡(jiǎn)單參數(shù)、實(shí)體參數(shù)、數(shù)組參數(shù)、集合參數(shù)、日期參數(shù)和JSON參數(shù),本文給大家介紹@RequestMapping對(duì)不同參數(shù)的接收方式,感興趣的朋友一起看看吧2024-10-10Java gRPC攔截器簡(jiǎn)單實(shí)現(xiàn)分布式日志鏈路追蹤器過程詳解
有請(qǐng)求的發(fā)送、處理,當(dāng)然就會(huì)有攔截器的需求,例如在服務(wù)端通過攔截器統(tǒng)一進(jìn)行請(qǐng)求認(rèn)證等操作,這些就需要攔截器來完成,今天松哥先和小伙伴們來聊一聊gRPC中攔截器的基本用法,后面我再整一篇文章和小伙伴們做一個(gè)基于攔截器實(shí)現(xiàn)的JWT認(rèn)證的gRPC2023-03-03FeignClient實(shí)現(xiàn)接口調(diào)用方式(不同參數(shù)形式)
這篇文章主要介紹了FeignClient實(shí)現(xiàn)接口調(diào)用方式(不同參數(shù)形式),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03Java畢業(yè)設(shè)計(jì)實(shí)戰(zhàn)之財(cái)務(wù)預(yù)算管理系統(tǒng)的實(shí)現(xiàn)
這是一個(gè)使用了java+SSM+Jsp+Mysql+Layui+Maven開發(fā)的財(cái)務(wù)預(yù)算管理系統(tǒng),是一個(gè)畢業(yè)設(shè)計(jì)的實(shí)戰(zhàn)練習(xí),具有財(cái)務(wù)預(yù)算管理該有的所有功能,感興趣的朋友快來看看吧2022-02-02