欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

分布式鎖redisson實(shí)現(xiàn)原理源碼詳解

 更新時(shí)間:2023年05月22日 11:11:46   作者:清云青云  
這篇文章主要介紹了源碼詳解分布式鎖redisson實(shí)現(xiàn)原理,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下

一、簡(jiǎn)介

? 現(xiàn)在項(xiàng)目一般都是使用分布式集群部署,對(duì)后臺(tái)業(yè)務(wù)數(shù)據(jù)的某些操作需要考慮加鎖的問題,而jdk的synchronize加鎖機(jī)制已經(jīng)不適合做集群部署的操作,因?yàn)閟ynchronize關(guān)鍵字只是針對(duì)于單體部署的單臺(tái)虛擬機(jī)有用。考慮到現(xiàn)在系統(tǒng)使用redis做緩存比較高效,此處推薦使用redis下的分布式鎖redisson進(jìn)行加鎖操作。官網(wǎng)參考:https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95

二、工程引入配置

1.工程中需要引入redis、redisson依賴,pom.xml中引入:

<!--redis依賴-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.3.2.RELEASE</version>
</dependency>
<!--redisson依賴-->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.16.4</version>
</dependency>
<!--使用redis時(shí)需要此jar包-->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>

2.配置文件yml中添加redis連接信息。

spring:
  redis:
    database: 0
    host: xx.xx.xx.xx
    port: 1316
    password: xxxx
    timeout: 3000
    lettuce:
      pool:
        max-active: 20
        max-idle: 10
        max-wait: -1
        min-idle: 0

3.操作redis的客戶端選擇RedisTemplate,需要配置下存儲(chǔ)在redis的序列化值,使用@Bean注解,當(dāng)程序啟動(dòng)時(shí)加載到spring容器中供后期使用,redis的相關(guān)操作,不在此處進(jìn)行,有需求的可以參考筆者的這篇博文:http://www.dbjr.com.cn/article/220998.htm。

@Configuration
public class RedisConfig {
    Logger logger = LoggerFactory.getLogger(RedisConfig.class);
    @Bean
    public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory factory) {
        logger.debug("redisTemplate實(shí)例化 {}");
        RedisTemplate<String,Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(factory);
        FastJsonRedisSerializer fastJsonRedisSerializer = new FastJsonRedisSerializer<Object>(Object.class);
        // key的序列化采用StringRedisSerializer
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        // value值的序列化采用fastJsonRedisSerializer
        redisTemplate.setValueSerializer(fastJsonRedisSerializer);
        // hash的key也采用String的序列化方式
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        // hash的value序列化方式采用fastJsonRedisSerializer
        redisTemplate.setHashValueSerializer(fastJsonRedisSerializer);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
}

4.配置RedissonClient客戶端,用于加鎖操作,使用@Bean注解,當(dāng)程序啟動(dòng)時(shí)加載到spring容器中供后期使用,配置客戶端需要根據(jù)redis服務(wù)的模式配置,有集群、主從、哨兵等模式,具體配置參考官網(wǎng):https://github.com/redisson/redisson/wiki/2.-%E9%85%8D%E7%BD%AE%E6%96%B9%E6%B3%95;此處使用的單節(jié)點(diǎn)模式配置。

@Configuration
public class RedissonConfig {
    //redis相關(guān)配置
    @Value("${spring.redis.host}")
    private String redisHost;
    @Value("${spring.redis.port}")
    private String redisPort;
    @Value("${spring.redis.database}")
    private int database;
    @Value("${spring.redis.password}")
    private String password;
    @Value("${spring.redis.timeout}")
    private int timeout;
    //創(chuàng)建redisson客戶端,此時(shí)默認(rèn)使用單節(jié)點(diǎn)
    @Bean
    public RedissonClient redissonClient(){
        Config config = new Config();
        config.useSingleServer().setAddress("redis://"+redisHost+":"+redisPort);
        config.useSingleServer().setDatabase(database);
        config.useSingleServer().setPassword(password);
        config.useSingleServer().setTimeout(timeout);
        RedissonClient redisson = Redisson.create(config);
        return redisson;
    }
}

三、加鎖操作

? 操作特別簡(jiǎn)單,通過RedissonClient獲取鎖,然后調(diào)用lock即可加鎖,解鎖使用unlock即可。

//在需要使用分布式鎖的類里面注入RedissonClient客戶端
@Autowired
RedissonClient redissonClient;
//根據(jù)鎖名稱獲取鎖
RLock lock = redissonClient.getLock("anyLock");
//加鎖
// 最常見的使用方法
lock.lock();
// 加鎖以后10秒鐘自動(dòng)解鎖
lock.lock(10, TimeUnit.SECONDS);
// 嘗試加鎖,最多等待100秒,上鎖以后10秒自動(dòng)解鎖
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       //解鎖
       lock.unlock();
   }
}

四、原理分析

1.程序啟動(dòng)創(chuàng)建RedissonClient時(shí)做了啥?

//創(chuàng)建客戶端
RedissonClient redisson = Redisson.create(config)

