淺談Java(SpringBoot)基于zookeeper的分布式鎖實(shí)現(xiàn)
通過zookeeper實(shí)現(xiàn)分布式鎖
1、創(chuàng)建zookeeper的client
首先通過CuratorFrameworkFactory創(chuàng)建一個(gè)連接zookeeper的連接CuratorFramework client
public class CuratorFactoryBean implements FactoryBean<CuratorFramework>, InitializingBean, DisposableBean {
private static final Logger LOGGER = LoggerFactory.getLogger(ContractFileInfoController.class);
private String connectionString;
private int sessionTimeoutMs;
private int connectionTimeoutMs;
private RetryPolicy retryPolicy;
private CuratorFramework client;
public CuratorFactoryBean(String connectionString) {
this(connectionString, 500, 500);
}
public CuratorFactoryBean(String connectionString, int sessionTimeoutMs, int connectionTimeoutMs) {
this.connectionString = connectionString;
this.sessionTimeoutMs = sessionTimeoutMs;
this.connectionTimeoutMs = connectionTimeoutMs;
}
@Override
public void destroy() throws Exception {
LOGGER.info("Closing curator framework...");
this.client.close();
LOGGER.info("Closed curator framework.");
}
@Override
public CuratorFramework getObject() throws Exception {
return this.client;
}
@Override
public Class<?> getObjectType() {
return this.client != null ? this.client.getClass() : CuratorFramework.class;
}
@Override
public boolean isSingleton() {
return true;
}
@Override
public void afterPropertiesSet() throws Exception {
if (StringUtils.isEmpty(this.connectionString)) {
throw new IllegalStateException("connectionString can not be empty.");
} else {
if (this.retryPolicy == null) {
this.retryPolicy = new ExponentialBackoffRetry(1000, 2147483647, 180000);
}
this.client = CuratorFrameworkFactory.newClient(this.connectionString, this.sessionTimeoutMs, this.connectionTimeoutMs, this.retryPolicy);
this.client.start();
this.client.blockUntilConnected(30, TimeUnit.MILLISECONDS);
}
}
public void setConnectionString(String connectionString) {
this.connectionString = connectionString;
}
public void setSessionTimeoutMs(int sessionTimeoutMs) {
this.sessionTimeoutMs = sessionTimeoutMs;
}
public void setConnectionTimeoutMs(int connectionTimeoutMs) {
this.connectionTimeoutMs = connectionTimeoutMs;
}
public void setRetryPolicy(RetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
}
public void setClient(CuratorFramework client) {
this.client = client;
}
}
2、封裝分布式鎖
根據(jù)CuratorFramework創(chuàng)建InterProcessMutex(分布式可重入排它鎖)對一行數(shù)據(jù)進(jìn)行上鎖
public InterProcessMutex(CuratorFramework client, String path) {
this(client, path, new StandardLockInternalsDriver());
}
使用 acquire方法
1、acquire() :入?yún)榭?,調(diào)用該方法后,會一直堵塞,直到搶奪到鎖資源,或者zookeeper連接中斷后,上拋異常。
2、acquire(long time, TimeUnit unit):入?yún)魅氤瑫r(shí)時(shí)間、單位,搶奪時(shí),如果出現(xiàn)堵塞,會在超過該時(shí)間后,返回false。
public void acquire() throws Exception {
if (!this.internalLock(-1L, (TimeUnit)null)) {
throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
}
}
public boolean acquire(long time, TimeUnit unit) throws Exception {
return this.internalLock(time, unit);
}
釋放鎖 mutex.release();
public void release() throws Exception {
Thread currentThread = Thread.currentThread();
InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
if (lockData == null) {
throw new IllegalMonitorStateException("You do not own the lock: " + this.basePath);
} else {
int newLockCount = lockData.lockCount.decrementAndGet();
if (newLockCount <= 0) {
if (newLockCount < 0) {
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + this.basePath);
} else {
try {
this.internals.releaseLock(lockData.lockPath);
} finally {
this.threadData.remove(currentThread);
}
}
}
}
}
封裝后的DLock代碼
1、調(diào)用InterProcessMutex processMutex = dLock.mutex(path);
2、手動釋放鎖processMutex.release();
3、需要手動刪除路徑dLock.del(path);
推薦 使用:
都是 函數(shù)式編程
在業(yè)務(wù)代碼執(zhí)行完畢后 會釋放鎖和刪除path
1、這個(gè)有返回結(jié)果
public T mutex(String path, ZkLockCallback zkLockCallback, long time, TimeUnit timeUnit)
2、這個(gè)無返回結(jié)果
public void mutex(String path, ZkVoidCallBack zkLockCallback, long time, TimeUnit timeUnit)
public class DLock {
private final Logger logger;
private static final long TIMEOUT_D = 100L;
private static final String ROOT_PATH_D = "/dLock";
private String lockRootPath;
private CuratorFramework client;
public DLock(CuratorFramework client) {
this("/dLock", client);
}
public DLock(String lockRootPath, CuratorFramework client) {
this.logger = LoggerFactory.getLogger(DLock.class);
this.lockRootPath = lockRootPath;
this.client = client;
}
public InterProcessMutex mutex(String path) {
if (!StringUtils.startsWith(path, "/")) {
path = Constant.keyBuilder(new Object[]{"/", path});
}
return new InterProcessMutex(this.client, Constant.keyBuilder(new Object[]{this.lockRootPath, "", path}));
}
public <T> T mutex(String path, ZkLockCallback<T> zkLockCallback) throws ZkLockException {
return this.mutex(path, zkLockCallback, 100L, TimeUnit.MILLISECONDS);
}
public <T> T mutex(String path, ZkLockCallback<T> zkLockCallback, long time, TimeUnit timeUnit) throws ZkLockException {
String finalPath = this.getLockPath(path);
InterProcessMutex mutex = new InterProcessMutex(this.client, finalPath);
try {
if (!mutex.acquire(time, timeUnit)) {
throw new ZkLockException("acquire zk lock return false");
}
} catch (Exception var13) {
throw new ZkLockException("acquire zk lock failed.", var13);
}
T var8;
try {
var8 = zkLockCallback.doInLock();
} finally {
this.releaseLock(finalPath, mutex);
}
return var8;
}
private void releaseLock(String finalPath, InterProcessMutex mutex) {
try {
mutex.release();
this.logger.info("delete zk node path:{}", finalPath);
this.deleteInternal(finalPath);
} catch (Exception var4) {
this.logger.error("dlock", "release lock failed, path:{}", finalPath, var4);
// LogUtil.error(this.logger, "dlock", "release lock failed, path:{}", new Object[]{finalPath, var4});
}
}
public void mutex(String path, ZkVoidCallBack zkLockCallback, long time, TimeUnit timeUnit) throws ZkLockException {
String finalPath = this.getLockPath(path);
InterProcessMutex mutex = new InterProcessMutex(this.client, finalPath);
try {
if (!mutex.acquire(time, timeUnit)) {
throw new ZkLockException("acquire zk lock return false");
}
} catch (Exception var13) {
throw new ZkLockException("acquire zk lock failed.", var13);
}
try {
zkLockCallback.response();
} finally {
this.releaseLock(finalPath, mutex);
}
}
public String getLockPath(String customPath) {
if (!StringUtils.startsWith(customPath, "/")) {
customPath = Constant.keyBuilder(new Object[]{"/", customPath});
}
String finalPath = Constant.keyBuilder(new Object[]{this.lockRootPath, "", customPath});
return finalPath;
}
private void deleteInternal(String finalPath) {
try {
((ErrorListenerPathable)this.client.delete().inBackground()).forPath(finalPath);
} catch (Exception var3) {
this.logger.info("delete zk node path:{} failed", finalPath);
}
}
public void del(String customPath) {
String lockPath = "";
try {
lockPath = this.getLockPath(customPath);
((ErrorListenerPathable)this.client.delete().inBackground()).forPath(lockPath);
} catch (Exception var4) {
this.logger.info("delete zk node path:{} failed", lockPath);
}
}
}
@FunctionalInterface
public interface ZkLockCallback<T> {
T doInLock();
}
@FunctionalInterface
public interface ZkVoidCallBack {
void response();
}
public class ZkLockException extends Exception {
public ZkLockException() {
}
public ZkLockException(String message) {
super(message);
}
public ZkLockException(String message, Throwable cause) {
super(message, cause);
}
}
配置CuratorConfig
@Configuration
public class CuratorConfig {
@Value("${zk.connectionString}")
private String connectionString;
@Value("${zk.sessionTimeoutMs:500}")
private int sessionTimeoutMs;
@Value("${zk.connectionTimeoutMs:500}")
private int connectionTimeoutMs;
@Value("${zk.dLockRoot:/dLock}")
private String dLockRoot;
@Bean
public CuratorFactoryBean curatorFactoryBean() {
return new CuratorFactoryBean(connectionString, sessionTimeoutMs, connectionTimeoutMs);
}
@Bean
@Autowired
public DLock dLock(CuratorFramework client) {
return new DLock(dLockRoot, client);
}
}
測試代碼
@RestController
@RequestMapping("/dLock")
public class LockController {
@Autowired
private DLock dLock;
@RequestMapping("/lock")
public Map testDLock(String no){
final String path = Constant.keyBuilder("/test/no/", no);
Long mutex=0l;
try {
System.out.println("在拿鎖:"+path+System.currentTimeMillis());
mutex = dLock.mutex(path, () -> {
try {
System.out.println("拿到鎖了" + System.currentTimeMillis());
Thread.sleep(10000);
System.out.println("操作完成了" + System.currentTimeMillis());
} finally {
return System.currentTimeMillis();
}
}, 1000, TimeUnit.MILLISECONDS);
} catch (ZkLockException e) {
System.out.println("拿不到鎖呀"+System.currentTimeMillis());
}
return Collections.singletonMap("ret",mutex);
}
@RequestMapping("/dlock")
public Map testDLock1(String no){
final String path = Constant.keyBuilder("/test/no/", no);
Long mutex=0l;
try {
System.out.println("在拿鎖:"+path+System.currentTimeMillis());
InterProcessMutex processMutex = dLock.mutex(path);
processMutex.acquire();
System.out.println("拿到鎖了" + System.currentTimeMillis());
Thread.sleep(10000);
processMutex.release();
System.out.println("操作完成了" + System.currentTimeMillis());
} catch (ZkLockException e) {
System.out.println("拿不到鎖呀"+System.currentTimeMillis());
e.printStackTrace();
}catch (Exception e){
e.printStackTrace();
}
return Collections.singletonMap("ret",mutex);
}
@RequestMapping("/del")
public Map delDLock(String no){
final String path = Constant.keyBuilder("/test/no/", no);
dLock.del(path);
return Collections.singletonMap("ret",1);
}
}
以上所述是小編給大家介紹的Java(SpringBoot)基于zookeeper的分布式鎖實(shí)現(xiàn)詳解整合,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時(shí)回復(fù)大家的。在此也非常感謝大家對腳本之家網(wǎng)站的支持!
相關(guān)文章
基于Log4j2阻塞業(yè)務(wù)線程引發(fā)的思考
這篇文章主要介紹了基于Log4j2阻塞業(yè)務(wù)線程引發(fā)的思考,基于很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12
Java解決線程的不安全問題之volatile關(guān)鍵字詳解
這篇文章主要介紹了Java解決線程的不安全問題之volatile關(guān)鍵字詳解,可見性指一個(gè)線程對共享變量值的修改,能夠及時(shí)地被其他線程看到,而 volatile 關(guān)鍵字就保證內(nèi)存的可見性,需要的朋友可以參考下2023-08-08
java調(diào)用ffmpeg實(shí)現(xiàn)轉(zhuǎn)換視頻
這篇文章主要為大家詳細(xì)介紹了java調(diào)用ffmpeg實(shí)現(xiàn)轉(zhuǎn)換視頻功能,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-12-12
nacos配置中心遠(yuǎn)程調(diào)用讀取不到配置文件的解決
這篇文章主要介紹了nacos配置中心遠(yuǎn)程調(diào)用讀取不到配置文件的解決方案,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教。2022-01-01
Springboot基于enable模塊驅(qū)動的實(shí)現(xiàn)
這篇文章主要介紹了Springboot基于enable模塊驅(qū)動的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-08-08
mybatis中mapper.xml文件的常用屬性及標(biāo)簽講解
這篇文章主要介紹了mybatis中mapper.xml文件的常用屬性及標(biāo)簽講解,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09
Java?Springboot異步執(zhí)行事件監(jiān)聽和處理實(shí)例
Java?SpringBoot中,監(jiān)聽和處理事件是一種常見的模式,它允許不同的組件之間通過事件進(jìn)行通信,事件監(jiān)聽和處理通常通過Spring的事件發(fā)布-訂閱模型來實(shí)現(xiàn),一個(gè)簡單的Spring?Boot應(yīng)用程序示例,其中將包括事件的定義、事件的發(fā)布以及事件的監(jiān)聽2024-07-07
如何使用MybatisPlus的SQL注入器提升批量插入性能
本文給大家介紹如何使用MybatisPlus的SQL注入器提升批量插入性能,以實(shí)戰(zhàn)視角講述如何利用該特性提升MybatisPlus?的批量插入性能,感興趣的朋友跟隨小編一起看看吧2024-05-05

