詳解Java如何實現(xiàn)基于Redis的分布式鎖
前言
單JVM內同步好辦, 直接用JDK提供的鎖就可以了,但是跨進程同步靠這個肯定是不可能的,這種情況下肯定要借助第三方,我這里實現(xiàn)用Redis,當然還有很多其他的實現(xiàn)方式。其實基于Redis實現(xiàn)的原理還算比較簡單的,在看代碼之前建議大家先去看看原理,看懂了之后看代碼應該就容易理解了。
我這里不實現(xiàn)JDK的java.util.concurrent.locks.Lock接口,而是自定義一個,因為JDK的有個newCondition方法我這里暫時沒實現(xiàn)。這個Lock提供了5個lock方法的變體,可以自行選擇使用哪一個來獲取鎖,我的想法是最好用帶超時返回的那幾個方法,因為不這樣的話,假如redis掛了,線程永遠都在那死循環(huán)了(關于這里,應該還可以進一步優(yōu)化,如果redis掛了,Jedis的操作肯定會拋異常之類的,可以定義個機制讓redis掛了的時候通知使用這個lock的用戶,或者說是線程)
package cc.lixiaohui.lock;
import java.util.concurrent.TimeUnit;
public interface Lock {
/**
* 阻塞性的獲取鎖, 不響應中斷
*/
void lock;
/**
* 阻塞性的獲取鎖, 響應中斷
*
* @throws InterruptedException
*/
void lockInterruptibly throws InterruptedException;
/**
* 嘗試獲取鎖, 獲取不到立即返回, 不阻塞
*/
boolean tryLock;
/**
* 超時自動返回的阻塞性的獲取鎖, 不響應中斷
*
* @param time
* @param unit
* @return {@code true} 若成功獲取到鎖, {@code false} 若在指定時間內未���取到鎖
*
*/
boolean tryLock(long time, TimeUnit unit);
/**
* 超時自動返回的阻塞性的獲取鎖, 響應中斷
*
* @param time
* @param unit
* @return {@code true} 若成功獲取到鎖, {@code false} 若在指定時間內未獲取到鎖
* @throws InterruptedException 在嘗試獲取鎖的當前線程被中斷
*/
boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException;
/**
* 釋放鎖
*/
void unlock;
}
看其抽象實現(xiàn):
package cc.lixiaohui.lock;
import java.util.concurrent.TimeUnit;
/**
* 鎖的骨架實現(xiàn), 真正的獲取鎖的步驟由子類去實現(xiàn).
*
* @author lixiaohui
*
*/
public abstract class AbstractLock implements Lock {
/**
* <pre>
* 這里需不需要保證可見性值得討論, 因為是分布式的鎖,
* 1.同一個jvm的多個線程使用不同的鎖對象其實也是可以的, 這種情況下不需要保證可見性
* 2.同一個jvm的多個線程使用同一個鎖對象, 那可見性就必須要保證了.
* </pre>
*/
protected volatile boolean locked;
/**
* 當前jvm內持有該鎖的線程(if have one)
*/
private Thread exclusiveOwnerThread;
public void lock {
try {
lock(false, 0, null, false);
} catch (InterruptedException e) {
// TODO ignore
}
}
public void lockInterruptibly throws InterruptedException {
lock(false, 0, null, true);
}
public boolean tryLock(long time, TimeUnit unit) {
try {
return lock(true, time, unit, false);
} catch (InterruptedException e) {
// TODO ignore
}
return false;
}
public boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException {
return lock(true, time, unit, true);
}
public void unlock {
// TODO 檢查當前線程是否持有鎖
if (Thread.currentThread != getExclusiveOwnerThread) {
throw new IllegalMonitorStateException("current thread does not hold the lock");
}
unlock0;
setExclusiveOwnerThread(null);
}
protected void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread {
return exclusiveOwnerThread;
}
protected abstract void unlock0;
/**
* 阻塞式獲取鎖的實現(xiàn)
*
* @param useTimeout
* @param time
* @param unit
* @param interrupt 是否響應中斷
* @return
* @throws InterruptedException
*/
protected abstract boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException;
}
基于Redis的最終實現(xiàn),關鍵的獲取鎖,釋放鎖的代碼在這個類的lock方法和unlock0方法里,大家可以只看這兩個方法然后完全自己寫一個:
package cc.lixiaohui.lock;
import java.util.concurrent.TimeUnit;
import redis.clients.jedis.Jedis;
/**
* <pre>
* 基于Redis的SETNX操作實現(xiàn)的分布式鎖
*
* 獲取鎖時最好用lock(long time, TimeUnit unit), 以免網路問題而導致線程一直阻塞
*
* <a >SETNC操作參考資料</a>
* </pre>
*
* @author lixiaohui
*
*/
public class RedisBasedDistributedLock extends AbstractLock {
private Jedis jedis;
// 鎖的名字
protected String lockKey;
// 鎖的有效時長(毫秒)
protected long lockExpires;
public RedisBasedDistributedLock(Jedis jedis, String lockKey, long lockExpires) {
this.jedis = jedis;
this.lockKey = lockKey;
this.lockExpires = lockExpires;
}
// 阻塞式獲取鎖的實現(xiàn)
protected boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException{
if (interrupt) {
checkInterruption;
}
long start = System.currentTimeMillis;
long timeout = unit.toMillis(time); // if !useTimeout, then it's useless
while (useTimeout ? isTimeout(start, timeout) : true) {
if (interrupt) {
checkInterruption;
}
long lockExpireTime = System.currentTimeMillis + lockExpires + 1;//鎖超時時間
String stringOfLockExpireTime = String.valueOf(lockExpireTime);
if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) { // 獲取到鎖
// TODO 成功獲取到鎖, 設置相關標識
locked = true;
setExclusiveOwnerThread(Thread.currentThread);
return true;
}
String value = jedis.get(lockKey);
if (value != null && isTimeExpired(value)) { // lock is expired
// 假設多個線程(非單jvm)同時走到這里
String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime); // getset is atomic
// 但是走到這里時每個線程拿到的oldValue肯定不可能一樣(因為getset是原子性的)
// 加入拿到的oldValue依然是expired的,那么就說明拿到鎖了
if (oldValue != null && isTimeExpired(oldValue)) {
// TODO 成功獲取到鎖, 設置相關標識
locked = true;
setExclusiveOwnerThread(Thread.currentThread);
return true;
}
} else {
// TODO lock is not expired, enter next loop retrying
}
}
return false;
}
public boolean tryLock {
long lockExpireTime = System.currentTimeMillis + lockExpires + 1;//鎖超時時間
String stringOfLockExpireTime = String.valueOf(lockExpireTime);
if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) { // 獲取到鎖
// TODO 成功獲取到鎖, 設置相關標識
locked = true;
setExclusiveOwnerThread(Thread.currentThread);
return true;
}
String value = jedis.get(lockKey);
if (value != null && isTimeExpired(value)) { // lock is expired
// 假設多個線程(非單jvm)同時走到這里
String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime); // getset is atomic
// 但是走到這里時每個線程拿到的oldValue肯定不可能一樣(因為getset是原子性的)
// 假如拿到的oldValue依然是expired的,那么就說明拿到鎖了
if (oldValue != null && isTimeExpired(oldValue)) {
// TODO 成功獲取到鎖, 設置相關標識
locked = true;
setExclusiveOwnerThread(Thread.currentThread);
return true;
}
} else {
// TODO lock is not expired, enter next loop retrying
}
return false;
}
/**
* Queries if this lock is held by any thread.
*
* @return {@code true} if any thread holds this lock and
* {@code false} otherwise
*/
public boolean isLocked {
if (locked) {
return true;
} else {
String value = jedis.get(lockKey);
// TODO 這里其實是有問題的, 想:當get方法返回value后, 假設這個value已經是過期的了,
// 而就在這瞬間, 另一個節(jié)點set了value, 這時鎖是被別的線程(節(jié)點持有), 而接下來的判斷
// 是檢測不出這種情況的.不過這個問題應該不會導致其它的問題出現(xiàn), 因為這個方法的目的本來就
// 不是同步控制, 它只是一種鎖狀態(tài)的報告.
return !isTimeExpired(value);
}
}
@Override
protected void unlock0 {
// TODO 判斷鎖是否過期
String value = jedis.get(lockKey);
if (!isTimeExpired(value)) {
doUnlock;
}
}
private void checkInterruption throws InterruptedException {
if(Thread.currentThread.isInterrupted) {
throw new InterruptedException;
}
}
private boolean isTimeExpired(String value) {
return Long.parseLong(value) < System.currentTimeMillis;
}
private boolean isTimeout(long start, long timeout) {
return start + timeout > System.currentTimeMillis;
}
private void doUnlock {
jedis.del(lockKey);
}
}
如果將來還換一種實現(xiàn)方式(比如zookeeper之類的),到時直接繼承AbstractLock并實現(xiàn)lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt), unlock0方法即可(所謂抽象嘛)
測試
模擬全局ID增長器,設計一個IDGenerator類,該類負責生成全局遞增ID,其代碼如下:
package cc.lixiaohui.lock;
import java.math.BigInteger;
import java.util.concurrent.TimeUnit;
/**
* 模擬ID生成
* @author lixiaohui
*
*/
public class IDGenerator {
private static BigInteger id = BigInteger.valueOf(0);
private final Lock lock;
private static final BigInteger INCREMENT = BigInteger.valueOf(1);
public IDGenerator(Lock lock) {
this.lock = lock;
}
public String getAndIncrement {
if (lock.tryLock(3, TimeUnit.SECONDS)) {
try {
// TODO 這里獲取到鎖, 訪問臨界區(qū)資源
return getAndIncrement0;
} finally {
lock.unlock;
}
}
return null;
//return getAndIncrement0;
}
private String getAndIncrement0 {
String s = id.toString;
id = id.add(INCREMENT);
return s;
}
}
測試主邏輯:同一個JVM內開兩個線程死循環(huán)地(循環(huán)之間無間隔,有的話測試就沒意義了)獲取ID(我這里并不是死循環(huán)而是跑20s),獲取到ID存到同一個Set里面,在存之前先檢查該ID在set中是否存在,如果已存在,則讓兩個線程都停止。如果程序能正常跑完20s,那么說明這個分布式鎖還算可以滿足要求,如此測試的效果應該和不同JVM(也就是真正的分布式環(huán)境中)測試的效果是一樣的,下面是測試類的代碼:
package cc.lixiaohui.DistributedLock.DistributedLock;
import java.util.HashSet;
import java.util.Set;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import cc.lixiaohui.lock.IDGenerator;
import cc.lixiaohui.lock.Lock;
import cc.lixiaohui.lock.RedisBasedDistributedLock;
public class IDGeneratorTest {
private static Set<String> generatedIds = new HashSet<String>;
private static final String LOCK_KEY = "lock.lock";
private static final long LOCK_EXPIRE = 5 * 1000;
@Test
public void test throws InterruptedException {
Jedis jedis1 = new Jedis("localhost", 6379);
Lock lock1 = new RedisBasedDistributedLock(jedis1, LOCK_KEY, LOCK_EXPIRE);
IDGenerator g1 = new IDGenerator(lock1);
IDConsumeMission consume1 = new IDConsumeMission(g1, "consume1");
Jedis jedis2 = new Jedis("localhost", 6379);
Lock lock2 = new RedisBasedDistributedLock(jedis2, LOCK_KEY, LOCK_EXPIRE);
IDGenerator g2 = new IDGenerator(lock2);
IDConsumeMission consume2 = new IDConsumeMission(g2, "consume2");
Thread t1 = new Thread(consume1);
Thread t2 = new Thread(consume2);
t1.start;
t2.start;
Thread.sleep(20 * 1000); //讓兩個線程跑20秒
IDConsumeMission.stop;
t1.join;
t2.join;
}
static String time {
return String.valueOf(System.currentTimeMillis / 1000);
}
static class IDConsumeMission implements Runnable {
private IDGenerator idGenerator;
private String name;
private static volatile boolean stop;
public IDConsumeMission(IDGenerator idGenerator, String name) {
this.idGenerator = idGenerator;
this.name = name;
}
public static void stop {
stop = true;
}
public void run {
System.out.println(time + ": consume " + name + " start ");
while (!stop) {
String id = idGenerator.getAndIncrement;
if(generatedIds.contains(id)) {
System.out.println(time + ": duplicate id generated, id = " + id);
stop = true;
continue;
}
generatedIds.add(id);
System.out.println(time + ": consume " + name + " add id = " + id);
}
System.out.println(time + ": consume " + name + " done ");
}
}
}
說明一點,我這里停止兩個線程的方式并不是很好,我是為了方便才這么做的,因為只是測試,最好不要這么做。
測試結果
跑20s打印的東西太多,前面打印的被clear了,只有差不多跑完的時候才有,下面截圖。說明了這個鎖能正常工作:

