欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java中的BlockingQueue阻塞隊(duì)列原理以及實(shí)現(xiàn)詳解

 更新時(shí)間:2023年12月22日 08:36:25   作者:huisheng_qaq  
這篇文章主要介紹了Java中的BlockingQueue阻塞隊(duì)列原理以及實(shí)現(xiàn)詳解,在最常見(jiàn)的使用到這個(gè)阻塞隊(duì)列的地方,就是我們耳熟能詳?shù)木€程池里面了,作為我們線程池的一大最大參與者,也是AQS的一個(gè)具體實(shí)現(xiàn),需要的朋友可以參考下

一,BlockingQueue

在最常見(jiàn)的使用到這個(gè)阻塞隊(duì)列的地方,就是我們耳熟能詳?shù)木€程池里面了,作為我們線程池的一大最大參與者,也是AQS的一個(gè)具體實(shí)現(xiàn),因此可以好好的深入了解一下這個(gè)BlockingQueue阻塞隊(duì)列。

用一句話描述這個(gè)阻塞隊(duì)列就是:它是線程的一個(gè)通信工具,在任意時(shí)刻,不管并發(fā)有多高,在單jvm進(jìn)程上,同一時(shí)間永遠(yuǎn)只有一個(gè)線程能夠?qū)﹃?duì)列進(jìn)行入隊(duì)和出隊(duì)的操作,它的特性是在任意時(shí)刻只有一個(gè)線程可以進(jìn)行take或者put操作。因此這個(gè)隊(duì)列是一個(gè)線程安全的隊(duì)列。

比較適用于生產(chǎn)者和消費(fèi)者的場(chǎng)景,因此適用的應(yīng)用場(chǎng)景如下 線程池,springCloud-Eureka的三級(jí)緩存,Nacos,Netty,RakectMq等

所有的阻塞隊(duì)列都都實(shí)現(xiàn)了對(duì)這個(gè)BlockingQueue接口

public interface BlockingQueue<E> extends Queue<E>

1,主要常用的隊(duì)列有如下

ArrayBlockingQueue: 由數(shù)組支持的有界隊(duì)列

LinkedBlockingQueue: 由鏈接節(jié)點(diǎn)支持的可選有界隊(duì)列

PriorityBlockingQueue: 由優(yōu)先級(jí)堆支持的無(wú)界優(yōu)先級(jí)隊(duì)列

DelayQueue: 由優(yōu)先級(jí)堆支持的、基于時(shí)間的調(diào)度隊(duì)列

2,基本工作原理實(shí)現(xiàn)如下

1,以一個(gè)有界隊(duì)列為例,首先消費(fèi)者這邊獲取到鎖,然后會(huì)生產(chǎn)商品,然后會(huì)往隊(duì)列中填滿數(shù)據(jù),隊(duì)列填滿之后,生產(chǎn)者端會(huì)進(jìn)行阻塞,同時(shí)會(huì)釋放這把鎖,并且會(huì)通知這個(gè)消費(fèi)者趕緊去消費(fèi)。當(dāng)然內(nèi)部也做了很多事情,不一定就是說(shuō)一定要阻塞隊(duì)列滿了之后才會(huì)去喚醒生產(chǎn)者去消費(fèi),而是消費(fèi)者那邊也會(huì)有一個(gè)監(jiān)聽(tīng)事件,只有隊(duì)列不為空,就會(huì)有這個(gè)消費(fèi)者來(lái)消費(fèi)。

[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機(jī)制,建議將圖片保存下來(lái)直接上傳(img-PwPGs9DF-1657455705341)(C:\Users\HULOUBO\AppData\Roaming\Typora\typora-user-images\1657445347685.png)]

2,消費(fèi)者在接收到生產(chǎn)者的通知之后呢,就會(huì)先去獲取到這把鎖,然后對(duì)里面的產(chǎn)品進(jìn)行消費(fèi),當(dāng)隊(duì)列里面的產(chǎn)品都被消費(fèi)完成之后,消費(fèi)者這邊又會(huì)釋放這把鎖,然后將自身阻塞,并同時(shí)去喚醒這個(gè)生產(chǎn)者繼續(xù)生產(chǎn)產(chǎn)品。