根據(jù)配置的config信息創(chuàng)建RedissonClient客戶端,創(chuàng)建連接redis的管理器、執(zhí)行redis命令的執(zhí)行器,并生成一個(gè)uuid值作為此客戶端的id,此id將會(huì)貫穿程序的一生,后面加鎖時(shí)需要使用此客戶端id+進(jìn)程號(hào)作為鎖hash值的key。此處創(chuàng)建的執(zhí)行器,會(huì)在后面創(chuàng)建鎖時(shí)復(fù)用。

protected Redisson(Config config) {
        this.config = config;
        //復(fù)制一份配置信息
        Config configCopy = new Config(config);
        //根據(jù)配置信息連接redis的方式創(chuàng)建連接管理器,分為單節(jié)點(diǎn)、集群、哨兵模式等
        //此處會(huì)出創(chuàng)建UUID id = UUID.randomUUID()作為客戶端的唯一id
        this.connectionManager = ConfigSupport.createConnectionManager(configCopy);
        RedissonObjectBuilder objectBuilder = null;
        if (config.isReferenceEnabled()) {
            objectBuilder = new RedissonObjectBuilder(this);
        }
        //創(chuàng)建操作redis的執(zhí)行器
        this.commandExecutor = new CommandSyncService(this.connectionManager, objectBuilder);
        this.evictionScheduler = new EvictionScheduler(this.commandExecutor);
        this.writeBehindService = new WriteBehindService(this.commandExecutor);
    }

2.創(chuàng)建鎖的時(shí)候做了啥?

//創(chuàng)建鎖
RLock lock = redissonClient.getLock("mylock");

根據(jù)一個(gè)key值,進(jìn)行鎖的創(chuàng)建,最終的創(chuàng)建會(huì)在Redisson類中實(shí)現(xiàn),因?yàn)镽edisson實(shí)現(xiàn)了RedissonClient接口。

    public RLock getLock(String name) {
        //this.commandExecutor執(zhí)行器為程序啟動(dòng)時(shí),創(chuàng)建RedissonClient客戶端時(shí)已經(jīng)生成
        return new RedissonLock(this.commandExecutor, name);
    }

創(chuàng)建RedissonLock的時(shí)候,會(huì)連帶創(chuàng)建它的父類RedissonBaseLock、RedissonExpirable、RedissonObject,并把執(zhí)行器和鎖key一并傳遞過去,供后面程序調(diào)用使用。

    public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        //創(chuàng)建父類RedissonBaseLock
        super(commandExecutor, name);
        //程序啟動(dòng)時(shí)創(chuàng)建的執(zhí)行器復(fù)制一份給此類變量
        this.commandExecutor = commandExecutor;
        //WatchDog 鎖續(xù)期的時(shí)間,默認(rèn)是30秒
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
    }

RedissonLock類的父級(jí)關(guān)系:

在這里插入圖片描述

3.嘗試獲取鎖都做了啥?

//嘗試獲取鎖
boolean b = lock.tryLock(30, TimeUnit.SECONDS)

首先會(huì)調(diào)用到j(luò)dk包java.util.concurrent.locks下的嘗試獲取鎖方法:

 boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

由于繼承和實(shí)現(xiàn)接口,最終調(diào)用到RedissonLock的tryLock方法:

public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
    //waitTime為獲取鎖等待的時(shí)間,超過此時(shí)間獲取不到鎖則獲取鎖失敗,-1L表示沒有設(shè)置加鎖時(shí)間,默認(rèn)的加鎖30秒,為后續(xù)判斷是否加延期watch dog做標(biāo)識(shí),unit為時(shí)間單位
    return this.tryLock(waitTime, -1L, unit);
}

