欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

zookeeper實戰(zhàn)之實現(xiàn)分布式鎖的方法

 更新時間:2022年11月10日 08:38:19   作者:木木他爹  
Zookeeper實現(xiàn)分布式鎖比Redis簡單,Zookeeper有一個特性,多個線程在Zookeeper里創(chuàng)建同一個節(jié)點時,只有一個線程執(zhí)行成功,Zookeeper主要是利用臨時有序節(jié)點這一特性實現(xiàn)分布式鎖,感興趣的朋友跟隨小編一起學(xué)習(xí)吧

一、分布式鎖的通用實現(xiàn)思路

分布式鎖的概念以及常規(guī)解決方案可以參考之前的博客:聊聊分布式鎖的解決方案;今天我們先分析下分布式鎖的實現(xiàn)思路;

  • 首先,需要保證唯一性,即某一時點只能有一個線程訪問某一資源;比方說待辦短信通知功能,每天早上九點短信提醒所有工單的處理人處理工單,假設(shè)服務(wù)部署了20個容器,那么早上九點的時候會有20個線程啟動準(zhǔn)備發(fā)送短信,此時我們只能讓一個線程執(zhí)行短信發(fā)送,否則用戶會收到20條相同的短信;
  • 其次,需要考慮下何時應(yīng)該釋放鎖?這又分三種情況,一是拿到鎖的線程正常結(jié)束,另一種是獲取鎖的線程異常退出,還有種是獲取鎖的線程一直阻塞;第一種情況直接釋放即可,第二種情況可以通過定義下鎖的過期時間然后通過定時任務(wù)去釋放鎖;zk的話直接通過臨時節(jié)點即可;最后一種阻塞的情況也可以通過定時任務(wù)來釋放,但是需要根據(jù)業(yè)務(wù)來綜合判斷,如果業(yè)務(wù)本身就是長時間耗時的操作那么鎖的過期時間就得設(shè)置的久一點
  • 最后,當(dāng)拿到鎖的線程釋放鎖的時候,如何通知其他線程可以搶鎖了呢
    這里簡單介紹兩種解決方案,一種是所有需要鎖的線程主動輪詢,固定時間去訪問下看鎖是否釋放,但是這種方案無端增加服務(wù)器壓力并且時效性無法保證;另一種就是zk的watch,監(jiān)聽鎖所在的目錄,一有變化立馬得到通知

二、ZK實現(xiàn)分布式鎖的思路

zk通過每個線程在同一父目錄下創(chuàng)建臨時有序節(jié)點,然后通過比較節(jié)點的id大小來實現(xiàn)分布式鎖功能;再通過zk的watch機制實時獲取節(jié)點的狀態(tài),如果被刪除立即重新爭搶鎖;具體流程見線圖:

提示:需要關(guān)注下圖里判斷自身不是最小節(jié)點時的監(jiān)聽情況,為什么不監(jiān)聽父節(jié)點?原因圖里已有描述,這里就不再贅述

三、ZK實現(xiàn)分布式鎖的編碼實現(xiàn)

1、核心工具類實現(xiàn)

通過不斷的調(diào)試,我封裝了一個ZkLockHelper類,里面封裝了上鎖和釋放鎖的方法,為了方便我將zk的一些監(jiān)聽和回調(diào)機智也融合到一起了,并沒有抽出來,下面貼上該類的全部代碼

package com.darling.service.zookeeper.lock;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.platform.commons.util.StringUtils;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;

/**
 * @description:
 * @author: dll
 * @date: Created in 2022/11/4 8:41
 * @version:
 * @modified By:
 */
@Data
@Slf4j
public class ZkLockHelper implements AsyncCallback.StringCallback, AsyncCallback.StatCallback,Watcher, AsyncCallback.ChildrenCallback {

    private final String lockPath = "/lockItem";

    ZooKeeper zkClient;
    String threadName;

    CountDownLatch cd = new CountDownLatch(1);
    private String pathName;


