Mysql中分布式鎖的具體實現
分布式鎖的功能
- 分布式鎖使用者位于不同的機器中,鎖獲取成功之后,才可以對共享資源進行操作
- 鎖具有重入的功能:即一個使用者可以多次獲取某個鎖
- 獲取鎖有超時的功能:即在指定的時間內去嘗試獲取鎖,超過了超時時間,如果還未獲取成功,則返回獲取失敗
- 能夠自動容錯,比如:A機器獲取鎖lock1之后,在釋放鎖lock1之前,A機器掛了,導致鎖lock1未釋放,結果會lock1一直被A機器占有著,遇到這種情況時,分布式鎖要能夠自動解決,可以這么做:持有鎖的時候可以加個持有超時時間,超過了這個時間還未釋放的,其他機器將有機會獲取鎖
預備技能:樂觀鎖
通常我們修改表中一條數據過程如下:
sql復制代碼t1:select獲取記錄R1
t2:對R1進行編輯
t3:update R1
我們來看一下上面的過程存在的問題:
如果A、B兩個線程同時執(zhí)行到t1,他們倆看到的R1的數據一樣,然后都對R1進行編輯,然后去執(zhí)行t3,最終2個線程都會更新成功,后面一個線程會把前面一個線程update的結果給覆蓋掉,這就是并發(fā)修改數據存在的問題。
我們可以在表中新增一個版本號,每次更新數據時候將版本號作為條件,并且每次更新時候版本號+1,過程優(yōu)化一下,如下:
t1:打開事務start transaction
t2:select獲取記錄R1,聲明變量v=R1.version
t3:對R1進行編輯
t4:執(zhí)行更新操作
update R1 set version = version + 1 where user_id=#user_id# and version = #v#;
t5:t4中的update會返回影響的行數,我們將其記錄在count中,然后根據count來判斷提交還是回滾
if(count==1){
//提交事務
commit;
}else{
//回滾事務
rollback;
}
上面重點在于步驟t4,當多個線程同時執(zhí)行到t1,他們看到的R1是一樣的,但是當他們執(zhí)行到t4的時候,數據庫會對update的這行記錄加鎖,確保并發(fā)情況下排隊執(zhí)行,所以只有第一個的update會返回1,其他的update結果會返回0,然后后面會判斷count是否為1,進而對事務進行提交或者回滾。可以通過count的值知道修改數據是否成功了。
上面這種方式就樂觀鎖。我們可以通過樂觀鎖的方式確保數據并發(fā)修改過程中的正確性。
使用mysql實現分布式鎖
建表
我們創(chuàng)建一個分布式鎖表,如下
DROP DATABASE IF EXISTS javacodeabcd; CREATE DATABASE javacodeabcd; USE javacodeabcd; DROP TABLE IF EXISTS t_lock; create table t_lock( lock_key varchar(32) PRIMARY KEY NOT NULL COMMENT '鎖唯一標志', request_id varchar(64) NOT NULL DEFAULT '' COMMENT '用來標識請求對象的', lock_count INT NOT NULL DEFAULT 0 COMMENT '當前上鎖次數', timeout BIGINT NOT NULL DEFAULT 0 COMMENT '鎖超時時間', version INT NOT NULL DEFAULT 0 COMMENT '版本號,每次更新+1' )COMMENT '鎖信息表';
分布式鎖工具類:
package com.itsoku.sql; import lombok.Builder; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import java.sql.*; import java.util.Objects; import java.util.UUID; import java.util.concurrent.TimeUnit; @Slf4j public class LockUtils { //將requestid保存在該變量中 static ThreadLocal<String> requestIdTL = new ThreadLocal<>(); /** * 獲取當前線程requestid * * @return */ public static String getRequestId() { String requestId = requestIdTL.get(); if (requestId == null || "".equals(requestId)) { requestId = UUID.randomUUID().toString(); requestIdTL.set(requestId); } log.info("requestId:{}", requestId); return requestId; } /** * 獲取鎖 * * @param lock_key 鎖key * @param locktimeout(毫秒) 持有鎖的有效時間,防止死鎖 * @param gettimeout(毫秒) 獲取鎖的超時時間,這個時間內獲取不到將重試 * @return */ public static boolean lock(String lock_key, long locktimeout, int gettimeout) throws Exception { log.info("start"); boolean lockResult = false; String request_id = getRequestId(); long starttime = System.currentTimeMillis(); while (true) { LockModel lockModel = LockUtils.get(lock_key); if (Objects.isNull(lockModel)) { //插入一條記錄,重新嘗試獲取鎖 LockUtils.insert(LockModel.builder().lock_key(lock_key).request_id("").lock_count(0).timeout(0L).version(0).build()); } else { String reqid = lockModel.getRequest_id(); //如果reqid為空字符,表示鎖未被占用 if ("".equals(reqid)) { lockModel.setRequest_id(request_id); lockModel.setLock_count(1); lockModel.setTimeout(System.currentTimeMillis() + locktimeout); if (LockUtils.update(lockModel) == 1) { lockResult = true; break; } } else if (request_id.equals(reqid)) { //如果request_id和表中request_id一樣表示鎖被當前線程持有者,此時需要加重入鎖 lockModel.setTimeout(System.currentTimeMillis() + locktimeout); lockModel.setLock_count(lockModel.getLock_count() + 1); if (LockUtils.update(lockModel) == 1) { lockResult = true; break; } } else { //鎖不是自己的,并且已經超時了,則重置鎖,繼續(xù)重試 if (lockModel.getTimeout() < System.currentTimeMillis()) { LockUtils.resetLock(lockModel); } else { //如果未超時,休眠100毫秒,繼續(xù)重試 if (starttime + gettimeout > System.currentTimeMillis()) { TimeUnit.MILLISECONDS.sleep(100); } else { break; } } } } } log.info("end"); return lockResult; } /** * 釋放鎖 * * @param lock_key * @throws Exception */ public static void unlock(String lock_key) throws Exception { //獲取當前線程requestId String requestId = getRequestId(); LockModel lockModel = LockUtils.get(lock_key); //當前線程requestId和庫中request_id一致 && lock_count>0,表示可以釋放鎖 if (Objects.nonNull(lockModel) && requestId.equals(lockModel.getRequest_id()) && lockModel.getLock_count() > 0) { if (lockModel.getLock_count() == 1) { //重置鎖 resetLock(lockModel); } else { lockModel.setLock_count(lockModel.getLock_count() - 1); LockUtils.update(lockModel); } } } /** * 重置鎖 * * @param lockModel * @return * @throws Exception */ public static int resetLock(LockModel lockModel) throws Exception { lockModel.setRequest_id(""); lockModel.setLock_count(0); lockModel.setTimeout(0L); return LockUtils.update(lockModel); } /** * 更新lockModel信息,內部采用樂觀鎖來更新 * * @param lockModel * @return * @throws Exception */ public static int update(LockModel lockModel) throws Exception { return exec(conn -> { String sql = "UPDATE t_lock SET request_id = ?,lock_count = ?,timeout = ?,version = version + 1 WHERE lock_key = ? AND version = ?"; PreparedStatement ps = conn.prepareStatement(sql); int colIndex = 1; ps.setString(colIndex++, lockModel.getRequest_id()); ps.setInt(colIndex++, lockModel.getLock_count()); ps.setLong(colIndex++, lockModel.getTimeout()); ps.setString(colIndex++, lockModel.getLock_key()); ps.setInt(colIndex++, lockModel.getVersion()); return ps.executeUpdate(); }); } public static LockModel get(String lock_key) throws Exception { return exec(conn -> { String sql = "select * from t_lock t WHERE t.lock_key=?"; PreparedStatement ps = conn.prepareStatement(sql); int colIndex = 1; ps.setString(colIndex++, lock_key); ResultSet rs = ps.executeQuery(); if (rs.next()) { return LockModel.builder(). lock_key(lock_key). request_id(rs.getString("request_id")). lock_count(rs.getInt("lock_count")). timeout(rs.getLong("timeout")). version(rs.getInt("version")).build(); } return null; }); } public static int insert(LockModel lockModel) throws Exception { return exec(conn -> { String sql = "insert into t_lock (lock_key, request_id, lock_count, timeout, version) VALUES (?,?,?,?,?)"; PreparedStatement ps = conn.prepareStatement(sql); int colIndex = 1; ps.setString(colIndex++, lockModel.getLock_key()); ps.setString(colIndex++, lockModel.getRequest_id()); ps.setInt(colIndex++, lockModel.getLock_count()); ps.setLong(colIndex++, lockModel.getTimeout()); ps.setInt(colIndex++, lockModel.getVersion()); return ps.executeUpdate(); }); } public static <T> T exec(SqlExec<T> sqlExec) throws Exception { Connection conn = getConn(); try { return sqlExec.exec(conn); } finally { closeConn(conn); } } @FunctionalInterface public interface SqlExec<T> { T exec(Connection conn) throws Exception; } @Getter @Setter @Builder public static class LockModel { private String lock_key; private String request_id; private Integer lock_count; private Long timeout; private Integer version; } private static final String url = "jdbc:mysql://localhost:3306/javacodeabcd?useSSL=false"; //數據庫地址 private static final String username = "root"; //數據庫用戶名 private static final String password = "root123"; //數據庫密碼 private static final String driver = "com.mysql.jdbc.Driver"; //mysql驅動 /** * 連接數據庫 * * @return */ public static Connection getConn() { Connection conn = null; try { Class.forName(driver); //加載數據庫驅動 try { conn = DriverManager.getConnection(url, username, password); //連接數據庫 } catch (SQLException e) { e.printStackTrace(); } } catch (ClassNotFoundException e) { e.printStackTrace(); } return conn; } /** * 關閉數據庫鏈接 * * @return */ public static void closeConn(Connection conn) { if (conn != null) { try { conn.close(); //關閉數據庫鏈接 } catch (SQLException e) { e.printStackTrace(); } } } }
上面代碼中實現了文章開頭列的分布式鎖的所有功能,大家可以認真研究下獲取鎖的方法:lock,釋放鎖的方法:unlock。
測試用例
package com.itsoku.sql; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import static com.itsoku.sql.LockUtils.lock; import static com.itsoku.sql.LockUtils.unlock; @Slf4j public class LockUtilsTest { //測試重復獲取和重復釋放 @Test public void test1() throws Exception { String lock_key = "key1"; for (int i = 0; i < 10; i++) { lock(lock_key, 10000L, 1000); } for (int i = 0; i < 9; i++) { unlock(lock_key); } } //獲取之后不釋放,超時之后被thread1獲取 @Test public void test2() throws Exception { String lock_key = "key2"; lock(lock_key, 5000L, 1000); Thread thread1 = new Thread(() -> { try { try { lock(lock_key, 5000L, 7000); } finally { unlock(lock_key); } } catch (Exception e) { e.printStackTrace(); } }); thread1.setName("thread1"); thread1.start(); thread1.join(); } }
- test1方法測試了重入鎖的效果。
- test2測試了主線程獲取鎖之后一直未釋放,持有鎖超時之后被thread1獲取到了。
留給大家一個問題
上面分布式鎖還需要考慮一個問題:比如A機會獲取了key1的鎖,并設置持有鎖的超時時間為10秒,但是獲取鎖之后,執(zhí)行了一段業(yè)務操作,業(yè)務操作耗時超過10秒了,此時機器B去獲取鎖時可以獲取成功的,此時會導致A、B兩個機器都獲取鎖成功了,都在執(zhí)行業(yè)務操作,這種情況應該怎么處理?
到此這篇關于Mysql中分布式鎖的具體實現的文章就介紹到這了,更多相關Mysql 分布式鎖內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
講解Linux系統(tǒng)下如何自動備份MySQL數據的基本教程
這篇文章主要介紹了Linux系統(tǒng)下如何自動備份MySQL數據的基本教程,還給出了利用shell腳本全備份和增量備份的基本方法,需要的朋友可以參考下2015-11-11IntelliJ?IDEA?2024與MySQL?8連接以及driver問題解決辦法
在IDE開發(fā)工具中也是可以使用mysql的,下面這篇文章主要給大家介紹了關于IntelliJ?IDEA?2024與MySQL?8連接以及driver問題解決辦法,文中通過圖文介紹的非常詳細,需要的朋友可以參考下2024-09-09Mysql查詢優(yōu)化之IN子查詢優(yōu)化方法詳解
項目中有需要,使用MySQL的in子查詢,查詢符合in子查詢集合中條件的數據,但是沒想到的是,MySQL的in子查詢會如此的慢,讓人無法接受,下面這篇文章主要給大家介紹了關于Mysql查詢優(yōu)化之IN子查詢優(yōu)化的相關資料,需要的朋友可以參考下2023-02-02