具體看下獲取鎖的方法:

 public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        //獲取鎖等待時(shí)間
        long time = unit.toMillis(waitTime);
        //當(dāng)前時(shí)間,用于后面計(jì)算使用
        long current = System.currentTimeMillis();
        //當(dāng)前線程的id,用于后面加鎖、訂閱信息等使用
        long threadId = Thread.currentThread().getId();
        //嘗試獲取鎖,若是此key已經(jīng)加鎖,且不是當(dāng)前線程加的鎖,則返回此鎖還有多久過期,若是返回的是null則標(biāo)識(shí)加鎖成功
        Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
        if (ttl == null) {
            //加鎖成功,直接返回獲取鎖成功
            return true;
        } else {
            //計(jì)算鎖等待時(shí)間
            time -= System.currentTimeMillis() - current;
            if (time <= 0L) {  //鎖等待時(shí)間小于0,則加鎖失敗,直接返回false
                this.acquireFailed(waitTime, unit, threadId);
                return false;
            } else {
                //再次獲取當(dāng)前時(shí)間
                current = System.currentTimeMillis();
                //根據(jù)線程id,訂閱鎖釋放事件,添加監(jiān)聽,當(dāng)鎖釋放了,通知等待的線程爭(zhēng)搶鎖資源
                RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
                //當(dāng)await返回的為false,表示等待時(shí)間已經(jīng)超出獲取鎖最大等待時(shí)間,取消訂閱并返回獲取鎖失敗
                if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
                    if (!subscribeFuture.cancel(false)) {
                        subscribeFuture.onComplete((res, e) -> {
                            if (e == null) {
                                this.unsubscribe(subscribeFuture, threadId);
                            }
                        });
                    }
                    this.acquireFailed(waitTime, unit, threadId);
                    return false;
                } else {
                    try {
                        //重新計(jì)算剩余等待時(shí)間
                        time -= System.currentTimeMillis() - current;
                        //等待時(shí)間為負(fù)數(shù),直接返回false
                        if (time <= 0L) {
                            this.acquireFailed(waitTime, unit, threadId);
                            boolean var20 = false;
                            return var20;
                        } else {
                            boolean var16;
                            do {
                                long currentTime = System.currentTimeMillis();
                                //再次嘗試獲取鎖
                                ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
                                //獲取鎖成功直接返回
                                if (ttl == null) {
                                    var16 = true;
                                    return var16;
                                }
                                //計(jì)算剩余等待時(shí)間
                                time -= System.currentTimeMillis() - currentTime;
                                if (time <= 0L) { //獲取鎖失敗
                                    this.acquireFailed(waitTime, unit, threadId);
                                    var16 = false;
                                    return var16;
                                }
                                currentTime = System.currentTimeMillis();
                                //當(dāng)鎖釋放的時(shí)間ttl小于等待獲取鎖的時(shí)間time,則讓線程掛起ttl的時(shí)間再進(jìn)行鎖的獲取,避免了無效的鎖申請(qǐng)浪費(fèi)資源,使用LockSupport類的UNSAFE.park讓線程掛起一段時(shí)間
                                if (ttl >= 0L && ttl < time) {
                                    ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                                } else {
                                //當(dāng)?shù)却@取鎖的時(shí)間time小于鎖釋放的時(shí)間ttl,則讓線程掛起time的時(shí)間,再去獲取鎖,避免了無效的鎖申請(qǐng)浪費(fèi)資源,使用LockSupport類的UNSAFE.park讓線程掛起一段時(shí)間,此時(shí)肯定是獲取鎖失敗,因?yàn)殒i還沒到釋放的時(shí)間
                                    ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                                }
                                //經(jīng)過時(shí)間的消耗,重新計(jì)算鎖的等待時(shí)間
                                time -= System.currentTimeMillis() - currentTime;
                            } while(time > 0L);   //執(zhí)行循環(huán)獲取鎖的條件是:獲取鎖的剩余時(shí)間還大于0,否則跳出循環(huán),執(zhí)行后面的獲取鎖失敗程序
                            this.acquireFailed(waitTime, unit, threadId);
                            var16 = false;
                            return var16;
                        }
                    } finally {
                        //不管是否獲取到鎖,都需要取消訂閱鎖釋放事件
                        this.unsubscribe(subscribeFuture, threadId);
                    }
                }
            }
        }
    }

首先根據(jù)進(jìn)程id獲取鎖,若是ttl返回的為null則表示獲取鎖成功,直接返回true;若是獲取到的ttl是一個(gè)時(shí)間值,則表示此鎖被其他線程占用,此值表示鎖過期的時(shí)間,則進(jìn)行后續(xù)的鎖釋放訂閱事件,通過redis的channel信道,異步信號(hào)量來監(jiān)聽鎖釋放機(jī)制,當(dāng)鎖釋放,繼續(xù)嘗試獲取鎖;每一步操作花費(fèi)一定的時(shí)候后,都需要根據(jù)當(dāng)前時(shí)間減去進(jìn)行操作前的時(shí)間,結(jié)果值與獲取鎖等待的剩余時(shí)間做計(jì)算,若是等待時(shí)間小于0,則直接返回獲取鎖失敗。為了減少do while里面重復(fù)無效獲取鎖浪費(fèi)資源,使用了LockSupport類的UNSAFE.park讓線程掛起一段時(shí)間,直到獲取鎖等待時(shí)間小于0則退出while循環(huán)。

嘗試獲取鎖的方法this.tryAcquire執(zhí)行的redis語句是一個(gè)lua腳本,之所以使用lua腳本是為了保證執(zhí)行的原子性,如下:

if (redis.call('exists', KEYS[1]) == 0) then 
    redis.call('hincrby', KEYS[1], ARGV[2], 1); 
    redis.call('pexpire', KEYS[1], ARGV[1]); 
    return nil; 
end; 
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 
    redis.call('hincrby', KEYS[1], ARGV[2], 1); 
    redis.call('pexpire', KEYS[1], ARGV[1]); 
    return nil; 
end; 
return redis.call('pttl', KEYS[1]);

KEYS[1]代表加鎖的key上面的"mylock",ARGV[1]代表鎖的生存時(shí)間,默認(rèn)是30秒,ARGV[2]代表加鎖的hash值的key,由客戶端id+當(dāng)前線程id組成,客戶端id是程序啟動(dòng)創(chuàng)建RedissonClient客戶端是生成的uuid。

上面redis含義為:

