MySql實(shí)現(xiàn)分布式鎖的示例代碼
本篇我們使用mysql實(shí)現(xiàn)一個(gè)分布式鎖。
環(huán)境:mysql8,navicat,maven,springboot2.3.11,mybatis-plus
分布式鎖的功能
1,分布式鎖使用者位于不同的機(jī)器中,鎖獲取成功之后,才可以對(duì)共享資源進(jìn)行操作
2,鎖具有重入的功能:即一個(gè)使用者可以多次獲取某個(gè)鎖
3,獲取鎖有超時(shí)的功能:即在指定的時(shí)間內(nèi)去嘗試獲取鎖,超過(guò)了超時(shí)時(shí)間,如果還未獲取成功,則返回獲取失敗
4,能夠自動(dòng)容錯(cuò),比如:A機(jī)器獲取鎖lock1之后,在釋放鎖lock1之前,A機(jī)器掛了,導(dǎo)致鎖lock1未釋放,結(jié)果會(huì)lock1一直被A機(jī)器占有著,遇到這種情況時(shí),分布式鎖要能夠自動(dòng)解決,可以這么做:持有鎖的時(shí)候可以加個(gè)持有超時(shí)時(shí)間,超過(guò)了這個(gè)時(shí)間還未釋放的,其他機(jī)器將有機(jī)會(huì)獲取鎖
預(yù)備技能:樂(lè)觀鎖
通常我們修改表中一條數(shù)據(jù)過(guò)程如下:
t1:select獲取記錄R1 t2:對(duì)R1進(jìn)行編輯 t3:update R1
我們來(lái)看一下上面的過(guò)程存在的問(wèn)題:
如果A、B兩個(gè)線程同時(shí)執(zhí)行到t1,他們倆看到的R1的數(shù)據(jù)一樣,然后都對(duì)R1進(jìn)行編輯,然后去執(zhí)行t3,最終2個(gè)線程都會(huì)更新成功,后面一個(gè)線程會(huì)把前面一個(gè)線程update的結(jié)果給覆蓋掉,這就是并發(fā)修改數(shù)據(jù)存在的問(wèn)題。
我們可以在表中新增一個(gè)版本號(hào),每次更新數(shù)據(jù)時(shí)候?qū)姹咎?hào)作為條件,并且每次更新時(shí)候版本號(hào)+1,過(guò)程優(yōu)化一下,如下:
t1:打開事務(wù)start transaction t2:select獲取記錄R1,聲明變量v=R1.version t3:對(duì)R1進(jìn)行編輯 t4:執(zhí)行更新操作 update R1 set version = version + 1 where user_id=#user_id# and version = #v#; t5:t4中的update會(huì)返回影響的行數(shù),我們將其記錄在count中,然后根據(jù)count來(lái)判斷提交還是回滾 if(count==1){ //提交事務(wù) commit; }else{ //回滾事務(wù) rollback; }
上面重點(diǎn)在于步驟t4,當(dāng)多個(gè)線程同時(shí)執(zhí)行到t1,他們看到的R1是一樣的,但是當(dāng)他們執(zhí)行到t4的時(shí)候,數(shù)據(jù)庫(kù)會(huì)對(duì)update的這行記錄加鎖,確保并發(fā)情況下排隊(duì)執(zhí)行,所以只有第一個(gè)的update會(huì)返回1,其他的update結(jié)果會(huì)返回0,然后后面會(huì)判斷count是否為1,進(jìn)而對(duì)事務(wù)進(jìn)行提交或者回滾??梢酝ㄟ^(guò)count的值知道修改數(shù)據(jù)是否成功了。
上面這種方式就樂(lè)觀鎖。我們可以通過(guò)樂(lè)觀鎖的方式確保數(shù)據(jù)并發(fā)修改過(guò)程中的正確性。
使用mysql實(shí)現(xiàn)分布式鎖
我們創(chuàng)建一個(gè)分布式鎖表,如下
DROP TABLE IF EXISTS t_lock; create table t_lock( lock_key varchar(32) PRIMARY KEY NOT NULL COMMENT '鎖唯一標(biāo)志', request_id varchar(64) NOT NULL DEFAULT '' COMMENT '用來(lái)標(biāo)識(shí)請(qǐng)求對(duì)象的', lock_count INT NOT NULL DEFAULT 0 COMMENT '當(dāng)前上鎖次數(shù)', timeout BIGINT NOT NULL DEFAULT 0 COMMENT '鎖超時(shí)時(shí)間', version INT NOT NULL DEFAULT 0 COMMENT '版本號(hào),每次更新+1' )COMMENT '鎖信息表';
java代碼如下
mapper接口
package com.shiguiwu.springmybatis.mapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.shiguiwu.springmybatis.lock.model.LockModel; import org.springframework.stereotype.Repository; /** * @description: 鎖mapper * @author: stone * @date: Created by 2021/5/30 11:12 * @version: 1.0.0 * @pakeage: com.shiguiwu.springmybatis.mapper */ @Repository public interface LockMapper extends BaseMapper<LockModel> { }
鎖對(duì)象model
package com.shiguiwu.springmybatis.lock.model; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.Version; import lombok.Data; /** * @description: 鎖模型 * @author: stone * @date: Created by 2021/9/10 11:13 * @version: 1.0.0 * @pakeage: com.shiguiwu.springmybatis.lock.model */ @Data @TableName("t_lock") public class LockModel { /** * 鎖的唯一值 */ @TableId private String lockKey; /** * 請(qǐng)求id,同一個(gè)線程里請(qǐng)求id一樣 */ private String requestId; //鎖次數(shù) private Integer lockCount; //鎖超時(shí) private Long timeout; //樂(lè)觀鎖版本 @Version private Integer version; }
鎖接口
package com.shiguiwu.springmybatis.lock; /** * @description: 鎖接口 * @author: stone * @date: Created by 2021/9/10 11:40 * @version: 1.0.0 * @pakeage: com.shiguiwu.springmybatis.lock */ public interface ILock<T> { /** * 獲取分布式鎖,支持重入 * @param lockKey 鎖可以 * @param lockTimeout 持有鎖的有效時(shí)間,防止死鎖 * @param getTimeout 獲取鎖超時(shí)時(shí)間, * @return 是否鎖成功 */ public boolean lock(String lockKey, long lockTimeout, int getTimeout) throws Exception; /** * 解鎖 * @param lockKey 鎖key * */ public void unlock(String lockKey); /** * 重置鎖對(duì)象 * @param t 鎖對(duì)象 * @return 返回鎖記錄 */ public int restLock(T t); }
鎖的實(shí)現(xiàn)代碼如下
package com.shiguiwu.springmybatis.lock; import cn.hutool.core.util.StrUtil; import com.shiguiwu.springmybatis.lock.model.LockModel; import com.shiguiwu.springmybatis.mapper.LockMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Objects; import java.util.UUID; import java.util.concurrent.TimeUnit; /** * @description: mysql實(shí)現(xiàn)分布式鎖 * @author: stone * @date: Created by 2021/9/10 11:09 * @version: 1.0.0 * @pakeage: com.shiguiwu.springmybatis.lock */ @Component @Slf4j public class MysqlLock implements ILock<LockModel>{ static ThreadLocal<String> requestIds = new ThreadLocal<>(); @Autowired private LockMapper lockMapper; public String getRequestId() { String requestId = requestIds.get(); if (StrUtil.isBlank(requestId)) { requestId = UUID.randomUUID().toString(); requestIds.set(requestId); } log.info("獲取到的requestId===> {}", requestId); return requestId; } /** * 獲取鎖 * @param lockKey 鎖可以 * @param lockTimeout 持有鎖的有效時(shí)間,防止死鎖 * @param getTimeout 獲取鎖超時(shí)時(shí)間, * @return */ @Override public boolean lock(String lockKey, long lockTimeout, int getTimeout) throws Exception { log.info(" lock start =======================> {}",lockKey); //從local中獲取 請(qǐng)求id String requestId = this.getRequestId(); //獲取鎖的結(jié)果 boolean lockResult = false; //開始時(shí)間 long startTime = System.currentTimeMillis(); while (true) { LockModel lockModel = lockMapper.selectById(lockKey); if (Objects.nonNull(lockModel)) { //獲取鎖對(duì)象的請(qǐng)求id String reqId = lockModel.getRequestId(); //如果是空,表示改鎖未被占有 if (StrUtil.isBlank(reqId)) { //馬上占有它 //設(shè)置請(qǐng)求id lockModel.setRequestId(requestId); //設(shè)置鎖次數(shù) lockModel.setLockCount(1); //設(shè)置超時(shí)時(shí)間,防止死鎖 lockModel.setTimeout(System.currentTimeMillis() + lockTimeout); if (lockMapper.updateById(lockModel) == 1) { lockResult = true; break; } } //如果request_id和表中request_id一樣表示鎖被當(dāng)前線程持有者,此時(shí)需要加重入鎖 else if (requestId.equals(reqId)) { //可重入鎖 lockModel.setTimeout(System.currentTimeMillis() + lockTimeout); //設(shè)置獲取初次 lockModel.setLockCount(lockModel.getLockCount() + 1); if (lockMapper.updateById(lockModel) == 1) { lockResult = true; break; } } //不為空,也不相等,說(shuō)明是其他線程占有 else { //鎖不是自己的,并且已經(jīng)超時(shí)了,則重置鎖,繼續(xù)重試 if (lockModel.getTimeout() < System.currentTimeMillis()) { //未超時(shí),繼續(xù)重試 this.restLock(lockModel); } //如果未超時(shí),休眠100毫秒,繼續(xù)重試 else { if (startTime + getTimeout > System.currentTimeMillis()) { TimeUnit.MILLISECONDS.sleep(100); } else { //防止長(zhǎng)時(shí)間阻塞 break; } } } } //如果是空,就插入一個(gè)鎖,重新嘗試獲取鎖 else { lockModel = new LockModel(); //設(shè)置鎖key lockModel.setLockKey(lockKey); lockMapper.insert(lockModel); } } log.info(" lock end =======================> {}",lockKey); return lockResult; } /** * 釋放鎖 * @param lockKey 鎖key */ @Override public void unlock(String lockKey) { LockModel lockModel = lockMapper.selectById(lockKey); //獲取當(dāng)前線程的請(qǐng)求id String reqId = this.getRequestId(); //獲取鎖次數(shù) int count = 0; //當(dāng)前線程requestId和庫(kù)中request_id一致 && lock_count>0,表示可以釋放鎖 if (Objects.nonNull(lockModel) && reqId.equals(lockModel.getRequestId()) && (count = lockModel.getLockCount()) > 0) { if (count == 1) { //重置鎖 this.restLock(lockModel); } //重入鎖的問(wèn)題,鎖的次數(shù)減一 else { lockModel.setLockCount(lockModel.getLockCount() - 1); //更新次數(shù) lockMapper.updateById(lockModel); } } } /** * 重置鎖 * @param lockModel 鎖對(duì)象 * @return 更新條數(shù) */ @Override public int restLock(LockModel lockModel) { lockModel.setLockCount(0); lockModel.setRequestId(""); lockModel.setTimeout(0L); return lockMapper.updateById(lockModel); } }
上面代碼中實(shí)現(xiàn)了文章開頭列的分布式鎖的所有功能,大家可以認(rèn)真研究下獲取鎖的方法:lock,釋放鎖的方法:unlock。
測(cè)試用例
package com.shiguiwu.springmybatis; import com.shiguiwu.springmybatis.lock.ILock; import com.shiguiwu.springmybatis.lock.model.LockModel; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; /** * @description: 鎖測(cè)試 * @author: stone * @date: Created by 2021/9/10 15:32 * @version: 1.0.0 * @pakeage: com.shiguiwu.springmybatis */ @SpringBootTest @Slf4j public class LockApplicationTests { @Autowired private ILock<LockModel> mysqlLock; 測(cè)試重復(fù)獲取和重復(fù)釋放 @Test public void testRepeat() throws Exception { for (int i = 0; i < 10; i++) { mysqlLock.lock("key1", 10000L, 1000); } for (int i = 0; i < 10; i++) { mysqlLock.unlock("key1"); } } // //獲取之后不釋放,超時(shí)之后被thread1獲取 @Test public void testTimeout() throws Exception { String lockKey = "key2"; mysqlLock.lock(lockKey, 5000L, 1000); Thread thread1 = new Thread(() -> { try { mysqlLock.lock(lockKey, 5000L, 7000); } catch (Exception e) { e.printStackTrace(); } finally { mysqlLock.unlock(lockKey); } }, "thread1"); thread1.start(); thread1.join(); } }
test1方法測(cè)試了重入鎖的效果。
test2測(cè)試了主線程獲取鎖之后一直未釋放,持有鎖超時(shí)之后被thread1獲取到了
留給大家一個(gè)問(wèn)題
上面分布式鎖還需要考慮一個(gè)問(wèn)題:比如A機(jī)會(huì)獲取了key1的鎖,并設(shè)置持有鎖的超時(shí)時(shí)間為10秒,但是獲取鎖之后,執(zhí)行了一段業(yè)務(wù)操作,業(yè)務(wù)操作耗時(shí)超過(guò)10秒了,此時(shí)機(jī)器B去獲取鎖時(shí)可以獲取成功的,此時(shí)會(huì)導(dǎo)致A、B兩個(gè)機(jī)器都獲取鎖成功了,都在執(zhí)行業(yè)務(wù)操作,這種情況應(yīng)該怎么處理?大家可以思考一下然后留言,我們一起討論一下。
到此這篇關(guān)于MySql實(shí)現(xiàn)分布式鎖的示例代碼的文章就介紹到這了,更多相關(guān)MySql 分布式鎖內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
MySQL 元數(shù)據(jù)查看及實(shí)例代碼
這篇文章主要介紹了MySQL 元數(shù)據(jù)查看及實(shí)例代碼的相關(guān)資料,需要的朋友可以參考下2017-01-01Ubuntu查看修改mysql的登錄名和密碼、安裝phpmyadmin
這篇文章主要介紹了Ubuntu查看修改mysql的登錄名和密碼、安裝phpmyadmin,本文分步驟給大家講解的非常詳細(xì),需要的朋友可以參考下2019-11-11Mysql 常用的時(shí)間日期及轉(zhuǎn)換函數(shù)小結(jié)
本文是腳本之家小編給大家總結(jié)的一些常用的mysql時(shí)間日期以及轉(zhuǎn)換函數(shù),非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2018-05-05MySQL數(shù)據(jù)定義語(yǔ)言DDL的基礎(chǔ)語(yǔ)句
這篇文章主要介紹了MySQL數(shù)據(jù)定義語(yǔ)言DDL的基礎(chǔ)語(yǔ)句,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-08-08