在這里插入圖片描述

3,生產(chǎn)者又獲取到鎖,然后重復(fù)執(zhí)行第一步。

3,基本api使用如下

在這里插入圖片描述

在這里插入圖片描述

二,源碼剖析

在了解過(guò)一定的工作原理之后,接下來(lái)可以對(duì)源碼分析一波。

2.1,ArrayBlockingQueue

這里主要通過(guò)這個(gè)ArrayBlockingQueue為例,來(lái)描述一下這個(gè)阻塞隊(duì)列的工作流程

BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(BOUND);

這個(gè)構(gòu)造方法里面有如下參數(shù)

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair); //非公平鎖
    notEmpty = lock.newCondition(); //條件對(duì)象,用于喚醒指定線程
    notFull =  lock.newCondition(); //條件對(duì)象
}

生產(chǎn)者會(huì)向隊(duì)列中put產(chǎn)品,生產(chǎn)者后會(huì)持有鎖,此時(shí)會(huì)向隊(duì)列中存放產(chǎn)品,如果隊(duì)列滿了,則會(huì)阻塞自己,并且在最后會(huì)釋放鎖。

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock; //生產(chǎn)者加鎖
    lock.lockInterruptibly();
    try {
        while (count == items.length) //如果隊(duì)列滿了,則會(huì)阻塞
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock(); //釋放鎖
    }
}

既然涉及到ReentrantLock,那么就用從之前的AQS里面講起了,這里面這要是一個(gè)CLH同步等待隊(duì)列,由一個(gè)雙向鏈表和一個(gè)同步阻塞器組成,同步阻塞器會(huì)有一個(gè)state和一個(gè)exclusiveOwnerThread狀態(tài)組成,state=0表示當(dāng)前沒(méi)有對(duì)象獲取到鎖,可以來(lái)競(jìng)爭(zhēng)鎖。每個(gè)結(jié)點(diǎn)由一個(gè)前驅(qū)指針和一個(gè)后繼指針,并且里面有一個(gè)waitStatus等待狀態(tài),該狀態(tài)主要表示下一個(gè)結(jié)點(diǎn)的存活狀態(tài)。

在這里插入圖片描述

這里的話不會(huì)像之前一樣使用這個(gè)CLH同步等待隊(duì)列,而是加入了一種新的Condition條件等待隊(duì)列,如下圖。由firstWaiter和nextWaiter組成的單向鏈表隊(duì)列,里面的waitStatus為CONDITION:-2 。也就是說(shuō)如果當(dāng)前生產(chǎn)者結(jié)點(diǎn)后面的結(jié)點(diǎn)又是一個(gè)生產(chǎn)者節(jié)點(diǎn),因?yàn)槠陂g可能存在多個(gè)生產(chǎn)者的線程,而為了喚醒接下來(lái)的消費(fèi)者,就會(huì)創(chuàng)建一個(gè)條件等待隊(duì)列,去存儲(chǔ)后面的生產(chǎn)者結(jié)點(diǎn)。 就是說(shuō)在CLH同步等待隊(duì)列中,當(dāng)前結(jié)點(diǎn)為生產(chǎn)者的話,在阻塞隊(duì)列滿了之后,如果CLH中的下一個(gè)節(jié)點(diǎn)還是生產(chǎn)者,則會(huì)將waitStatus的狀態(tài)設(shè)置成-2,并將下一個(gè)節(jié)點(diǎn)移動(dòng)到這個(gè)條件等待隊(duì)列里面并進(jìn)行排隊(duì),如果下一個(gè)結(jié)點(diǎn)還是,又會(huì)將下一個(gè)結(jié)點(diǎn)移動(dòng)到這個(gè)條件等待隊(duì)列里面并進(jìn)行排隊(duì)。知道下一個(gè)結(jié)點(diǎn)是消費(fèi)者為止。