redis.call('exists', KEYS[1]) == 0:當(dāng)前key的鎖不存在
redis.call('hincrby', KEYS[1], ARGV[2], 1):則進(jìn)行加鎖,加鎖次數(shù)加1,類似于redis執(zhí)行HINCRBY myLock we65768xs-6752-4c23-278a-67ee2f1986jhf:43 1
redis.call('pexpire', KEYS[1], ARGV[1]):設(shè)置鎖過期時(shí)間,類似于pexpire myLock 30000
redis.call('hexists', KEYS[1], ARGV[2]) == 1:key已經(jīng)被當(dāng)前客戶端當(dāng)前線程加鎖了
redis.call('hincrby', KEYS[1], ARGV[2], 1):重新加鎖,加鎖次數(shù)加1
redis.call('pexpire', KEYS[1], ARGV[1]):設(shè)置加鎖時(shí)間,類似于pexpire myLock 30000
return redis.call('pttl', KEYS[1]):上面兩個(gè)if條件都不滿足,表示此key已經(jīng)加鎖了,且不是此線程加的鎖,返回此鎖還有多久過期

語句連貫解釋:(1)當(dāng)此key鎖不存在,則進(jìn)行加鎖,存儲(chǔ)類型為hash,hash值的key為客戶端id+線程id,value為1,設(shè)置鎖的過期時(shí)間,默認(rèn)是30秒,語句結(jié)束返回(2)1不滿足,判斷是否為此客戶端的此線程加的鎖,若是,則加鎖次數(shù)value值加1,重新賦值鎖的過期時(shí)間,語句結(jié)束返回(3)前面兩個(gè)都不滿足,說明此key的鎖已經(jīng)被其他客戶端或者相同客戶端不同線程加上了,此時(shí)查詢此鎖的過期時(shí)間返回。

加鎖key的結(jié)構(gòu)說明:

在這里插入圖片描述

同一客戶端+同一線程多次去獲取鎖,獲取到的話值value加1,redisson是可重入鎖,下面這樣重復(fù)加鎖的方式是允許的,幾次加鎖,需要使用幾個(gè)unlock解鎖。

//加鎖
lock.lock();
//加鎖
lock.lock();
//釋放鎖
lock.unlock();
lock.unlock();

測(cè)試重復(fù)加鎖可以使用debug得到當(dāng)前客戶端id+進(jìn)程號(hào)作為鎖hash值key,value給1,手動(dòng)向redis中添加一行記錄,再執(zhí)行嘗試獲取鎖,此時(shí)redis中hash值value會(huì)加1變?yōu)?。

在這里插入圖片描述

4.獲取不到鎖,訂閱鎖釋放機(jī)制是如何實(shí)現(xiàn)的?

? 當(dāng)獲取不到鎖,返回的ttl為鎖的過期時(shí)間時(shí),往下繼續(xù)執(zhí)行獲取鎖的實(shí)現(xiàn),此時(shí)會(huì)執(zhí)行redis的訂閱鎖機(jī)制。

 RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);

具體訂閱機(jī)制實(shí)現(xiàn):

//參數(shù)entryName為創(chuàng)建鎖時(shí)創(chuàng)建RedissonLock的時(shí)候,連帶創(chuàng)建它的父類RedissonBaseLock,值為客戶端id加key值:this.entryName = this.id + ":" + name;
//參數(shù)channelName信道名稱為創(chuàng)建鎖時(shí)創(chuàng)建RedissonLock的時(shí)候,連帶創(chuàng)建它的父類RedissonObject,值為固定字符加鎖key:redisson_lock__channel+":{" + name + "}"
public RFuture<E> subscribe(String entryName, String channelName) {
       //根據(jù)信道名稱創(chuàng)建信道,再獲取異步信號(hào)量 
        AsyncSemaphore semaphore = this.service.getSemaphore(new ChannelName(channelName));
       //創(chuàng)建redisson信號(hào)量
        RPromise<E> newPromise = new RedissonPromise();
       //semaphore.acquire:Lambda表達(dá)式,把當(dāng)前創(chuàng)建的信號(hào)量添加到listeners線程集合中進(jìn)行監(jiān)聽
        semaphore.acquire(() -> {
            //redisson信號(hào)量取消
            if (!newPromise.setUncancellable()) {
                //釋放信號(hào)量
                semaphore.release();
            } else {
                //從private final ConcurrentMap<String, E> entries = new ConcurrentHashMap();中獲取是否有此entry的信號(hào)量,此處使用ConcurrentHashMap是為了線程安全,并且提高效率,因?yàn)镃oncurrentHashMap是線程安全的分段鎖
                E entry = (PubSubEntry)this.entries.get(entryName);
                //存在此信號(hào)量,則釋放信號(hào)量
                if (entry != null) {
                    entry.acquire();
                    semaphore.release();
                    entry.getPromise().onComplete(new TransferListener(newPromise));
                } else {
                    //不存在此entry,則創(chuàng)建
                    E value = this.createEntry(newPromise);
                    value.acquire();
                    //此處使用ConcurrentHashMap的putIfAbsent校驗(yàn)是否已經(jīng)存在此entry,存在則不添加
                    E oldValue = (PubSubEntry)this.entries.putIfAbsent(entryName, value);
                    if (oldValue != null) { //存在則不添加,釋放信號(hào)量
                        oldValue.acquire();
                        semaphore.release();
                        oldValue.getPromise().onComplete(new TransferListener(newPromise));
                    } else {  //不存在此entry的信號(hào)量
                        //創(chuàng)建一個(gè)信道的監(jiān)聽
                        RedisPubSubListener<Object> listener = this.createListener(channelName, value);
                        //訂閱信道的監(jiān)聽事件,當(dāng)鎖釋放時(shí),信號(hào)量的release會(huì)被調(diào)用
                        this.service.subscribe(LongCodec.INSTANCE, channelName, semaphore, new RedisPubSubListener[]{listener});
                    }
                }
            }
        });
        return newPromise;
    }