當IDGererator沒有加鎖(即IDGererator的getAndIncrement方法內部獲取id時不上鎖)時,測試是不通過的,非常大的概率中途就會停止,下面是不加鎖時的測試結果:
這個1秒都不到:

這個也1秒都不到:

結束語
好了,以上就是Java實現(xiàn)基于Redis的分布式鎖的全部內容,各位如果發(fā)現(xiàn)問題希望能指正,希望這篇文章能對大家的學習和工作帶來一定的幫助,如果有疑問可以留言交流。
相關文章
Java中Comparator與Comparable排序的區(qū)別詳解
這篇文章主要介紹了Java中Comparator與Comparable排序的區(qū)別詳解,如果你有一個類,希望支持同類型的自定義比較策略,可以實現(xiàn)接口Comparable,如果某個類,沒有實現(xiàn)Comparable,但是又希望對它進行比較,則可以自定義一個Comparator,需要的朋友可以參考下2024-01-01
MyBatis中動態(tài)SQL語句@Provider的用法
本文主要介紹了MyBatis中動態(tài)SQL語句@Provider的用法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2023-06-06
Spring @Scheduler使用cron表達式時的執(zhí)行問題詳解
Spring給程序猿們帶來了許多便利。下面這篇文章主要給大家介紹了關于Spring @Scheduler使用cron表達式時的執(zhí)行問題的相關資料,文中通過示例代碼介紹的非常詳細,需要的朋友們下面隨著小編來一起學習學習吧2018-09-09

