Java分布式鎖幾種常見的實(shí)現(xiàn)方式
前言
分布式鎖主要用于解決在分布式系統(tǒng)中多個節(jié)點(diǎn)對共享資源進(jìn)行并發(fā)訪問時可能出現(xiàn)的競爭問題。
在Java中實(shí)現(xiàn)分布式鎖的方式主要有以下幾種:
基于數(shù)據(jù)庫的實(shí)現(xiàn)
(1)唯一約束
通過數(shù)據(jù)庫表中設(shè)置唯一鍵約束來保證只有一個客戶端可以獲取到鎖。通常會有一張專門的鎖表,包含鎖名稱和鎖的持有者信息等字段。
CREATE TABLE distributed_locks (
lock_name VARCHAR(255) PRIMARY KEY,
owner_info VARCHAR(255) NOT NULL
);該代碼示例展示了如何使用上述表來獲取和釋放鎖,并執(zhí)行相應(yīng)的業(yè)務(wù)邏輯(例如更新庫存),在請求A對業(yè)務(wù)進(jìn)行操作的時候,假設(shè)請求B也進(jìn)入到此方法,則會由于鎖表的唯一索引lock_name而導(dǎo)致插入失敗,導(dǎo)致其操作被拒絕,而主鍵則需要針對不同業(yè)務(wù)場景設(shè)置,不同業(yè)務(wù)場景不會觸發(fā)鎖機(jī)制。
public void operateStock(Integer num) {
String lockName = "product_stock_update_lock"; // 鎖名稱,如:業(yè)務(wù)編碼
String ownerId = "d2d00005sa5s512"; // 當(dāng)前實(shí)例標(biāo)識符,如:用戶id
// SQL == "INSERT INTO distributed_locks(lock_name, owner_info) VALUES (?, ?)"
// 加鎖插入成功返回true
boolean gotLock = acquireLock(lockName, ownerId);
if (gotLock) {
try {
// 執(zhí)行業(yè)務(wù)邏輯,如更新指定物料庫存
//SQL == "UPDATE products SET stock = stock - #{num} WHERE product_id = '001' AND stock > 0";
updateProductStock(num);
} catch (Exception e) {
e.printStackTrace();
} finally {
// SQL == "DELETE FROM distributed_locks WHERE lock_name = ? AND owner_info = ?"
releaseLock(lockName, ownerId);
}
} else {
System.out.println("Failed to acquire lock.");
}
}(2)行鎖或表鎖
在查詢語句后面增加for update,數(shù)據(jù)庫會在查詢過程中給數(shù)據(jù)庫表增加排他鎖,當(dāng)某條記錄被加上排他鎖之后,其他線程無法再在該行記錄上增加排他鎖,而事務(wù)提交后會自動釋放鎖。
@Autowired
private ProductMapper productMapper;
@Transactional
public void updateProductStock(String productId) {
//查詢并加鎖 @Select("SELECT * FROM products WHERE product_id = #{productId} FOR UPDATE")
Product product = productMapper.selectForUpdate(productId);
if (product == null || product.getStock() <= 0) {
System.out.println("庫存不足或產(chǎn)品不存在");
return;
}
// 更新庫存 @Update("UPDATE products SET stock = stock - 1 WHERE product_id = #{productId}")
int rowsAffected = productMapper.updateStock(productId);
if (rowsAffected > 0) {
System.out.println("庫存更新成功");
} else {
System.out.println("未能成功更新庫存");
}
}(3)version樂觀鎖
樂觀鎖是一種處理并發(fā)控制的策略,它假設(shè)數(shù)據(jù)沖突的概率較低,因此不會在讀取數(shù)據(jù)時加鎖。相反,它會在更新數(shù)據(jù)時檢查數(shù)據(jù)是否被其他事務(wù)修改過。這通常通過一個版本號(version)字段或時間戳來實(shí)現(xiàn)。
讀取數(shù)據(jù):當(dāng)一個事務(wù)讀取數(shù)據(jù)時,同時獲取該記錄的版本號或時間戳。
修改數(shù)據(jù):當(dāng)事務(wù)嘗試更新數(shù)據(jù)時,它會使用版本號作為條件的一部分進(jìn)行更新操作。
CREATE TABLE products (
product_id VARCHAR(255) PRIMARY KEY,
stock INT NOT NULL,
version INT DEFAULT 0
); @Autowired
private ProductMapper productMapper;
@Transactional
public void updateProductStock(String productId) {
// 查詢并加鎖
// @Select("SELECT product_id,stock,version FROM products WHERE product_id = #{productId} FOR UPDATE")
Product product = productMapper.selectForUpdate(productId);
if (product == null || product.getStock() <= 0) {
System.out.println("庫存不足或產(chǎn)品不存在");
return;
}
// 嘗試更新庫存,并檢查版本號
// @Update("UPDATE products SET stock = stock - 1, version = version + 1 WHERE product_id = #{productId} AND version = #{version}")
int rowsAffected = productMapper.updateStockWithVersion(productId, product.getVersion());
if (rowsAffected > 0) {
System.out.println("庫存更新成功");
} else {
System.out.println("庫存更新失敗,可能已被其他事務(wù)更新");
// 這里可以根據(jù)業(yè)務(wù)需求選擇重試或者拋出異常等處理方式
}
}基于Redis的實(shí)現(xiàn)
使用Redis實(shí)現(xiàn)分布式鎖是一種高效且廣泛采用的方法,特別適合于需要高吞吐量和低延遲的場景。Redis通過其原子操作命令提供了一種簡單而有效的機(jī)制來實(shí)現(xiàn)分布式鎖。
SET resource_name my_random_value NX PX 30000
NX 表示僅在鍵不存在時設(shè)置鍵。
PX 30000 設(shè)置鍵的過期時間為30秒,防止死鎖(如果客戶端崩潰或網(wǎng)絡(luò)問題導(dǎo)致無法釋放鎖)。
RedisTemplate實(shí)現(xiàn)分布式鎖
編寫工具類
@Component
public class MyRedisLock {
private final RedisTemplate<String, String> redisTemplate;
@Autowired
// 自Spring 4.3起,如果只有一個構(gòu)造函數(shù),可以省略@Autowired注解
public MyRedisLock(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
}
// Lua 腳本用于釋放鎖
private static final String UNLOCK_SCRIPT =
"if redis.call('GET', KEYS[1]) == ARGV[1] then return redis.call('DEL', KEYS[1]) else return 0 end";
// Lua 腳本用于續(xù)期鎖
private static final String RENEW_LOCK_SCRIPT =
"if redis.call('GET', KEYS[1]) == ARGV[1] then return redis.call('PEXPIRE', KEYS[1], ARGV[2]) else return 0 end";
// 鎖的前綴,用于區(qū)分不同的鎖
private static final String LOCK_PREFIX = "lock:";
// 續(xù)期鎖的時間間隔(毫秒)
private static final long RENEW_INTERVAL_MS = 2000;
/**
嘗試獲取鎖
@param lockKey 鎖的鍵名
@param expireMs 鎖的過期時間(毫秒)
@param operateId 鎖的值
@return 如果成功獲取鎖,返回 true;否則返回 false
**/
public boolean tryLock(String lockKey, long expireMs, String operateId) {
Boolean result = redisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX + lockKey, operateId, expireMs, TimeUnit.MILLISECONDS); // setIfAbsent實(shí)現(xiàn)上鎖
return result != null && result;
}
/**
嘗試獲取鎖并自動續(xù)期
@param lockKey 鎖的鍵名
@param expireMs 鎖的過期時間(毫秒)
@param operateId 鎖的值
@return 如果成功獲取鎖,返回鎖的唯一標(biāo)識符;否則返回 null
**/
public boolean tryLockWithRenewal(String lockKey, long expireMs, String operateId) {
Boolean result = redisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX + lockKey, operateId, expireMs, TimeUnit.MILLISECONDS); // setIfAbsent實(shí)現(xiàn)上鎖
if (result != null && result) {
// 啟動續(xù)期線程
startRenewalThread(lockKey, operateId, expireMs);
return true;
}
return false;
}
/**
釋放鎖
@param lockKey 鎖的鍵名
@param operateId 鎖的值(用于驗(yàn)證是否是持有鎖的客戶端)
@return 如果成功釋放鎖,返回 true;否則返回 false
**/
public boolean unlock(String lockKey, String operateId) {
DefaultRedisScript<Long> script = new DefaultRedisScript<>(UNLOCK_SCRIPT, Long.class);
// 執(zhí)行l(wèi)ua腳本,參數(shù)解釋下:
// 第一個參數(shù)script為lua腳本
// 第二個參數(shù)為key的集合,會依次替換lua腳本中的KEYS[]數(shù)組的數(shù)據(jù),默認(rèn)1開始
// 第三個參數(shù)為參數(shù)集合,會依次替換lua腳本中的ARGVS[]數(shù)組的數(shù)據(jù),默認(rèn)1開始
Long result = redisTemplate.execute(script, Collections.singletonList(LOCK_PREFIX + lockKey), operateId);
return result != null && result == 1L;
}
/**
自動續(xù)期鎖
@param lockKey 鎖的鍵名
@param operateId 鎖的值(用于驗(yàn)證是否是持有鎖的客戶端)
@param expireMs 鎖的過期時間(毫秒)
@return 如果成功續(xù)期,返回 true;否則返回 false
**/
public boolean renewLock(String lockKey, String operateId, long expireMs) {
DefaultRedisScript<Long> script = new DefaultRedisScript<>(RENEW_LOCK_SCRIPT, Long.class);
Long result = redisTemplate.execute(script, Collections.singletonList(LOCK_PREFIX + lockKey), operateId, String.valueOf(expireMs));
return result != null && result == 1L;
}
/**
啟動續(xù)期線程
@param lockKey 鎖的鍵名
@param operateId 鎖的值
@param expireMs 鎖的過期時間(毫秒)
**/
private void startRenewalThread(final String lockKey, final String operateId, final long expireMs) {
Thread renewalThread = new Thread(() -> {
try {
while (true) {
// 每隔一段時間續(xù)期一次,需要確保間隔時間小于過期時間,過期或釋放鎖將無法續(xù)費(fèi)
Thread.sleep(RENEW_INTERVAL_MS);
if (!renewLock(lockKey, operateId, expireMs)) { // 續(xù)鎖操作
// 如果續(xù)期失敗,直接結(jié)束守護(hù)線程,停止鎖續(xù)期行為。
// 這里說明下,刪除鎖和續(xù)鎖都需要驗(yàn)證lockValue,這個上鎖時通過uuid創(chuàng)建的,其他線程肯定獲取的都不一致,這樣確保續(xù)鎖行為只能是自己的守護(hù)線程才可以操作;如果續(xù)鎖失敗了,則說明是主線程完成任務(wù)刪除了key鎖,所以這里守護(hù)線程也可以結(jié)束了
break;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
renewalThread.setDaemon(true); // 設(shè)置為守護(hù)線程
renewalThread.start();
}
}代碼示例:常規(guī)鎖
@Autowired
private MyRedisLock redisLock;
// 鎖的默認(rèn)過期時間(毫秒)
private static final long DEFAULT_EXPIRE_TIME_MS = 5000;
public void testLock() throws InterruptedException {
String lockKey = "my_distributed_lock";
//進(jìn)程標(biāo)識ID
String operateId = java.util.UUID.randomUUID().toString();
if (redisLock.tryLock(lockKey, DEFAULT_EXPIRE_TIME_MS, operateId)) {
System.out.println(" 獲取到鎖,開始執(zhí)行任務(wù)...");
try {
// 執(zhí)行業(yè)務(wù)邏輯
Thread.sleep(5000); // 模擬耗時操作
System.out.println(Thread.currentThread().getName() + " 任務(wù)執(zhí)行完成");
} finally {
if (redisLock.unlock(lockKey, operateId)) {
System.out.println(Thread.currentThread().getName() + " 成功釋放鎖");
} else {
System.out.println(Thread.currentThread().getName() + " 釋放鎖失敗,鎖可能已被其他客戶端刪除");
}
}
} else {
System.out.println(Thread.currentThread().getName() + " 未能獲取到鎖");
}
}代碼示例:續(xù)費(fèi)線程鎖
@Autowired
private MyRedisLock redisLock;
// 鎖的默認(rèn)過期時間(毫秒)
private static final long DEFAULT_EXPIRE_TIME_MS = 5000;
public void testLock() throws InterruptedException {
String lockKey = "my_distributed_lock";
//進(jìn)程標(biāo)識ID
String operateId = java.util.UUID.randomUUID().toString();
// 嘗試獲取鎖并自動續(xù)期
if ( redisLock.tryLockWithRenewal(lockKey, DEFAULT_EXPIRE_TIME_MS, operateId)) {
try {
// 執(zhí)行業(yè)務(wù)邏輯
Thread.sleep(8000);
System.out.println(Thread.currentThread().getName() + " 任務(wù)執(zhí)行完成");
} finally {
// 釋放鎖
boolean unlockSuccess = redisLock.unlock(lockKey, operateId);
if (unlockSuccess) {
System.out.println(Thread.currentThread().getName() + " 成功釋放鎖");
} else {
System.out.println(Thread.currentThread().getName() + " 釋放鎖失敗,鎖可能已被其他客戶端刪除");
}
}
} else {
System.out.println(Thread.currentThread().getName() + " 未能獲取到鎖");
}
}基于Zookeeper的實(shí)現(xiàn)
基本原理
- 創(chuàng)建臨時順序節(jié)點(diǎn):客戶端嘗試在特定路徑下創(chuàng)建一個臨時順序節(jié)點(diǎn)(例如/locks/lock-),以表示對鎖的競爭。
- 判斷是否獲得鎖:檢查自己創(chuàng)建的節(jié)點(diǎn)是否是該路徑下所有子節(jié)點(diǎn)中最小的一個。如果是,則表示獲得了鎖;如果不是,則監(jiān)聽前一個節(jié)點(diǎn)(即比自己小的那個節(jié)點(diǎn))的變化。
- 監(jiān)聽前一節(jié)點(diǎn)刪除事件:如果當(dāng)前節(jié)點(diǎn)不是最小節(jié)點(diǎn),則需要等待直到前一節(jié)點(diǎn)被刪除(意味著前一客戶端釋放了鎖),然后重新檢查是否可以獲得鎖。
- 釋放鎖:當(dāng)業(yè)務(wù)邏輯執(zhí)行完畢后,客戶端可以主動刪除自己創(chuàng)建的節(jié)點(diǎn)來釋放鎖。此外,由于使用的是臨時節(jié)點(diǎn),如果客戶端崩潰或與ZooKeeper斷開連接,該節(jié)點(diǎn)也會自動被刪除。
引入依賴(一個用于簡化ZooKeeper操作的框架)
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>代碼示例
private static final String ZK_ADDRESS = "localhost:2181";
private static final String LOCK_PATH = "/distributed_lock_example";
public void operate() throws Exception {
// 創(chuàng)建CuratorFramework實(shí)例
CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, new ExponentialBackoffRetry(1000, 3));
client.start();
// 使用InterProcessMutex作為分布式鎖
InterProcessMutex lock = new InterProcessMutex(client, LOCK_PATH);
try {
// 獲取鎖
lock.acquire();
//業(yè)務(wù)操作代碼
this.performBusinessLogic();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (lock.isAcquiredInThisProcess()) {
try {
// 釋放鎖
lock.release();
System.out.println("Lock released.");
} catch (Exception e) {
e.printStackTrace();
}
}
// 關(guān)閉客戶端
client.close();
}
}
private static void performBusinessLogic() throws InterruptedException {
// 模擬業(yè)務(wù)邏輯處理
System.out.println("Performing some operations...");
Thread.sleep(5000); // 暫停5秒模擬長時間操作
}基于etcd的實(shí)現(xiàn)(僅了解)
類似于Zookeeper,etcd也提供了類似的分布式協(xié)調(diào)服務(wù),可以通過創(chuàng)建租約(lease)并附加到關(guān)鍵路徑上來實(shí)現(xiàn)分布式鎖。Etcd支持事務(wù)、watch機(jī)制等功能,使得它同樣適用于構(gòu)建分布式鎖。
總結(jié)
到此這篇關(guān)于Java分布式鎖幾種常見的實(shí)現(xiàn)方式的文章就介紹到這了,更多相關(guān)Java分布式鎖內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
mybatis 插件: 打印 sql 及其執(zhí)行時間實(shí)現(xiàn)方法
下面小編就為大家?guī)硪黄猰ybatis 插件: 打印 sql 及其執(zhí)行時間實(shí)現(xiàn)方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-06-06
Springboot配置管理Externalized?Configuration深入探究
這篇文章主要介紹了Springboot配置管Externalized?Configuration深入探究,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-01-01
淺談Java實(shí)體對象的三種狀態(tài)以及轉(zhuǎn)換關(guān)系
這篇文章主要介紹了淺談Java實(shí)體對象的三種狀態(tài)以及轉(zhuǎn)換關(guān)系,具有一定參考價值,需要的朋友可以,看看。。2017-11-11
spring Retryable注解實(shí)現(xiàn)重試詳解
這篇文章主要介紹了spring Retryable注解實(shí)現(xiàn)重試詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-09-09