通過redis的channel信道訂閱鎖釋放的事件,創(chuàng)建異步信號(hào)量AsyncSemaphore監(jiān)聽鎖釋放的機(jī)制,當(dāng)鎖釋放時(shí)調(diào)用信號(hào)量的release釋放方法,此時(shí)被信號(hào)量阻塞的線程就可以繼續(xù)嘗試獲取鎖,釋放鎖的方法如下:

//定義一個(gè)AtomicInteger類型的counter變量,記錄線程數(shù),AtomicInteger能保證在多線程下的安全性,其特性是加和減的時(shí)候先用當(dāng)前需要變化后的值和舊的值進(jìn)行比較,例如當(dāng)前需要加1,則用加后的結(jié)果值減去1,再和舊的值比較,一致了才進(jìn)行覆蓋的操作,保證多線程下的安全性
private final AtomicInteger counter;
//嘗試執(zhí)行線程
private void tryRun() {
        if (this.counter.decrementAndGet() >= 0) {
            Runnable listener = (Runnable)this.listeners.poll();
            if (listener == null) {
                this.counter.incrementAndGet();
                return;
            }
            listener.run();
        } else if (this.counter.incrementAndGet() > 0) {
            this.tryRun();
        }
    }
    //釋放信號(hào)量的方法
    public void release() {
        //信號(hào)量加1
        this.counter.incrementAndGet();
        //啟動(dòng)監(jiān)聽
        this.tryRun();
    }

5.避免無效申請(qǐng)鎖浪費(fèi)資源是怎么實(shí)現(xiàn)的?

  //當(dāng)鎖釋放的時(shí)間ttl小于等待獲取鎖的時(shí)間time,則讓線程掛起ttl的時(shí)間再進(jìn)行鎖的獲取,避免了無效的鎖申請(qǐng)浪費(fèi)資源,使用LockSupport類的UNSAFE.park讓線程掛起一段時(shí)間
                                if (ttl >= 0L && ttl < time) {
                                    ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                                } else {
                                //當(dāng)?shù)却@取鎖的時(shí)間time小于鎖釋放的時(shí)間ttl,則讓線程掛起time的時(shí)間,再去獲取鎖,避免了無效的鎖申請(qǐng)浪費(fèi)資源,,使用LockSupport類的UNSAFE.park讓線程掛起一段時(shí)間,此時(shí)肯定是獲取鎖失敗,因?yàn)殒i還沒到釋放的時(shí)間
                                    ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                                }

ttl是鎖釋放的時(shí)間,time是獲取鎖剩余的等待時(shí)間,此方法是放在do while里面的,為了不讓程序無效申請(qǐng)鎖浪費(fèi)資源,在此處做的優(yōu)化。讓線程掛起一定的時(shí)間后再執(zhí)行獲取鎖,掛起的時(shí)間根據(jù)ttl和time的大小來定,若是滿足if條件,代表獲取鎖等待的時(shí)間比鎖釋放的時(shí)間要長(zhǎng),則讓程序掛起ttl的時(shí)間,這樣鎖已經(jīng)釋放了,再去獲取;若是不滿足if條件,代表釋放鎖的時(shí)間比鎖等待的時(shí)間要長(zhǎng),則讓程序掛起time的時(shí)間,此時(shí)鎖還沒有釋放,但是獲取鎖的等待時(shí)間已經(jīng)到達(dá),繼續(xù)執(zhí)行while循環(huán),此時(shí)會(huì)跳出while,表示獲取鎖失敗。

LockSupport類掛起線程的方法:

 public static void parkNanos(Object blocker, long nanos) {
     //掛起時(shí)間大于0
     if (nanos > 0) {
         //獲取當(dāng)前線程
         Thread t = Thread.currentThread();
         //設(shè)置掛起的線程
         setBlocker(t, blocker);
         //public native void park(boolean isAbsolute, long time);第一個(gè)參數(shù)是是否是絕對(duì)時(shí)間,第二個(gè)參數(shù)是等待時(shí)間值
         UNSAFE.park(false, nanos);
         //移除掛起的線程
         setBlocker(t, null);
     }
 }
//設(shè)置掛起的線程
private static void setBlocker(Thread t, Object arg) {
    UNSAFE.putObject(t, parkBlockerOffset, arg);
}

6.當(dāng)加鎖時(shí)間內(nèi)處理不完業(yè)務(wù),鎖續(xù)時(shí)是怎么處理的?

