MySql實現(xiàn)分布式鎖的示例代碼
本篇我們使用mysql實現(xiàn)一個分布式鎖。
環(huán)境:mysql8,navicat,maven,springboot2.3.11,mybatis-plus
分布式鎖的功能
1,分布式鎖使用者位于不同的機器中,鎖獲取成功之后,才可以對共享資源進行操作
2,鎖具有重入的功能:即一個使用者可以多次獲取某個鎖
3,獲取鎖有超時的功能:即在指定的時間內(nèi)去嘗試獲取鎖,超過了超時時間,如果還未獲取成功,則返回獲取失敗
4,能夠自動容錯,比如:A機器獲取鎖lock1之后,在釋放鎖lock1之前,A機器掛了,導(dǎo)致鎖lock1未釋放,結(jié)果會lock1一直被A機器占有著,遇到這種情況時,分布式鎖要能夠自動解決,可以這么做:持有鎖的時候可以加個持有超時時間,超過了這個時間還未釋放的,其他機器將有機會獲取鎖
預(yù)備技能:樂觀鎖
通常我們修改表中一條數(shù)據(jù)過程如下:
t1:select獲取記錄R1 t2:對R1進行編輯 t3:update R1
我們來看一下上面的過程存在的問題:
如果A、B兩個線程同時執(zhí)行到t1,他們倆看到的R1的數(shù)據(jù)一樣,然后都對R1進行編輯,然后去執(zhí)行t3,最終2個線程都會更新成功,后面一個線程會把前面一個線程update的結(jié)果給覆蓋掉,這就是并發(fā)修改數(shù)據(jù)存在的問題。
我們可以在表中新增一個版本號,每次更新數(shù)據(jù)時候?qū)姹咎栕鳛闂l件,并且每次更新時候版本號+1,過程優(yōu)化一下,如下:
t1:打開事務(wù)start transaction t2:select獲取記錄R1,聲明變量v=R1.version t3:對R1進行編輯 t4:執(zhí)行更新操作 update R1 set version = version + 1 where user_id=#user_id# and version = #v#; t5:t4中的update會返回影響的行數(shù),我們將其記錄在count中,然后根據(jù)count來判斷提交還是回滾 if(count==1){ //提交事務(wù) commit; }else{ //回滾事務(wù) rollback; }
上面重點在于步驟t4,當多個線程同時執(zhí)行到t1,他們看到的R1是一樣的,但是當他們執(zhí)行到t4的時候,數(shù)據(jù)庫會對update的這行記錄加鎖,確保并發(fā)情況下排隊執(zhí)行,所以只有第一個的update會返回1,其他的update結(jié)果會返回0,然后后面會判斷count是否為1,進而對事務(wù)進行提交或者回滾??梢酝ㄟ^count的值知道修改數(shù)據(jù)是否成功了。
上面這種方式就樂觀鎖。我們可以通過樂觀鎖的方式確保數(shù)據(jù)并發(fā)修改過程中的正確性。
使用mysql實現(xiàn)分布式鎖
我們創(chuàng)建一個分布式鎖表,如下
DROP TABLE IF EXISTS t_lock; create table t_lock( lock_key varchar(32) PRIMARY KEY NOT NULL COMMENT '鎖唯一標志', request_id varchar(64) NOT NULL DEFAULT '' COMMENT '用來標識請求對象的', lock_count INT NOT NULL DEFAULT 0 COMMENT '當前上鎖次數(shù)', timeout BIGINT NOT NULL DEFAULT 0 COMMENT '鎖超時時間', version INT NOT NULL DEFAULT 0 COMMENT '版本號,每次更新+1' )COMMENT '鎖信息表';
java代碼如下
mapper接口
package com.shiguiwu.springmybatis.mapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.shiguiwu.springmybatis.lock.model.LockModel; import org.springframework.stereotype.Repository; /** * @description: 鎖mapper * @author: stone * @date: Created by 2021/5/30 11:12 * @version: 1.0.0 * @pakeage: com.shiguiwu.springmybatis.mapper */ @Repository public interface LockMapper extends BaseMapper<LockModel> { }
鎖對象model
package com.shiguiwu.springmybatis.lock.model; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.Version; import lombok.Data; /** * @description: 鎖模型 * @author: stone * @date: Created by 2021/9/10 11:13 * @version: 1.0.0 * @pakeage: com.shiguiwu.springmybatis.lock.model */ @Data @TableName("t_lock") public class LockModel { /** * 鎖的唯一值 */ @TableId private String lockKey; /** * 請求id,同一個線程里請求id一樣 */ private String requestId; //鎖次數(shù) private Integer lockCount; //鎖超時 private Long timeout; //樂觀鎖版本 @Version private Integer version; }
鎖接口
package com.shiguiwu.springmybatis.lock; /** * @description: 鎖接口 * @author: stone * @date: Created by 2021/9/10 11:40 * @version: 1.0.0 * @pakeage: com.shiguiwu.springmybatis.lock */ public interface ILock<T> { /** * 獲取分布式鎖,支持重入 * @param lockKey 鎖可以 * @param lockTimeout 持有鎖的有效時間,防止死鎖 * @param getTimeout 獲取鎖超時時間, * @return 是否鎖成功 */ public boolean lock(String lockKey, long lockTimeout, int getTimeout) throws Exception; /** * 解鎖 * @param lockKey 鎖key * */ public void unlock(String lockKey); /** * 重置鎖對象 * @param t 鎖對象 * @return 返回鎖記錄 */ public int restLock(T t); }
鎖的實現(xiàn)代碼如下
package com.shiguiwu.springmybatis.lock; import cn.hutool.core.util.StrUtil; import com.shiguiwu.springmybatis.lock.model.LockModel; import com.shiguiwu.springmybatis.mapper.LockMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Objects; import java.util.UUID; import java.util.concurrent.TimeUnit; /** * @description: mysql實現(xiàn)分布式鎖 * @author: stone * @date: Created by 2021/9/10 11:09 * @version: 1.0.0 * @pakeage: com.shiguiwu.springmybatis.lock */ @Component @Slf4j public class MysqlLock implements ILock<LockModel>{ static ThreadLocal<String> requestIds = new ThreadLocal<>(); @Autowired private LockMapper lockMapper; public String getRequestId() { String requestId = requestIds.get(); if (StrUtil.isBlank(requestId)) { requestId = UUID.randomUUID().toString(); requestIds.set(requestId); } log.info("獲取到的requestId===> {}", requestId); return requestId; } /** * 獲取鎖 * @param lockKey 鎖可以 * @param lockTimeout 持有鎖的有效時間,防止死鎖 * @param getTimeout 獲取鎖超時時間, * @return */ @Override public boolean lock(String lockKey, long lockTimeout, int getTimeout) throws Exception { log.info(" lock start =======================> {}",lockKey); //從local中獲取 請求id String requestId = this.getRequestId(); //獲取鎖的結(jié)果 boolean lockResult = false; //開始時間 long startTime = System.currentTimeMillis(); while (true) { LockModel lockModel = lockMapper.selectById(lockKey); if (Objects.nonNull(lockModel)) { //獲取鎖對象的請求id String reqId = lockModel.getRequestId(); //如果是空,表示改鎖未被占有 if (StrUtil.isBlank(reqId)) { //馬上占有它 //設(shè)置請求id lockModel.setRequestId(requestId); //設(shè)置鎖次數(shù) lockModel.setLockCount(1); //設(shè)置超時時間,防止死鎖 lockModel.setTimeout(System.currentTimeMillis() + lockTimeout); if (lockMapper.updateById(lockModel) == 1) { lockResult = true; break; } } //如果request_id和表中request_id一樣表示鎖被當前線程持有者,此時需要加重入鎖 else if (requestId.equals(reqId)) { //可重入鎖 lockModel.setTimeout(System.currentTimeMillis() + lockTimeout); //設(shè)置獲取初次 lockModel.setLockCount(lockModel.getLockCount() + 1); if (lockMapper.updateById(lockModel) == 1) { lockResult = true; break; } } //不為空,也不相等,說明是其他線程占有 else { //鎖不是自己的,并且已經(jīng)超時了,則重置鎖,繼續(xù)重試 if (lockModel.getTimeout() < System.currentTimeMillis()) { //未超時,繼續(xù)重試 this.restLock(lockModel); } //如果未超時,休眠100毫秒,繼續(xù)重試 else { if (startTime + getTimeout > System.currentTimeMillis()) { TimeUnit.MILLISECONDS.sleep(100); } else { //防止長時間阻塞 break; } } } } //如果是空,就插入一個鎖,重新嘗試獲取鎖 else { lockModel = new LockModel(); //設(shè)置鎖key lockModel.setLockKey(lockKey); lockMapper.insert(lockModel); } } log.info(" lock end =======================> {}",lockKey); return lockResult; } /** * 釋放鎖 * @param lockKey 鎖key */ @Override public void unlock(String lockKey) { LockModel lockModel = lockMapper.selectById(lockKey); //獲取當前線程的請求id String reqId = this.getRequestId(); //獲取鎖次數(shù) int count = 0; //當前線程requestId和庫中request_id一致 && lock_count>0,表示可以釋放鎖 if (Objects.nonNull(lockModel) && reqId.equals(lockModel.getRequestId()) && (count = lockModel.getLockCount()) > 0) { if (count == 1) { //重置鎖 this.restLock(lockModel); } //重入鎖的問題,鎖的次數(shù)減一 else { lockModel.setLockCount(lockModel.getLockCount() - 1); //更新次數(shù) lockMapper.updateById(lockModel); } } } /** * 重置鎖 * @param lockModel 鎖對象 * @return 更新條數(shù) */ @Override public int restLock(LockModel lockModel) { lockModel.setLockCount(0); lockModel.setRequestId(""); lockModel.setTimeout(0L); return lockMapper.updateById(lockModel); } }
上面代碼中實現(xiàn)了文章開頭列的分布式鎖的所有功能,大家可以認真研究下獲取鎖的方法:lock,釋放鎖的方法:unlock。
測試用例
package com.shiguiwu.springmybatis; import com.shiguiwu.springmybatis.lock.ILock; import com.shiguiwu.springmybatis.lock.model.LockModel; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; /** * @description: 鎖測試 * @author: stone * @date: Created by 2021/9/10 15:32 * @version: 1.0.0 * @pakeage: com.shiguiwu.springmybatis */ @SpringBootTest @Slf4j public class LockApplicationTests { @Autowired private ILock<LockModel> mysqlLock; 測試重復(fù)獲取和重復(fù)釋放 @Test public void testRepeat() throws Exception { for (int i = 0; i < 10; i++) { mysqlLock.lock("key1", 10000L, 1000); } for (int i = 0; i < 10; i++) { mysqlLock.unlock("key1"); } } // //獲取之后不釋放,超時之后被thread1獲取 @Test public void testTimeout() throws Exception { String lockKey = "key2"; mysqlLock.lock(lockKey, 5000L, 1000); Thread thread1 = new Thread(() -> { try { mysqlLock.lock(lockKey, 5000L, 7000); } catch (Exception e) { e.printStackTrace(); } finally { mysqlLock.unlock(lockKey); } }, "thread1"); thread1.start(); thread1.join(); } }
test1方法測試了重入鎖的效果。
test2測試了主線程獲取鎖之后一直未釋放,持有鎖超時之后被thread1獲取到了
留給大家一個問題
上面分布式鎖還需要考慮一個問題:比如A機會獲取了key1的鎖,并設(shè)置持有鎖的超時時間為10秒,但是獲取鎖之后,執(zhí)行了一段業(yè)務(wù)操作,業(yè)務(wù)操作耗時超過10秒了,此時機器B去獲取鎖時可以獲取成功的,此時會導(dǎo)致A、B兩個機器都獲取鎖成功了,都在執(zhí)行業(yè)務(wù)操作,這種情況應(yīng)該怎么處理?大家可以思考一下然后留言,我們一起討論一下。
到此這篇關(guān)于MySql實現(xiàn)分布式鎖的示例代碼的文章就介紹到這了,更多相關(guān)MySql 分布式鎖內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Ubuntu查看修改mysql的登錄名和密碼、安裝phpmyadmin
這篇文章主要介紹了Ubuntu查看修改mysql的登錄名和密碼、安裝phpmyadmin,本文分步驟給大家講解的非常詳細,需要的朋友可以參考下2019-11-11Mysql 常用的時間日期及轉(zhuǎn)換函數(shù)小結(jié)
本文是腳本之家小編給大家總結(jié)的一些常用的mysql時間日期以及轉(zhuǎn)換函數(shù),非常不錯,具有一定的參考借鑒價值,需要的朋友參考下吧2018-05-05MySQL數(shù)據(jù)定義語言DDL的基礎(chǔ)語句
這篇文章主要介紹了MySQL數(shù)據(jù)定義語言DDL的基礎(chǔ)語句,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-08-08