在這里插入圖片描述

await()釋放鎖的流程如下

public final void await() throws InterruptedException {
    //線程是否被中斷,如果被中斷,直接拋異常
    if (Thread.interrupted())
        throw new InterruptedException();
    //條件等待隊(duì)列,會(huì)構(gòu)建一個(gè)新的隊(duì)列
    Node node = addConditionWaiter();
    //釋放鎖,并對(duì)對(duì)應(yīng)的結(jié)點(diǎn)進(jìn)行喚醒操作
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //判斷當(dāng)前結(jié)點(diǎn)是在條件隊(duì)列里面還是在同步隊(duì)列里面
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

構(gòu)建條件等待隊(duì)列如下

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

出隊(duì),消費(fèi)者在獲取產(chǎn)品時(shí),產(chǎn)品就會(huì)出隊(duì),與此同時(shí),在隊(duì)列出隊(duì)成功之后,隊(duì)列中就會(huì)有一個(gè)空位,會(huì)調(diào)用notFull.signal()方法,通知生產(chǎn)者可以去生產(chǎn)產(chǎn)品了。并將這個(gè)條件等待隊(duì)列放回這個(gè)CLH隊(duì)列里面,只有在CLH隊(duì)列里面才會(huì)獲取鎖。最后在CLH中才能進(jìn)行unPark釋放鎖的操作。

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    //隊(duì)列中有空位,通知生產(chǎn)者生產(chǎn)產(chǎn)品
    notFull.signal();
    return x;
}

消費(fèi)者獲取產(chǎn)品

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

三,總結(jié)

BlockingQueue也是基于這個(gè)AQS的方式實(shí)現(xiàn)的,主要是利用這個(gè)生產(chǎn)者和消費(fèi)者這個(gè)模型來(lái)實(shí)現(xiàn)。

通過(guò)這個(gè)AQS中的CLH同步隊(duì)列來(lái)對(duì)節(jié)點(diǎn)的鎖的阻塞和釋放,期間利用了這個(gè)條件等待隊(duì)列來(lái)實(shí)現(xiàn),如果存在多個(gè)生產(chǎn)者的線程的情況下,就會(huì)將這些線程加入到一個(gè)條件等待的隊(duì)列里面。

并將這個(gè)節(jié)點(diǎn)的狀態(tài)改為-2,condition狀態(tài)。

在全部進(jìn)入條件等待隊(duì)列之后,這個(gè)鎖還在并沒(méi)有釋放,因此最后又需要將這個(gè)條件等待隊(duì)列里面的結(jié)點(diǎn)加回到CLH同步隊(duì)列中,再進(jìn)行排隊(duì)的釋放這個(gè)鎖。結(jié)點(diǎn)出隊(duì)的時(shí)候,然后生產(chǎn)者會(huì)通過(guò)一個(gè)singal監(jiān)聽(tīng)這個(gè)消費(fèi)者,每當(dāng)這個(gè)阻塞隊(duì)列里面出隊(duì),有一個(gè)位置的的時(shí)候,生產(chǎn)者就會(huì)生產(chǎn)這個(gè)產(chǎn)品。

消費(fèi)者也會(huì)監(jiān)聽(tīng)這個(gè)隊(duì)列,隊(duì)列中只要不為空,就回去消費(fèi)隊(duì)列中的產(chǎn)品。

獲取鎖的條件 只有在CLH隊(duì)列里等待的Node結(jié)點(diǎn)并且前驅(qū)結(jié)點(diǎn)的 waitStatus 為sinal = -1的可被喚醒的結(jié)點(diǎn)。

條件隊(duì)列里面的這些節(jié)點(diǎn)是不能獲取到鎖的。