? 當(dāng)在加鎖時(shí)間范圍內(nèi),處理不完業(yè)務(wù),需要更新此鎖的過期時(shí)間,此處就需要redisson的一個(gè)watch dog機(jī)制進(jìn)行處理。注意watch dog機(jī)制只適用于鎖過期時(shí)間為默認(rèn)30秒的方式,自己配置的過期時(shí)間,盡管是配的30秒過期也不會(huì)啟用watch dog機(jī)制。

 //嘗試獲取鎖,等待獲取時(shí)間30秒,此種方式會(huì)啟用watch dog
 boolean b = lock.tryLock(30, TimeUnit.SECONDS);
 //嘗試獲取鎖,等待獲取時(shí)間20秒,鎖的過期時(shí)間是30秒或者配置為其他任意值,都不會(huì)啟用watch dog
 boolean b = lock.tryLock(20, 30,TimeUnit.SECONDS);

源碼中當(dāng)沒有配置過期時(shí)間,默認(rèn)傳遞一個(gè)-1L,此-1L作為后面判斷是否要?jiǎng)?chuàng)建定時(shí)任務(wù)啟動(dòng)watch dog的標(biāo)識(shí):

    public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
        return this.tryLock(waitTime, -1L, unit);
    }

若是配置了鎖的過期時(shí)間,盡管是配的30秒,也不會(huì)啟動(dòng)watch dog,源碼中是直接把過期時(shí)間傳遞過去的,沒有做30秒轉(zhuǎn)成-1L的操作:

boolean tryLock(long var1, long var3, TimeUnit var5) throws InterruptedException;

源碼中獲取鎖,判斷是否啟用定時(shí)器的邏輯代碼:

//waitTime:獲取鎖等待的時(shí)間
//leaseTime:加鎖時(shí)間,沒有配置的情況傳遞的是-1L,有配置過期時(shí)間則直接傳遞的是過期時(shí)間
//unit:時(shí)間單位
//threadId:當(dāng)前線程號(hào)
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        RFuture ttlRemainingFuture;
        if (leaseTime != -1L) { //不等于-1L表示用戶自己配置了過期時(shí)間,加鎖時(shí)傳遞用戶配置的過期時(shí)間
            ttlRemainingFuture = this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {  //沒有配置鎖的過期時(shí)間,使用默認(rèn)的時(shí)間;internalLockLeaseTime為程序啟動(dòng)時(shí)創(chuàng)建RedissonClient客戶端時(shí)設(shè)置的默認(rèn)值30秒,在創(chuàng)建鎖時(shí),創(chuàng)建RedissonLock時(shí)已經(jīng)給此值賦上值
            ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        }
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            //e==null表示獲取到鎖,獲取不到鎖會(huì)有鎖過期的時(shí)間
            if (e == null) {
                //ttlRemaining == null表示獲取到鎖
                if (ttlRemaining == null) {
                    if (leaseTime != -1L) { //當(dāng)用戶配置了過期時(shí)間,則把過期時(shí)間賦值給internalLockLeaseTime變量,記錄鎖的過期時(shí)間
                        this.internalLockLeaseTime = unit.toMillis(leaseTime);
                    } else {   //使用默認(rèn)的30秒過期時(shí)間,則創(chuàng)建定時(shí)器啟動(dòng)watch dog續(xù)時(shí)
                        this.scheduleExpirationRenewal(threadId);
                    }
                }
            }
        });
        return ttlRemainingFuture;
    }

當(dāng)用戶配置了過期時(shí)間(leaseTime != -1L)時(shí),獲取到鎖,在redis中存放的過期時(shí)間為用戶配置的時(shí)間;當(dāng)使用默認(rèn)的過期時(shí)間30秒,則在redis中存放的過期時(shí)間為程序啟動(dòng)時(shí)默認(rèn)配置的30秒。當(dāng)獲取鎖的結(jié)果為null即成功時(shí),進(jìn)行判斷是否要啟動(dòng)watch dog 續(xù)時(shí)機(jī)制,若是用戶自己配置的過期時(shí)間,則給類中記錄此鎖過期的變量賦值上用戶設(shè)置的數(shù)據(jù),若是默認(rèn)30秒過期時(shí)間,則添加定時(shí)器啟動(dòng)watch dog。

看下設(shè)置定時(shí)器的源碼:

