InterProcessMutex實現zookeeper分布式鎖原理
原理簡介:
zookeeper實現分布式鎖的原理就是多個節(jié)點同時在一個指定的節(jié)點下面創(chuàng)建臨時會話順序節(jié)點,誰創(chuàng)建的節(jié)點序號最小,誰就獲得了鎖,并且其他節(jié)點就會監(jiān)聽序號比自己小的節(jié)點,一旦序號比自己小的節(jié)點被刪除了,其他節(jié)點就會得到相應的事件,然后查看自己是否為序號最小的節(jié)點,如果是,則獲取鎖。
zookeeper節(jié)點圖分析

InterProcessMutex實現的鎖機制是公平且互斥的,公平的方式是按照每個請求的順序進行排隊的。
InterProcessMutex實現的InterProcessLock接口,InterProcessLock主要規(guī)范了如下幾個方法:
// 獲取互斥鎖 public void acquire() throws Exception; // 在給定的時間內獲取互斥鎖 public boolean acquire(long time, TimeUnit unit) throws Exception; // 釋放鎖處理 public void release() throws Exception; // 如果此JVM中的線程獲取了互斥鎖,則返回true boolean isAcquiredInThisProcess();
接下來我們看看InterProcessMutex中的實現,它究竟有哪些屬性,以及實現細節(jié)
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {
// LockInternals是真正實現操作zookeeper的類,它內部包含連接zookeeper客戶端的CuratorFramework
// LockInternals的具體實現后面我會講到
private final LockInternals internals;
// basePath是鎖的根結點,所有的臨時有序的節(jié)點都是basePath的子節(jié)點,
private final String basePath;
//
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
// LockData封裝了請求對應的線程(owningThread)、鎖的重入的次數(lockCount)、線程對應的臨時節(jié)點(lockPath)
private static class LockData {
final Thread owningThread;
final String lockPath;
// 原子性的
final AtomicInteger lockCount = new AtomicInteger(1);
private LockData(Thread owningThread, String lockPath)
{
this.owningThread = owningThread;
this.lockPath = lockPath;
}
}
private static final String LOCK_NAME = "lock-";
// 獲取互斥鎖,阻塞【InterProcessLock的實現】
@Override
public void acquire() throws Exception {
// 獲取鎖,一直等待
if ( !internalLock(-1, null) ) {
throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}
}
// 獲取互斥鎖,指定時間time【InterProcessLock的實現】
@Override
public boolean acquire(long time, TimeUnit unit) throws Exception {
return internalLock(time, unit);
}
// 當前線程是否占用鎖中【InterProcessLock的實現】
@Override
public boolean isAcquiredInThisProcess() {
return (threadData.size() > 0);
}
//如果調用線程與獲取互斥鎖的線程相同,則執(zhí)行一次互斥鎖釋放。如果線程已多次調用acquire,當此方法返回時,互斥鎖仍將保留 【InterProcessLock的實現】
@Override
public void release() throws Exception {
Thread currentThread = Thread.currentThread(); //當前線程
LockData lockData = threadData.get(currentThread); //線程對應的鎖信息
if ( lockData == null ) {
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
}
// 因為獲取到的鎖是可重入的,對lockCount進行減1,lockCount=0時才是真正釋放鎖
int newLockCount = lockData.lockCount.decrementAndGet();
if ( newLockCount > 0 ) {
return;
}
if ( newLockCount < 0 ) {
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
}
try {
// 到這里時lockCount=0,具體釋放鎖的操作交給LockInternals中的releaseLock方法實現
internals.releaseLock(lockData.lockPath);
}
finally {
threadData.remove(currentThread);
}
}
// 獲取basePath根結點下的所有臨時節(jié)點的有序集合
public Collection<String> getParticipantNodes() throws Exception {
return LockInternals.getParticipantNodes(internals.getClient(), basePath, internals.getLockName(), internals.getDriver());
}
boolean isOwnedByCurrentThread() {
LockData lockData = threadData.get(Thread.currentThread());
return (lockData != null) && (lockData.lockCount.get() > 0);
}
protected String getLockPath() {
LockData lockData = threadData.get(Thread.currentThread());
return lockData != null ? lockData.lockPath : null;
}
// acquire()中調用的internalLock()方法
private boolean internalLock(long time, TimeUnit unit) throws Exception {
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData != null ) {
// 如果當前線程已經獲取到了鎖,那么將重入次數lockCount+1,返回true
lockData.lockCount.incrementAndGet();
return true;
}
// attemptLock方法是獲取鎖的真正實現,lockPath是當前線程成功在basePath下創(chuàng)建的節(jié)點,若lockPath不為空代表成功獲取到鎖
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null ) {
// lockPath封裝到當前線程對應的鎖信息中
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
}
接下來我們看看InterProcessMutex中使用的LockInternals類的實現細節(jié)
public class LockInternals {
private final CuratorFramework client; // 連接zookeeper的客戶端
private final String path; // 等于basePath,InterProcessMutex中傳進來的
private final String basePath; // 根結點
private final LockInternalsDriver driver; // 操作zookeeper節(jié)點的driver
private final String lockName; // "lock-"
private final AtomicReference<RevocationSpec> revocable = new AtomicReference<RevocationSpec>(null);
private final CuratorWatcher revocableWatcher = new CuratorWatcher() {
@Override
public void process(WatchedEvent event) throws Exception {
if ( event.getType() == Watcher.Event.EventType.NodeDataChanged ) {
checkRevocableWatcher(event.getPath());
}
}
};
// 監(jiān)聽節(jié)點的監(jiān)聽器,若被監(jiān)聽的節(jié)點有動靜,則喚醒 notifyFromWatcher()=>notifyAll();
private final Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
notifyFromWatcher();
}
};
private volatile int maxLeases;
// 獲取basePath的子節(jié)點,排序后的
public static List<String> getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception
{
List<String> children = client.getChildren().forPath(basePath);
List<String> sortedList = Lists.newArrayList(children);
Collections.sort
(
sortedList,
new Comparator<String>()
{
@Override
public int compare(String lhs, String rhs)
{
return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName));
}
}
);
return sortedList;
}
// 嘗試獲取鎖【internalLock=>attemptLock】
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{ // 開始時間
final long startMillis = System.currentTimeMillis();
// 記錄等待時間
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
// 重試次數
int retryCount = 0;
// 當前節(jié)點
String ourPath = null;
// 是否獲取到鎖的標志
boolean hasTheLock = false;
// 是否放棄獲取到標志
boolean isDone = false;
// 不停嘗試獲取
while ( !isDone )
{
isDone = true;
try
{ // 創(chuàng)建當前線程對應的節(jié)點
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
// internalLockLoop中獲取
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NoNodeException e )
{ // 是否可再次嘗試
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
{
isDone = false;
}
else
{
throw e;
}
}
}
// 獲取到鎖后,返回當前線程對應創(chuàng)建的節(jié)點路徑
if ( hasTheLock )
{
return ourPath;
}
return null;
}
// 循環(huán)獲取【attemptLock=>internalLockLoop】
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
boolean haveTheLock = false; // 是否擁有分布式鎖
boolean doDelete = false; // 是否需要刪除當前節(jié)點
try
{
if ( revocable.get() != null )
{
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
// 循環(huán)嘗試獲取鎖
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{ // 得到basePath下排序后的臨時子節(jié)點
List<String> children = getSortedChildren();
// 獲取之前創(chuàng)建的當前線程對應的子節(jié)點
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
// 判斷是否獲取到鎖,沒有就返回監(jiān)聽路徑
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
// 成功獲取到
if ( predicateResults.getsTheLock() )
{
haveTheLock = true;
}
else
{ // 沒有獲取到鎖,監(jiān)聽前一個臨時順序節(jié)點
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try
{
// 上一個臨時順序節(jié)點如果被刪除,會喚醒當前線程繼續(xù)競爭鎖
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
// 獲取鎖超時
if ( millisToWait <= 0 )
{
doDelete = true; // timed out - delete our node
break;
}
wait(millisToWait);
}
else
{
wait();
}
}
catch ( KeeperException.NoNodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
}
finally
{
if ( doDelete )
{
// 因為獲取鎖超時,所以刪除之前創(chuàng)建的臨時子節(jié)點
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
private void deleteOurPath(String ourPath) throws Exception {
try
{
// 刪除
client.delete().guaranteed().forPath(ourPath);
}
catch ( KeeperException.NoNodeException e )
{
// ignore - already deleted (possibly expired session, etc.)
}
}
}
StandardLockInternalsDriver implements LockInternalsDriver
// 前面internalLockLoop方法中driver.getsTheLock執(zhí)行的方法
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
{
// 獲取子節(jié)點在臨時順序節(jié)點列表中的位置
int ourIndex = children.indexOf(sequenceNodeName);
// 檢驗子節(jié)點在臨時順序節(jié)點列表中是否有效
validateOurIndex(sequenceNodeName, ourIndex);
// 若當前子節(jié)點的位置<maxLeases,代表可獲取鎖【maxLeases默認=1,若ourIndex=0,代筆自己位置最小】
boolean getsTheLock = ourIndex < maxLeases;
// getsTheLock=true,則不需要監(jiān)聽前maxLeases的節(jié)點【maxLeases默認=1,代表監(jiān)聽前面最靠近自己的節(jié)點】
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
return new PredicateResults(pathToWatch, getsTheLock);
}
用InterProcessMutex在自己業(yè)務實現分布式鎖,請點擊此鏈接閱讀點我
到此這篇關于InterProcessMutex實現zookeeper分布式鎖原理的文章就介紹到這了,更多相關InterProcessMutex實現zookeeper分布式鎖內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
MyBatis Plus Mapper CRUD接口測試方式
在數據庫管理系統(tǒng)中,插入記錄是添加新數據條目,而刪除操作包括根據主鍵ID單條刪除和批量刪除,也可以基于特定條件進行刪除,刪除操作的SQL語句是通過鍵值對在Map中拼接而成,如delete from 表 where key1=value1 AND key2=value22024-09-09
如何實現springboot中controller之間的相互調用
這篇文章主要介紹了實現springboot中controller之間的相互調用方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06
Spring Cloud中FeignClient實現文件上傳功能
這篇文章主要為大家詳細介紹了Spring Cloud中FeignClient實現文件上傳功能,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-04-04
springMvc異步的DeferredResult long?polling應用示例解析
這篇文章主要為大家介紹了springMvc中DeferredResult的long?polling應用示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-03-03
解決一個JSON反序列化問題的辦法(空字符串變?yōu)榭占?
在平時的業(yè)務開發(fā)中,經常會有拿到一串序列化后的字符串要來反序列化,下面這篇文章主要給大家介紹了如何解決一個JSON反序列化問題的相關資料,空字符串變?yōu)榭占?需要的朋友可以參考下2024-03-03