    /**
     * 上鎖
     */
    public void tryLock() {
        try {
            log.info("線程:{}正在創(chuàng)建節(jié)點",threadName);
            zkClient.create(lockPath,(threadName).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this,"AAA");
            log.info("線程:{}正在阻塞......",threadName);
            // 由于上面是異步創(chuàng)建所以這里需要阻塞住當(dāng)前線程
            cd.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 釋放鎖
     */
    public void unLock() {
        try {
            zkClient.delete(pathName,-1);
            System.out.println(threadName + " 工作結(jié)束....");
        } catch (Exception e) {
            e.printStackTrace();
        }


    }

    /**
     * create方法的回調(diào),創(chuàng)建成功后在此處獲取/DCSLock的子目錄,比較節(jié)點ID是否最小,是則拿到鎖。。。
     * @param rc        狀態(tài)碼
     * @param path      create方法的path入?yún)?
     * @param ctx       create方法的上下文入?yún)?
     * @param name      創(chuàng)建成功的臨時有序節(jié)點的名稱,即在path的后面加上了zk維護的自增ID;
     *                  注意如果創(chuàng)建的不是有序節(jié)點,那么此處的name和path的內(nèi)容一致
     */
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        log.info(">>>>>>>>>>>>>>>>>processResult,rx:{},path:{},ctx:{},name:{}",rc,path,ctx.toString(),name);
        if (StringUtils.isNotBlank(name)) {
            try {
                pathName =  name ;
                // 此處path需注意要寫/
                zkClient.getChildren("/", false,this,"123");
//                List<String> children = zkClient.getChildren("/", false);
//                log.info(">>>>>threadName:{},children:{}",threadName,children);
//                // 給children排序
//                Collections.sort(children);
//                int i = children.indexOf(pathName.substring(1));
//                // 判斷自身是否第一個
//                if (Objects.equals(i,0)) {
//                    // 是第一個則表示搶到了鎖
//                    log.info("線程{}搶到了鎖",threadName);
//                    cd.countDown();
//                }else {
//                    // 表示沒搶到鎖
//                    log.info("線程{}搶鎖失敗,重新注冊監(jiān)聽器",threadName);
//                    zkClient.exists("/"+children.get(i-1),this,this,"AAA");
//                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

    /**
     * exists方法的回調(diào),此處暫不做處理
     * @param rc
     * @param path
     * @param ctx
     * @param stat
     */
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {

    }

    /**
     * exists的watch監(jiān)聽
     * @param event
     */
    @Override
    public void process(WatchedEvent event) {
        //如果第一個線程鎖釋放了,等價于第一個線程刪除了節(jié)點,此時只有第二個線程會監(jiān)控的到
        switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                break;
            case NodeDeleted:
                zkClient.getChildren("/", false,this,"123");
//                // 此處path需注意要寫"/"
//                List<String> children = null;
//                try {
//                    children = zkClient.getChildren("/", false);
//                } catch (KeeperException e) {
//                    e.printStackTrace();
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
//                log.info(">>>>>threadName:{},children:{}",threadName,children);
//                // 給children排序
//                Collections.sort(children);
//                int i = children.indexOf(pathName.substring(1));
//                // 判斷自身是否第一個
//                if (Objects.equals(i,0)) {
//                    // 是第一個則表示搶到了鎖
//                    log.info("線程{}搶到了鎖",threadName);
//                    cd.countDown();
//                }else {
//                    /**
//                     *  表示沒搶到鎖;需要判斷前置節(jié)點存不存在,其實這里并不是特別關(guān)心前置節(jié)點存不存在,所以其回調(diào)可以不處理;
//                     *  但是這里關(guān)注的前置節(jié)點的監(jiān)聽,當(dāng)前置節(jié)點監(jiān)聽到被刪除時就是其他線程搶鎖之時
//                     */
//                    zkClient.exists("/"+children.get(i-1),this,this,"AAA");
//                }
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
        }
    }


    /**
     * getChildren方法的回調(diào)
     * @param rc
     * @param path
     * @param ctx
     * @param children
     */
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children) {
        try {
            log.info(">>>>>threadName:{},children:{}", threadName, children);
            if (Objects.isNull(children)) {
                return;
            }
            // 給children排序
            Collections.sort(children);
            int i = children.indexOf(pathName.substring(1));
            // 判斷自身是否第一個
            if (Objects.equals(i, 0)) {
                // 是第一個則表示搶到了鎖
                log.info("線程{}搶到了鎖", threadName);
                cd.countDown();
            } else {
                // 表示沒搶到鎖
                log.info("線程{}搶鎖失敗,重新注冊監(jiān)聽器", threadName);
                /**
                 *  表示沒搶到鎖;需要判斷前置節(jié)點存不存在,其實這里并不是特別關(guān)心前置節(jié)點存不存在,所以其回調(diào)可以不處理;
                 *  但是這里關(guān)注的前置節(jié)點的監(jiān)聽,當(dāng)前置節(jié)點監(jiān)聽到被刪除時就是其他線程搶鎖之時
                 */
                zkClient.exists("/" + children.get(i - 1), this, this, "AAA");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

提示:代碼中注釋的代碼塊可以關(guān)注下,原本是直接阻塞式編程,將獲取所有子節(jié)點并釋放鎖的操作直接寫在getChildren方法的回調(diào)里,后來發(fā)現(xiàn)當(dāng)節(jié)點被刪除時我們還要重新?lián)屾i,那么代碼就冗余了,于是結(jié)合響應(yīng)式編程的思想,將這段核心代碼放到getChildren方法的回調(diào)里,這樣代碼簡潔了并且可以讓業(yè)務(wù)更只關(guān)注于getChildren這件事了

2、測試代碼編寫

線程安全問題復(fù)現(xiàn)

package com.darling.service.zookeeper.lock;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

/**
 * @description:  開啟是個線程給i做遞減操作,未加鎖的情況下會有線程安全問題
 * @author: dll
 * @date: Created in 2022/11/8 8:32
 * @version:
 * @modified By:
 */
@Slf4j
public class ZkLockTest02 {

    private int i = 10;

    @Test
    public void test() throws InterruptedException {

        for (int n = 0; n < 10; n++) {
            new Thread(new Runnable() {
                @SneakyThrows
                @Override
                public void run() {
                    Thread.sleep(100);
                    incre();
                }
            }).start();
        }
        Thread.sleep(5000);
        log.info("i = {}",i);
    }

    /**
     * i遞減 線程不安全
     */
    public void incre(){
//        i.incrementAndGet();
        log.info("當(dāng)前線程:{},i = {}",Thread.currentThread().getName(),i--);
    }
}

上面代碼運行結(jié)果如下:

使用上面封裝的ZkLockHelper實現(xiàn)的分布式鎖

package com.darling.service.zookeeper.lock;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * @description: 使用zk實現(xiàn)的分布式鎖解決線程安全問題
 * @author: dll
 * @date: Created in 2022/11/8 8:32
 * @version:
 * @modified By:
 */
@Slf4j
public class ZkLockTest03 {


    ZooKeeper zkClient;

    @Before
    public void conn (){
        zkClient  = ZkUtil.getZkClient();
    }

    @After
    public void close (){
        try {
            zkClient.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private int i = 10;

    @Test
    public void test() throws InterruptedException {

        for (int n = 0; n < 10; n++) {
            new Thread(new Runnable() {
                @SneakyThrows
                @Override
                public void run() {
                    Thread.sleep(100);
                    ZkLockHelper zkHelper = new ZkLockHelper();
                    // 這里給zkHelper設(shè)置threadName是為了后續(xù)調(diào)試的時候日志打印,便于觀察存在的問題
                    String threadName = Thread.currentThread().getName();
                    zkHelper.setThreadName(threadName);
                    zkHelper.setZkClient(zkClient);
                    // tryLock上鎖
                    zkHelper.tryLock();
                    incre();
                    log.info("線程{}正在執(zhí)行業(yè)務(wù)代碼...",threadName);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // 釋放鎖
                    zkHelper.unLock();
                }
            }).start();
        }
        while (true) {
        }
    }

    /**
     * i遞減 線程不安全
     */
    public void incre(){
//        i.incrementAndGet();
        log.info("☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆當(dāng)前線程:{},i = {}",Thread.currentThread().getName(),i--);
    }
}

運行結(jié)果如下:

由于日志中摻雜著zk的日志所有此處并未截全,但是也能看到i是在按規(guī)律遞減的,不會出現(xiàn)通過線程拿到相同值的情況

四、zk實現(xiàn)分布式鎖的優(yōu)缺點

優(yōu)點

  • 集群部署不存在單點故障問題
  • 統(tǒng)一視圖
    zk集群每個節(jié)點對外提供的數(shù)據(jù)是一致的,數(shù)據(jù)一致性有所報障
  • 臨時有序節(jié)點
    zk提供臨時有序節(jié)點,這樣當(dāng)客戶端失去連接時會自動釋放鎖,不用像其他方案一樣當(dāng)拿到鎖的實例服務(wù)不可用時,需要定時任務(wù)去刪除鎖;臨時節(jié)點的特性就是當(dāng)客戶端失去連接會自動刪除
  • watch能力加持
    當(dāng)獲取不到鎖時,無需客戶端定期輪詢爭搶,只需watch前一節(jié)點即可,當(dāng)有變化時會及時通知,比普通方案即及時又高效;注意這里最好只watch前一節(jié)點,如果watch整個父目錄的話,當(dāng)客戶端并發(fā)較大時會不斷有請求進出zk,給zk性能帶來壓力

缺點

  • 與單機版redis比較的話性能肯定較差,但是當(dāng)客戶端集群足夠龐大且業(yè)務(wù)量足夠多時肯定還是集群更加穩(wěn)定

好了,zk實現(xiàn)分布式鎖的編碼實現(xiàn)就到這了,后續(xù)有時間再寫偏redis的,其實思路縷清了,編碼實現(xiàn)還是很簡單的

到此這篇關(guān)于zookeeper實戰(zhàn)之實現(xiàn)分布式鎖的方法的文章就介紹到這了,更多相關(guān)zookeeper分布式鎖內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • springboot 啟動如何修改application.properties的參數(shù)

    springboot 啟動如何修改application.properties的參數(shù)

    這篇文章主要介紹了springboot 啟動如何修改application.properties的參數(shù)方法,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • java Arrays快速打印數(shù)組的數(shù)據(jù)元素列表案例

    java Arrays快速打印數(shù)組的數(shù)據(jù)元素列表案例

    這篇文章主要介紹了java Arrays快速打印數(shù)組的數(shù)據(jù)元素列表案例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-09-09
  • SpringBoot中過濾器Filter+JWT令牌實現(xiàn)登錄驗證

    SpringBoot中過濾器Filter+JWT令牌實現(xiàn)登錄驗證

    本文主要介紹了SpringBoot中過濾器Filter+JWT令牌實現(xiàn)登錄驗證,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2024-04-04
  • Java實現(xiàn)Dbhelper支持大數(shù)據(jù)增刪改

    Java實現(xiàn)Dbhelper支持大數(shù)據(jù)增刪改

    這篇文章主要介紹了Java實現(xiàn)Dbhelper支持大數(shù)據(jù)增刪改功能的實現(xiàn)過程,感興趣的小伙伴們可以參考一下
    2016-01-01
  • 解決SpringCloud下spring-boot-maven-plugin插件的打包問題

    解決SpringCloud下spring-boot-maven-plugin插件的打包問題

    這篇文章主要介紹了SpringCloud下spring-boot-maven-plugin插件的打包問題,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2023-03-03
  • Java實現(xiàn)登錄與注冊頁面

    Java實現(xiàn)登錄與注冊頁面

    這篇文章主要為大家詳細介紹了Java實現(xiàn)登錄與注冊頁面,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-04-04
  • log4j2日志異步打印(實例講解)

    log4j2日志異步打印(實例講解)

    下面小編就為大家?guī)硪黄猯og4j2日志異步打印(實例講解)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-10-10
  • HashMap容量和負(fù)載因子使用說明

    HashMap容量和負(fù)載因子使用說明

    這篇文章主要介紹了HashMap容量和負(fù)載因子使用說明,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-09-09
  • SpringMvc接受請求參數(shù)的幾種情況演示

    SpringMvc接受請求參數(shù)的幾種情況演示

    Springmvc接受請求參數(shù)的幾種介紹,如何接受json請求參數(shù),本文通過實例代碼給大家介紹的非常詳細,需要的朋友參考下吧
    2021-07-07
  • Java Process.waitFor()方法詳解

    Java Process.waitFor()方法詳解

    這篇文章主要介紹了Java Process.waitFor()方法詳解,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-12-12

最新評論