//根據(jù)進(jìn)程id設(shè)置定時(shí)器啟動(dòng)watch dog
protected void scheduleExpirationRenewal(long threadId) {
        //創(chuàng)建一個(gè)entry對(duì)象
        RedissonBaseLock.ExpirationEntry entry = new RedissonBaseLock.ExpirationEntry();
        //從ConcurrentHashMap類型的EXPIRATION_RENEWAL_MAP變量中判斷此entry是否已經(jīng)存在
        RedissonBaseLock.ExpirationEntry oldEntry = (RedissonBaseLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
        if (oldEntry != null) { //已經(jīng)存在,更新進(jìn)程號(hào)
            oldEntry.addThreadId(threadId);
        } else {  //不存在則添加,值引用直接添加進(jìn)程號(hào)
            entry.addThreadId(threadId);
            try {
                //檢查表達(dá)式
                this.renewExpiration();
            } finally {
                if (Thread.currentThread().isInterrupted()) {
                    //進(jìn)程中斷了,EXPIRATION_RENEWAL_MAP中移除entry,停止定時(shí)器
                    this.cancelExpirationRenewal(threadId);
                }
            }
        }
    }

根據(jù)進(jìn)程號(hào),從全局變量EXPIRATION_RENEWAL_MAP中看是否有此進(jìn)程的entry存在,存在則更新進(jìn)程號(hào),不存在則添加進(jìn)去;當(dāng)線程終止了,需要從EXPIRATION_RENEWAL_MAP中移除entry,停止定時(shí)器。添加完成后開始檢驗(yàn)鎖的過期時(shí)間,源碼為:

 //檢查表達(dá)式
private void renewExpiration() {
        //從全局變量中查詢線程的entry對(duì)象
        RedissonBaseLock.ExpirationEntry ee = (RedissonBaseLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
        if (ee != null) { //entry對(duì)象存在
            //創(chuàng)建一個(gè)定時(shí)器,定時(shí)器執(zhí)行的時(shí)間this.internalLockLeaseTime / 3L,10秒鐘執(zhí)行一次
            Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
                public void run(Timeout timeout) throws Exception {
                      //從全局變量中查詢線程的entry對(duì)象
                    RedissonBaseLock.ExpirationEntry ent = (RedissonBaseLock.ExpirationEntry)RedissonBaseLock.EXPIRATION_RENEWAL_MAP.get(RedissonBaseLock.this.getEntryName());
                    if (ent != null) {//entry對(duì)象存在
                        //獲取線程號(hào)
                        Long threadId = ent.getFirstThreadId();
                        if (threadId != null) {
                            //異步執(zhí)行判斷是否還持有鎖,持有鎖的話,再把鎖的過期時(shí)間更新為30秒,也是一個(gè)lua執(zhí)行腳本
                            RFuture<Boolean> future = RedissonBaseLock.this.renewExpirationAsync(threadId);
                            future.onComplete((res, e) -> {
                                if (e != null) {  //執(zhí)行更新鎖過期時(shí)間失敗
                                    RedissonBaseLock.log.error("Can't update lock " + RedissonBaseLock.this.getRawName() + " expiration", e);
                                 //從全局變量中移除entry   RedissonBaseLock.EXPIRATION_RENEWAL_MAP.remove(RedissonBaseLock.this.getEntryName());
                                } else {
                                    if (res) {   //更新鎖為30秒過期成功,則重新調(diào)用renewExpiration方法,再次添加定時(shí)器檢查
                                        RedissonBaseLock.this.renewExpiration();
                                    } else {//已經(jīng)不存在此鎖,任務(wù)已經(jīng)完成,則EXPIRATION_RENEWAL_MAP中移除entry,停止定時(shí)器
                                        RedissonBaseLock.this.cancelExpirationRenewal((Long)null);
                                    }
                                }
                            });
                        }
                    }
                }
                //設(shè)置task的定時(shí)時(shí)間,指定時(shí)間單位
            }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
            //添加到定時(shí)器中
            ee.setTimeout(task);
        }
    }

從全局變量EXPIRATION_RENEWAL_MAP中獲取線程的entry,當(dāng)entry存在,則執(zhí)行一段檢查鎖是否存在,存在則更新過期時(shí)間為30秒的lua腳本,當(dāng)lua腳本的執(zhí)行結(jié)果返回成功,則重新調(diào)用renewExpiration方法,重新添加定時(shí)器任務(wù);當(dāng)lua腳本執(zhí)行失敗,則從EXPIRATION_RENEWAL_MAP中移除entry,停止定時(shí)器。

檢查鎖key是否存在,并更新鎖過期時(shí)間的方法renewExpirationAsync中l(wèi)ua腳本:

if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 
   redis.call('pexpire', KEYS[1], ARGV[1]); 
   return 1; 
end; 
return 0;

redis腳本含義:

redis.call('hexists', KEYS[1], ARGV[2]) == 1:存在此key的鎖,并且是當(dāng)前客戶端下當(dāng)前線程所擁有
redis.call('pexpire', KEYS[1], ARGV[1]):重新給此key設(shè)置過期時(shí)間,更新為30秒

每次執(zhí)行這段lua腳本滿足if條件,并且執(zhí)行成功,則此key的過期時(shí)間被重置為30秒,業(yè)務(wù)一直沒有處理完的話,會(huì)每隔十秒過期時(shí)間被重置為30秒。

五、方案優(yōu)缺點(diǎn)

1.優(yōu)點(diǎn)

(1)通過watch dog機(jī)制實(shí)現(xiàn)了鎖的續(xù)期問題。

(2)結(jié)合著redis一塊使用,系統(tǒng)性能更高。

(3)操作redis使用lua腳本,保證執(zhí)行的原子性。

(4)支持可重入鎖。

(5)使用了LockSupport的Unsafe.park使線程掛起,避免了重復(fù)無效獲取鎖浪費(fèi)資源。

2.缺點(diǎn)

(1)在redis主從模式或者集群模式下,當(dāng)客戶端1在master節(jié)點(diǎn)加鎖成功,但是master節(jié)點(diǎn)還沒有異步復(fù)制數(shù)據(jù)給其他slave節(jié)點(diǎn)時(shí),master節(jié)點(diǎn)宕機(jī)了,此時(shí)客戶端2來申請(qǐng)加鎖,會(huì)在新的master節(jié)點(diǎn)上加鎖成功,此時(shí)會(huì)存在多個(gè)客戶端加鎖成功的情況,可能會(huì)產(chǎn)生不必要的臟數(shù)據(jù)。

(2)watch dog 機(jī)制10秒觸發(fā)一次會(huì)消耗一定的服務(wù)器資源。

到此這篇關(guān)于源碼詳解分布式鎖redisson實(shí)現(xiàn)原理的文章就介紹到這了,更多相關(guān)分布式鎖redisson實(shí)現(xiàn)原理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java命令行下Jar包打包小結(jié)

    Java命令行下Jar包打包小結(jié)

    這篇文章主要介紹了Java命令行下Jar包打包小結(jié),小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2017-12-12
  • @RequestMapping對(duì)不同參數(shù)的接收方式示例詳解

    @RequestMapping對(duì)不同參數(shù)的接收方式示例詳解

    Spring?MVC框架中,@RequestMapping注解用于映射URL到控制器方法,不同的參數(shù)類型如簡(jiǎn)單參數(shù)、實(shí)體參數(shù)、數(shù)組參數(shù)、集合參數(shù)、日期參數(shù)和JSON參數(shù),本文給大家介紹@RequestMapping對(duì)不同參數(shù)的接收方式,感興趣的朋友一起看看吧
    2024-10-10
  • 線程池之exectue與submit的區(qū)別及說明

    線程池之exectue與submit的區(qū)別及說明

    這篇文章主要介紹了線程池之exectue與submit的區(qū)別及說明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-08-08
  • Java gRPC攔截器簡(jiǎn)單實(shí)現(xiàn)分布式日志鏈路追蹤器過程詳解

    Java gRPC攔截器簡(jiǎn)單實(shí)現(xiàn)分布式日志鏈路追蹤器過程詳解

    有請(qǐng)求的發(fā)送、處理,當(dāng)然就會(huì)有攔截器的需求,例如在服務(wù)端通過攔截器統(tǒng)一進(jìn)行請(qǐng)求認(rèn)證等操作,這些就需要攔截器來完成,今天松哥先和小伙伴們來聊一聊gRPC中攔截器的基本用法,后面我再整一篇文章和小伙伴們做一個(gè)基于攔截器實(shí)現(xiàn)的JWT認(rèn)證的gRPC
    2023-03-03
  • FeignClient實(shí)現(xiàn)接口調(diào)用方式(不同參數(shù)形式)

    FeignClient實(shí)現(xiàn)接口調(diào)用方式(不同參數(shù)形式)

    這篇文章主要介紹了FeignClient實(shí)現(xiàn)接口調(diào)用方式(不同參數(shù)形式),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-03-03
  • 如何將Java打開CSV文件到JTable展示

    如何將Java打開CSV文件到JTable展示

    本文主要介紹了如何將Java打開CSV文件到JTable展示,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-03-03
  • @Query注解的原生用法和native用法解析

    @Query注解的原生用法和native用法解析

    這篇文章主要介紹了@Query注解的原生用法和native用法解析,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • Java畢業(yè)設(shè)計(jì)實(shí)戰(zhàn)之財(cái)務(wù)預(yù)算管理系統(tǒng)的實(shí)現(xiàn)

    Java畢業(yè)設(shè)計(jì)實(shí)戰(zhàn)之財(cái)務(wù)預(yù)算管理系統(tǒng)的實(shí)現(xiàn)

    這是一個(gè)使用了java+SSM+Jsp+Mysql+Layui+Maven開發(fā)的財(cái)務(wù)預(yù)算管理系統(tǒng),是一個(gè)畢業(yè)設(shè)計(jì)的實(shí)戰(zhàn)練習(xí),具有財(cái)務(wù)預(yù)算管理該有的所有功能,感興趣的朋友快來看看吧
    2022-02-02
  • java中自定義線程池最佳實(shí)踐教程

    java中自定義線程池最佳實(shí)踐教程

    自定義線程池的最佳實(shí)踐包括:合理配置線程池大小、選擇合適的隊(duì)列類型、設(shè)置合理的拒絕策略、理解核心線程和非核心線程的區(qū)別、定期監(jiān)控和調(diào)優(yōu)、避免死鎖、使用合適的線程工廠、設(shè)計(jì)高效的任務(wù)、使用現(xiàn)有的線程池實(shí)現(xiàn)以及合理處理超時(shí)和中斷
    2025-03-03
  • Mybatis-plus如何更新Null字段詳解

    Mybatis-plus如何更新Null字段詳解

    MyBatis-plus在進(jìn)行更新操作時(shí)不會(huì)更新傳入實(shí)體中為null或默認(rèn)值屬性字段,只更新不為null的值、非默認(rèn)值的屬性字段,這篇文章主要給大家介紹了關(guān)于Mybatis-plus如何更新Null字段的相關(guān)資料,需要的朋友可以參考下
    2023-07-07

最新評(píng)論