到此這篇關(guān)于Java中的BlockingQueue阻塞隊(duì)列原理以及實(shí)現(xiàn)詳解的文章就介紹到這了,更多相關(guān)BlockingQueue阻塞隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Spring重試支持Spring Retry的方法

    Spring重試支持Spring Retry的方法

    本篇文章主要介紹了Spring重試支持Spring Retry的方法,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2018-04-04
  • 最長(zhǎng)公共子序列問(wèn)題的深度分析與Java實(shí)現(xiàn)方式

    最長(zhǎng)公共子序列問(wèn)題的深度分析與Java實(shí)現(xiàn)方式

    本文詳細(xì)介紹了最長(zhǎng)公共子序列(LCS)問(wèn)題,包括其概念、暴力解法、動(dòng)態(tài)規(guī)劃解法,并提供了Java代碼實(shí)現(xiàn),暴力解法雖然簡(jiǎn)單,但在大數(shù)據(jù)處理中效率較低,動(dòng)態(tài)規(guī)劃解法通過(guò)構(gòu)建DP表,顯著提高了計(jì)算效率,適用于大規(guī)模數(shù)據(jù)處理
    2025-02-02
  • Java通過(guò)值查找對(duì)應(yīng)的枚舉的實(shí)現(xiàn)

    Java通過(guò)值查找對(duì)應(yīng)的枚舉的實(shí)現(xiàn)

    本文主要介紹了Java通過(guò)值查找對(duì)應(yīng)的枚舉的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2022-02-02
  • Java日常練習(xí)題,每天進(jìn)步一點(diǎn)點(diǎn)(25)

    Java日常練習(xí)題,每天進(jìn)步一點(diǎn)點(diǎn)(25)

    下面小編就為大家?guī)?lái)一篇Java基礎(chǔ)的幾道練習(xí)題(分享)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧,希望可以幫到你
    2021-07-07
  • java swing編程入門代碼編寫(xiě)(java編程入門)

    java swing編程入門代碼編寫(xiě)(java編程入門)

    Swing是一個(gè)為Java設(shè)計(jì)的GUI工具包,是用來(lái)做UI界面的,大家看了下面的介紹就要吧自己做java界面了
    2013-12-12
  • Springboot+MDC+traceId日志中打印唯一traceId

    Springboot+MDC+traceId日志中打印唯一traceId

    本文主要介紹了Springboot+MDC+traceId日志中打印唯一traceId,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-10-10
  • FeignClientFactoryBean創(chuàng)建動(dòng)態(tài)代理詳細(xì)解讀

    FeignClientFactoryBean創(chuàng)建動(dòng)態(tài)代理詳細(xì)解讀

    這篇文章主要介紹了FeignClientFactoryBean創(chuàng)建動(dòng)態(tài)代理詳細(xì)解讀,當(dāng)直接進(jìn)去注冊(cè)的方法中,一步步放下走,都是直接放bean的定義信息中放入值,然后轉(zhuǎn)成BeanDefinitionHolder,最后在注冊(cè)到IOC容器中,需要的朋友可以參考下
    2023-11-11
  • jvm字符串常量池在什么內(nèi)存區(qū)域問(wèn)題解析

    jvm字符串常量池在什么內(nèi)存區(qū)域問(wèn)題解析

    這篇文章主要介紹了jvm字符串常量池在什么內(nèi)存區(qū)域的問(wèn)題解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-11-11
  • springmvc的validator數(shù)據(jù)校驗(yàn)的實(shí)現(xiàn)示例代碼

    springmvc的validator數(shù)據(jù)校驗(yàn)的實(shí)現(xiàn)示例代碼

    這篇文章主要介紹了springmvc的數(shù)據(jù)校驗(yàn)的實(shí)現(xiàn)示例代碼, 具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-07-07
  • spring boot 項(xiàng)目中使用thymeleaf模板的案例分析

    spring boot 項(xiàng)目中使用thymeleaf模板的案例分析

    這篇文章主要介紹了spring boot 項(xiàng)目中使用thymeleaf模板的案例分析,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-09-09

最新評(píng)論