Mysql中分布式鎖的具體實(shí)現(xiàn)
分布式鎖的功能
- 分布式鎖使用者位于不同的機(jī)器中,鎖獲取成功之后,才可以對(duì)共享資源進(jìn)行操作
- 鎖具有重入的功能:即一個(gè)使用者可以多次獲取某個(gè)鎖
- 獲取鎖有超時(shí)的功能:即在指定的時(shí)間內(nèi)去嘗試獲取鎖,超過(guò)了超時(shí)時(shí)間,如果還未獲取成功,則返回獲取失敗
- 能夠自動(dòng)容錯(cuò),比如:A機(jī)器獲取鎖lock1之后,在釋放鎖lock1之前,A機(jī)器掛了,導(dǎo)致鎖lock1未釋放,結(jié)果會(huì)lock1一直被A機(jī)器占有著,遇到這種情況時(shí),分布式鎖要能夠自動(dòng)解決,可以這么做:持有鎖的時(shí)候可以加個(gè)持有超時(shí)時(shí)間,超過(guò)了這個(gè)時(shí)間還未釋放的,其他機(jī)器將有機(jī)會(huì)獲取鎖
預(yù)備技能:樂(lè)觀鎖
通常我們修改表中一條數(shù)據(jù)過(guò)程如下:
sql復(fù)制代碼t1:select獲取記錄R1
t2:對(duì)R1進(jìn)行編輯
t3:update R1
我們來(lái)看一下上面的過(guò)程存在的問(wèn)題:
如果A、B兩個(gè)線程同時(shí)執(zhí)行到t1,他們倆看到的R1的數(shù)據(jù)一樣,然后都對(duì)R1進(jìn)行編輯,然后去執(zhí)行t3,最終2個(gè)線程都會(huì)更新成功,后面一個(gè)線程會(huì)把前面一個(gè)線程update的結(jié)果給覆蓋掉,這就是并發(fā)修改數(shù)據(jù)存在的問(wèn)題。
我們可以在表中新增一個(gè)版本號(hào),每次更新數(shù)據(jù)時(shí)候?qū)姹咎?hào)作為條件,并且每次更新時(shí)候版本號(hào)+1,過(guò)程優(yōu)化一下,如下:
t1:打開(kāi)事務(wù)start transaction
t2:select獲取記錄R1,聲明變量v=R1.version
t3:對(duì)R1進(jìn)行編輯
t4:執(zhí)行更新操作
update R1 set version = version + 1 where user_id=#user_id# and version = #v#;
t5:t4中的update會(huì)返回影響的行數(shù),我們將其記錄在count中,然后根據(jù)count來(lái)判斷提交還是回滾
if(count==1){
//提交事務(wù)
commit;
}else{
//回滾事務(wù)
rollback;
}
上面重點(diǎn)在于步驟t4,當(dāng)多個(gè)線程同時(shí)執(zhí)行到t1,他們看到的R1是一樣的,但是當(dāng)他們執(zhí)行到t4的時(shí)候,數(shù)據(jù)庫(kù)會(huì)對(duì)update的這行記錄加鎖,確保并發(fā)情況下排隊(duì)執(zhí)行,所以只有第一個(gè)的update會(huì)返回1,其他的update結(jié)果會(huì)返回0,然后后面會(huì)判斷count是否為1,進(jìn)而對(duì)事務(wù)進(jìn)行提交或者回滾。可以通過(guò)count的值知道修改數(shù)據(jù)是否成功了。
上面這種方式就樂(lè)觀鎖。我們可以通過(guò)樂(lè)觀鎖的方式確保數(shù)據(jù)并發(fā)修改過(guò)程中的正確性。
使用mysql實(shí)現(xiàn)分布式鎖
建表
我們創(chuàng)建一個(gè)分布式鎖表,如下
DROP DATABASE IF EXISTS javacodeabcd; CREATE DATABASE javacodeabcd; USE javacodeabcd; DROP TABLE IF EXISTS t_lock; create table t_lock( lock_key varchar(32) PRIMARY KEY NOT NULL COMMENT '鎖唯一標(biāo)志', request_id varchar(64) NOT NULL DEFAULT '' COMMENT '用來(lái)標(biāo)識(shí)請(qǐng)求對(duì)象的', lock_count INT NOT NULL DEFAULT 0 COMMENT '當(dāng)前上鎖次數(shù)', timeout BIGINT NOT NULL DEFAULT 0 COMMENT '鎖超時(shí)時(shí)間', version INT NOT NULL DEFAULT 0 COMMENT '版本號(hào),每次更新+1' )COMMENT '鎖信息表';
分布式鎖工具類:
package com.itsoku.sql;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import java.sql.*;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@Slf4j
public class LockUtils {
//將requestid保存在該變量中
static ThreadLocal<String> requestIdTL = new ThreadLocal<>();
/**
* 獲取當(dāng)前線程requestid
*
* @return
*/
public static String getRequestId() {
String requestId = requestIdTL.get();
if (requestId == null || "".equals(requestId)) {
requestId = UUID.randomUUID().toString();
requestIdTL.set(requestId);
}
log.info("requestId:{}", requestId);
return requestId;
}
/**
* 獲取鎖
*
* @param lock_key 鎖key
* @param locktimeout(毫秒) 持有鎖的有效時(shí)間,防止死鎖
* @param gettimeout(毫秒) 獲取鎖的超時(shí)時(shí)間,這個(gè)時(shí)間內(nèi)獲取不到將重試
* @return
*/
public static boolean lock(String lock_key, long locktimeout, int gettimeout) throws Exception {
log.info("start");
boolean lockResult = false;
String request_id = getRequestId();
long starttime = System.currentTimeMillis();
while (true) {
LockModel lockModel = LockUtils.get(lock_key);
if (Objects.isNull(lockModel)) {
//插入一條記錄,重新嘗試獲取鎖
LockUtils.insert(LockModel.builder().lock_key(lock_key).request_id("").lock_count(0).timeout(0L).version(0).build());
} else {
String reqid = lockModel.getRequest_id();
//如果reqid為空字符,表示鎖未被占用
if ("".equals(reqid)) {
lockModel.setRequest_id(request_id);
lockModel.setLock_count(1);
lockModel.setTimeout(System.currentTimeMillis() + locktimeout);
if (LockUtils.update(lockModel) == 1) {
lockResult = true;
break;
}
} else if (request_id.equals(reqid)) {
//如果request_id和表中request_id一樣表示鎖被當(dāng)前線程持有者,此時(shí)需要加重入鎖
lockModel.setTimeout(System.currentTimeMillis() + locktimeout);
lockModel.setLock_count(lockModel.getLock_count() + 1);
if (LockUtils.update(lockModel) == 1) {
lockResult = true;
break;
}
} else {
//鎖不是自己的,并且已經(jīng)超時(shí)了,則重置鎖,繼續(xù)重試
if (lockModel.getTimeout() < System.currentTimeMillis()) {
LockUtils.resetLock(lockModel);
} else {
//如果未超時(shí),休眠100毫秒,繼續(xù)重試
if (starttime + gettimeout > System.currentTimeMillis()) {
TimeUnit.MILLISECONDS.sleep(100);
} else {
break;
}
}
}
}
}
log.info("end");
return lockResult;
}
/**
* 釋放鎖
*
* @param lock_key
* @throws Exception
*/
public static void unlock(String lock_key) throws Exception {
//獲取當(dāng)前線程requestId
String requestId = getRequestId();
LockModel lockModel = LockUtils.get(lock_key);
//當(dāng)前線程requestId和庫(kù)中request_id一致 && lock_count>0,表示可以釋放鎖
if (Objects.nonNull(lockModel) && requestId.equals(lockModel.getRequest_id()) && lockModel.getLock_count() > 0) {
if (lockModel.getLock_count() == 1) {
//重置鎖
resetLock(lockModel);
} else {
lockModel.setLock_count(lockModel.getLock_count() - 1);
LockUtils.update(lockModel);
}
}
}
/**
* 重置鎖
*
* @param lockModel
* @return
* @throws Exception
*/
public static int resetLock(LockModel lockModel) throws Exception {
lockModel.setRequest_id("");
lockModel.setLock_count(0);
lockModel.setTimeout(0L);
return LockUtils.update(lockModel);
}
/**
* 更新lockModel信息,內(nèi)部采用樂(lè)觀鎖來(lái)更新
*
* @param lockModel
* @return
* @throws Exception
*/
public static int update(LockModel lockModel) throws Exception {
return exec(conn -> {
String sql = "UPDATE t_lock SET request_id = ?,lock_count = ?,timeout = ?,version = version + 1 WHERE lock_key = ? AND version = ?";
PreparedStatement ps = conn.prepareStatement(sql);
int colIndex = 1;
ps.setString(colIndex++, lockModel.getRequest_id());
ps.setInt(colIndex++, lockModel.getLock_count());
ps.setLong(colIndex++, lockModel.getTimeout());
ps.setString(colIndex++, lockModel.getLock_key());
ps.setInt(colIndex++, lockModel.getVersion());
return ps.executeUpdate();
});
}
public static LockModel get(String lock_key) throws Exception {
return exec(conn -> {
String sql = "select * from t_lock t WHERE t.lock_key=?";
PreparedStatement ps = conn.prepareStatement(sql);
int colIndex = 1;
ps.setString(colIndex++, lock_key);
ResultSet rs = ps.executeQuery();
if (rs.next()) {
return LockModel.builder().
lock_key(lock_key).
request_id(rs.getString("request_id")).
lock_count(rs.getInt("lock_count")).
timeout(rs.getLong("timeout")).
version(rs.getInt("version")).build();
}
return null;
});
}
public static int insert(LockModel lockModel) throws Exception {
return exec(conn -> {
String sql = "insert into t_lock (lock_key, request_id, lock_count, timeout, version) VALUES (?,?,?,?,?)";
PreparedStatement ps = conn.prepareStatement(sql);
int colIndex = 1;
ps.setString(colIndex++, lockModel.getLock_key());
ps.setString(colIndex++, lockModel.getRequest_id());
ps.setInt(colIndex++, lockModel.getLock_count());
ps.setLong(colIndex++, lockModel.getTimeout());
ps.setInt(colIndex++, lockModel.getVersion());
return ps.executeUpdate();
});
}
public static <T> T exec(SqlExec<T> sqlExec) throws Exception {
Connection conn = getConn();
try {
return sqlExec.exec(conn);
} finally {
closeConn(conn);
}
}
@FunctionalInterface
public interface SqlExec<T> {
T exec(Connection conn) throws Exception;
}
@Getter
@Setter
@Builder
public static class LockModel {
private String lock_key;
private String request_id;
private Integer lock_count;
private Long timeout;
private Integer version;
}
private static final String url = "jdbc:mysql://localhost:3306/javacodeabcd?useSSL=false"; //數(shù)據(jù)庫(kù)地址
private static final String username = "root"; //數(shù)據(jù)庫(kù)用戶名
private static final String password = "root123"; //數(shù)據(jù)庫(kù)密碼
private static final String driver = "com.mysql.jdbc.Driver"; //mysql驅(qū)動(dòng)
/**
* 連接數(shù)據(jù)庫(kù)
*
* @return
*/
public static Connection getConn() {
Connection conn = null;
try {
Class.forName(driver); //加載數(shù)據(jù)庫(kù)驅(qū)動(dòng)
try {
conn = DriverManager.getConnection(url, username, password); //連接數(shù)據(jù)庫(kù)
} catch (SQLException e) {
e.printStackTrace();
}
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
return conn;
}
/**
* 關(guān)閉數(shù)據(jù)庫(kù)鏈接
*
* @return
*/
public static void closeConn(Connection conn) {
if (conn != null) {
try {
conn.close(); //關(guān)閉數(shù)據(jù)庫(kù)鏈接
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}上面代碼中實(shí)現(xiàn)了文章開(kāi)頭列的分布式鎖的所有功能,大家可以認(rèn)真研究下獲取鎖的方法:lock,釋放鎖的方法:unlock。
測(cè)試用例
package com.itsoku.sql;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import static com.itsoku.sql.LockUtils.lock;
import static com.itsoku.sql.LockUtils.unlock;
@Slf4j
public class LockUtilsTest {
//測(cè)試重復(fù)獲取和重復(fù)釋放
@Test
public void test1() throws Exception {
String lock_key = "key1";
for (int i = 0; i < 10; i++) {
lock(lock_key, 10000L, 1000);
}
for (int i = 0; i < 9; i++) {
unlock(lock_key);
}
}
//獲取之后不釋放,超時(shí)之后被thread1獲取
@Test
public void test2() throws Exception {
String lock_key = "key2";
lock(lock_key, 5000L, 1000);
Thread thread1 = new Thread(() -> {
try {
try {
lock(lock_key, 5000L, 7000);
} finally {
unlock(lock_key);
}
} catch (Exception e) {
e.printStackTrace();
}
});
thread1.setName("thread1");
thread1.start();
thread1.join();
}
}- test1方法測(cè)試了重入鎖的效果。
- test2測(cè)試了主線程獲取鎖之后一直未釋放,持有鎖超時(shí)之后被thread1獲取到了。
留給大家一個(gè)問(wèn)題
上面分布式鎖還需要考慮一個(gè)問(wèn)題:比如A機(jī)會(huì)獲取了key1的鎖,并設(shè)置持有鎖的超時(shí)時(shí)間為10秒,但是獲取鎖之后,執(zhí)行了一段業(yè)務(wù)操作,業(yè)務(wù)操作耗時(shí)超過(guò)10秒了,此時(shí)機(jī)器B去獲取鎖時(shí)可以獲取成功的,此時(shí)會(huì)導(dǎo)致A、B兩個(gè)機(jī)器都獲取鎖成功了,都在執(zhí)行業(yè)務(wù)操作,這種情況應(yīng)該怎么處理?
到此這篇關(guān)于Mysql中分布式鎖的具體實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)Mysql 分布式鎖內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
MySQL查詢進(jìn)階操作從函數(shù)到表連接的使用
這篇文章主要介紹了MySQL查詢進(jìn)階從函數(shù)到表連接的使用,包括mysql函數(shù)的使用,MySQL的分組分頁(yè)及查詢關(guān)鍵字的執(zhí)行順序,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2022-08-08
Mysql數(shù)據(jù)庫(kù)亂碼問(wèn)題的對(duì)應(yīng)方式
今天小編就為大家分享一篇關(guān)于Mysql數(shù)據(jù)庫(kù)亂碼問(wèn)題的對(duì)應(yīng)方式,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2018-12-12
MySQL內(nèi)存及虛擬內(nèi)存優(yōu)化設(shè)置參數(shù)
這篇文章主要介紹了MySQL內(nèi)存及虛擬內(nèi)存優(yōu)化設(shè)置參數(shù),需要的朋友可以參考下2016-05-05
講解Linux系統(tǒng)下如何自動(dòng)備份MySQL數(shù)據(jù)的基本教程
這篇文章主要介紹了Linux系統(tǒng)下如何自動(dòng)備份MySQL數(shù)據(jù)的基本教程,還給出了利用shell腳本全備份和增量備份的基本方法,需要的朋友可以參考下2015-11-11
IntelliJ?IDEA?2024與MySQL?8連接以及driver問(wèn)題解決辦法
在IDE開(kāi)發(fā)工具中也是可以使用mysql的,下面這篇文章主要給大家介紹了關(guān)于IntelliJ?IDEA?2024與MySQL?8連接以及driver問(wèn)題解決辦法,文中通過(guò)圖文介紹的非常詳細(xì),需要的朋友可以參考下2024-09-09
提升MYSQL查詢效率的10個(gè)SQL語(yǔ)句優(yōu)化技巧
MySQL數(shù)據(jù)庫(kù)執(zhí)行效率對(duì)程序的執(zhí)行速度有很大的影響,有效的處理優(yōu)化數(shù)據(jù)庫(kù)是非常有用的。尤其是大量數(shù)據(jù)需要處理的時(shí)候2018-03-03
Mysql查詢優(yōu)化之IN子查詢優(yōu)化方法詳解
項(xiàng)目中有需要,使用MySQL的in子查詢,查詢符合in子查詢集合中條件的數(shù)據(jù),但是沒(méi)想到的是,MySQL的in子查詢會(huì)如此的慢,讓人無(wú)法接受,下面這篇文章主要給大家介紹了關(guān)于Mysql查詢優(yōu)化之IN子查詢優(yōu)化的相關(guān)資料,需要的朋友可以參考下2023-02-02

