欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

LinkedBlockingQueue鏈?zhǔn)阶枞?duì)列的使用和原理解析

 更新時(shí)間:2022年10月28日 10:32:14   作者:澇山道士  
這篇文章主要介紹了LinkedBlockingQueue鏈?zhǔn)阶枞?duì)列的使用和原理解析,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教

概覽

1. 基于鏈表的可選有界阻塞隊(duì)列。根據(jù)FIFO的出入隊(duì)順序,從隊(duì)列頭部檢索和獲取元素,在隊(duì)列尾部插入新元素。

2. 當(dāng)作為有界阻塞隊(duì)列,在隊(duì)列空間不足時(shí),put方法將會(huì)一直阻塞直到有多余空間才會(huì)執(zhí)行插入元素操作,take方法則相反,只到隊(duì)列內(nèi)元素不為空時(shí),才將隊(duì)列元素逐個(gè)取出。

3. 隊(duì)列容量不指定時(shí),默認(rèn)為Integer.MAX_VALUE,此時(shí)可以看作無(wú)界隊(duì)列。

4. 使用非公平鎖進(jìn)行并發(fā)控制。所有方法都是線程安全的。

使用方法

下面的文章給出了阻塞隊(duì)列的四種基本用法:

了解BlockingQueue 大體框架和實(shí)現(xiàn)思路

LinkedBlockingQueue實(shí)現(xiàn)了BlockingQueue類。

在BlockingQueue中,方法被分為如下四類:

  • Throws exception:操作未實(shí)現(xiàn)時(shí)(正常流程下的執(zhí)行)拋出異常
  • Special value:根據(jù)操作的實(shí)際情況,返回特定值,例如null、false(這些失敗可能是線程中斷、隊(duì)列為空引起的)
  • Blocks:阻塞當(dāng)前線程,直到當(dāng)前線程可以成功執(zhí)行
  • Times out:嘗試指定時(shí)間后,放棄執(zhí)行
 

Throws exception

Special value

Blocks

Times out

新增

add(E e)

offer(E e)

put(E e)

offer(E e, long timeout, TimeUnit unit)

刪除

remove()

poll()

take()

poll(long timeout, TimeUnit unit)

查詢

element()

peek()

  

1. add | remove | element

這三個(gè)方法在BlockingQueue的定義中,都會(huì)在操作未實(shí)現(xiàn)時(shí),拋出異常。

  • add(E e)在隊(duì)尾添加元素e,add內(nèi)部調(diào)用offer方法實(shí)現(xiàn)。因此,元素e為空時(shí),拋出NullPointerException異常;插入失敗時(shí),拋出IllegalStateException異常。
  • remove刪除隊(duì)首元素,內(nèi)部調(diào)用poll方法。隊(duì)首無(wú)數(shù)據(jù)時(shí),拋出NoSuchElementException異常。
  • element檢索隊(duì)首元素。隊(duì)首無(wú)數(shù)據(jù)時(shí),拋出NoSuchElementException異常。
LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>();
blockingQue.add(1);
blockingQue.remove(1);
blockingQue.remove(); // NoSuchElementException
blockingQue.element(); // NoSuchElementException

2. offer | poll | peek

根據(jù)操作的實(shí)際情況,返回特定值,例如null、false(這些失敗可能是線程中斷、隊(duì)列為空引起的)

  • offer(E e)在隊(duì)尾添加元素e,元素e為空時(shí),拋出NullPointerException異常;插入失敗時(shí)返回false。
  • poll刪除隊(duì)首元素。刪除失敗時(shí)返回false。
  • peek檢索隊(duì)首元素。隊(duì)首無(wú)數(shù)據(jù)時(shí),返回null。
LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>();
blockingQue.offer(1);
blockingQue.poll();
Integer peek = blockingQue.peek(); // 返回null

3. put | take

阻塞當(dāng)前線程,直到當(dāng)前線程可以成功執(zhí)行。

  • put(E e)在隊(duì)尾添加元素e,元素e為空時(shí),拋出NullPointerException異常。當(dāng)隊(duì)列滿時(shí),阻塞put線程,等待隊(duì)列被消費(fèi)后,隊(duì)列容量不滿時(shí),該阻塞線程繼續(xù)嘗試在隊(duì)尾插入元素。該方法在阻塞時(shí)可以被中斷,并拋出InterruptedException異常。
  • take刪除并獲取隊(duì)首元素。隊(duì)首元素不為空時(shí)返回。隊(duì)首元素為空,阻塞take線程,等待隊(duì)列不為空時(shí),再次嘗試消費(fèi)隊(duì)首元素。該方法在阻塞時(shí)可以被中斷,并拋出InterruptedException異常。

注意:阻塞時(shí),不會(huì)解除鎖占用。

LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>();
try {
    blockingQue.put(1);
    blockingQue.take();
} catch (InterruptedException e) {
    // 線程被中斷
    e.printStackTrace();
}

4. offer | poll (timeout)

嘗試指定時(shí)間后,放棄執(zhí)行

  • offer(E e, long timeout, TimeUnit unit)在隊(duì)尾添加元素e,元素e為空時(shí),拋出NullPointerException異常;當(dāng)隊(duì)列容量滿時(shí),線程休眠一定時(shí)間后再次查看隊(duì)列容量,當(dāng)該休眠時(shí)間大于等于timeout后,此時(shí)隊(duì)列還滿則返回false。不滿時(shí),嘗試入隊(duì)。需要注意的是,由于偽喚醒機(jī)制的存在,線程可能在timeout這個(gè)時(shí)間段內(nèi)的任意一點(diǎn)被喚醒,如果隊(duì)列容易不滿,則會(huì)直接執(zhí)行入隊(duì)操作。阻塞時(shí),當(dāng)前線程被中斷拋出InterruptedException異常。
  • poll(long timeout, TimeUnit unit)刪除隊(duì)首元素。poll與offer對(duì)應(yīng)的,當(dāng)隊(duì)列為空的時(shí)候,線程休眠一定時(shí)間。休眠時(shí),當(dāng)前線程被中斷拋出InterruptedException異常。
LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>();
try {
    blockingQue.offer(1, 100, TimeUnit.MILLISECONDS);
    blockingQue.poll(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
    e.printStackTrace();
}

當(dāng)然除了上述的阻塞隊(duì)列的基本操作外,LinkedBlockingQueue還具有集合Collection的性質(zhì)。因此集合中的通用方法也可以使用。

源碼解析

說(shuō)明

本次源碼分析主要按照下面幾個(gè)步驟進(jìn)行:

1. 保存隊(duì)列數(shù)據(jù)的容器以及出入隊(duì)方法

2. 主要成員變量以及作用

3. 主要方法分析

隊(duì)列容器

結(jié)構(gòu)圖

僅有數(shù)據(jù)item和后繼next的單向節(jié)點(diǎn),結(jié)構(gòu)簡(jiǎn)單。

static class Node<E> {
    E item;
 
    Node<E> next;
 
    Node(E x) { item = x; }
}

next三種情況

A 普通節(jié)點(diǎn)的真實(shí)后繼

B 真正的隊(duì)首節(jié)點(diǎn),item=null(隊(duì)首節(jié)點(diǎn)恒為head.next)

C 隊(duì)尾節(jié)點(diǎn),next=null

  • item: null -> first -> …… -> last
  • next: first -> second -> ……-> null

入隊(duì)操作

    // 入隊(duì)
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node; // last.next = node; last = node;
    }

步驟1

 步驟2

出隊(duì)操作

 
    // 出隊(duì)
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        // 形成引用鏈閉環(huán),JVM根據(jù)可達(dá)性分析時(shí),GC root的引用鏈與該對(duì)象之間不可達(dá),進(jìn)行GC
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

步驟1

 步驟2

關(guān)鍵成員變量

隊(duì)列入隊(duì)和出隊(duì)鎖分離,都使用了非公平鎖。

這里的count屬性需要注意下,這里使用了原子類保證操作的原子性。后面的入隊(duì)和出隊(duì),將會(huì)頻繁使用它。

/** 容量, 初始化不設(shè)置時(shí)默認(rèn)為Integer.MAX_VALUE*/
private final int capacity;
 
/** 當(dāng)前隊(duì)列內(nèi)的元素?cái)?shù)量 */
private final AtomicInteger count = new AtomicInteger();
 
/**
 * 隊(duì)首
 * 不變量: head.item == null
 */
transient Node<E> head;
 
/**
 * 隊(duì)尾
 * 不變量: last.next == null
 */
private transient Node<E> last;
 
/** 出隊(duì)操作公用鎖 */
private final ReentrantLock takeLock = new ReentrantLock();
 
/** 用于出隊(duì)操作的阻塞和喚醒 出隊(duì)的話,只需要考慮隊(duì)列是否為空 */
private final Condition notEmpty = takeLock.newCondition(); 
 
/** 入隊(duì)操作公用鎖 */
private final ReentrantLock putLock = new ReentrantLock();
 
/** 用于入隊(duì)操作的阻塞和喚醒 入隊(duì)只需要考慮隊(duì)列空間是否足夠*/
private final Condition notFull = putLock.newCondition();

初始化

三個(gè)構(gòu)造函數(shù)

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
 
    // 自定義容量
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
 
    // 初始化時(shí),批量添加集合中的元素
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

put方法

put方法幾個(gè)關(guān)注點(diǎn)

  • 釋放鎖的時(shí)機(jī)
  • 執(zhí)行入隊(duì)操作
  • 喚醒生產(chǎn)者的時(shí)機(jī)
  • 喚醒消費(fèi)者的時(shí)機(jī)

這幾點(diǎn)是整個(gè)阻塞操作的核心,可以在下面的分析中仔細(xì)觀察。

注:由于阻塞隊(duì)列就是基于生產(chǎn)者-消費(fèi)者模型的,因此,下文中都把調(diào)用put方法的線程稱為生產(chǎn)者,調(diào)用take方法的線程稱為消費(fèi)者。

總體分析

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1; // -1代表操作異常
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 如果線程沒(méi)有被標(biāo)記為中斷,則獲取鎖
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            // 這里是線程在執(zhí)行put操作時(shí)唯一一個(gè)執(zhí)行過(guò)程中釋放鎖的地方
            notFull.await(); // 容量已滿,等待被消費(fèi)后喚醒
        }
        // 添加元素,更新容量
        enqueue(node);
        c = count.getAndIncrement();
        // 隊(duì)列容量有余時(shí),在這里再次喚醒一個(gè)其他的生產(chǎn)者線程(或者說(shuō)消費(fèi)者消費(fèi)速度大于生產(chǎn))
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        // 釋放鎖
        putLock.unlock();
    }
    // 喚醒一個(gè)消費(fèi)者
    if (c == 0)
        signalNotEmpty();
}

count屬性并發(fā)問(wèn)題

這里需要重點(diǎn)關(guān)注count,由于有兩把鎖,count可以同時(shí)被putLock、takeLock操作,那么這里是否會(huì)產(chǎn)生并發(fā)問(wèn)題。

分析如下:

A. 只有putLock或takeLock一把鎖操作:就是單線程操作,沒(méi)影響,不產(chǎn)生并發(fā)問(wèn)題。

其他所有put操作都處于await的狀態(tài)或者競(jìng)爭(zhēng)鎖狀態(tài),其他線程也因?yàn)楂@取不到鎖而無(wú)法執(zhí)行,只有等該節(jié)點(diǎn)添加完成釋放鎖,其他線程才有機(jī)會(huì)繼續(xù)執(zhí)行。

        while (count.get() == capacity) {
            notFull.await(); // 容量已滿,等待被消費(fèi)后喚醒
        }

B. putLock和takeLock同時(shí)操作:我們假設(shè)兩個(gè)線程一個(gè)獲取到putLock,一個(gè)獲取到了takeLock(同時(shí)最多也只有兩個(gè)線程操作count)。

// put            
while (count.get() == capacity) {
     notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
 
 
 
// take
while (count.get() == 0) {
    notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
    notEmpty.signal();

由于count是原子類那么count的所有讀寫操作必然是一個(gè)串聯(lián)的操作,而非并行操作,因此也不存在并發(fā)問(wèn)題,如下圖(順序可能不同):

喚醒消費(fèi)者

代碼的最后一段,會(huì)有喚醒一個(gè)消費(fèi)者的操作。

// 喚醒一個(gè)等待中的消費(fèi)者
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

剛開(kāi)始看到的時(shí)候很疑惑,為什么是c == 0才喚醒。如果生產(chǎn)者入隊(duì)成功,那么c應(yīng)該為如下值:

c = count.getAndIncrement();

后面看了一下count.getAndIncrement()方法定義才發(fā)現(xiàn)自己記混了,count.getAndIncrement()是一個(gè)原子操作,且返回值的是操作前的值。

ok,現(xiàn)在沒(méi)問(wèn)題了。

count >= 0,也就是說(shuō),只有在生產(chǎn)者入隊(duì)前隊(duì)列為空,入隊(duì)成功之后才會(huì)喚醒一個(gè)消費(fèi)者消費(fèi)。

take方法

take方法與put方法大致相似,只是與put做相反操作。

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // 隊(duì)列元素為空,停止消費(fèi),讓出鎖并等待被喚醒
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 移除隊(duì)首元素,并更新容量
        x = dequeue();
        c = count.getAndDecrement();
        // 生產(chǎn)速度大于消費(fèi)速度,喚醒一個(gè)其他消費(fèi)者進(jìn)行消費(fèi)
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // 消費(fèi)之前隊(duì)列已滿,消費(fèi)后喚醒一個(gè)生產(chǎn)者
    if (c == capacity)
        signalNotFull();
    return x;
}
 
// 喚醒生產(chǎn)者
/**
 * Signals a waiting put. Called only from take/poll.
 */
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

總結(jié)

總體上看LinkedBlockingQueue類不難,整個(gè)生產(chǎn)-消費(fèi)的流程實(shí)現(xiàn)也比較簡(jiǎn)單。源碼已經(jīng)把該介紹的東西都講得很明白了,我這屬于依葫蘆畫(huà)瓢順著源碼注釋寫出來(lái)的。這么一寫,自己這個(gè)類的印象就很深刻了。

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • java定時(shí)任務(wù)cron表達(dá)式每周執(zhí)行一次的坑及解決

    java定時(shí)任務(wù)cron表達(dá)式每周執(zhí)行一次的坑及解決

    這篇文章主要介紹了java定時(shí)任務(wù)cron表達(dá)式每周執(zhí)行一次的坑及解決,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-06-06
  • Mybatis攔截器打印sql問(wèn)題

    Mybatis攔截器打印sql問(wèn)題

    這篇文章主要介紹了Mybatis攔截器打印sql問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-03-03
  • java基本教程之join方法詳解 java多線程教程

    java基本教程之join方法詳解 java多線程教程

    本文對(duì)java Thread中join()方法進(jìn)行介紹,join()的作用是讓“主線程”等待“子線程”結(jié)束之后才能繼續(xù)運(yùn)行,大家參考使用吧
    2014-01-01
  • Java Resource路徑整理總結(jié)

    Java Resource路徑整理總結(jié)

    這篇文章主要介紹了 Java Resource路徑整理總結(jié)的相關(guān)資料,需要的朋友可以參考下
    2017-03-03
  • Java實(shí)現(xiàn)萬(wàn)年歷效果

    Java實(shí)現(xiàn)萬(wàn)年歷效果

    這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)萬(wàn)年歷效果,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2022-06-06
  • Java數(shù)據(jù)結(jié)構(gòu)之圖的兩種搜索算法詳解

    Java數(shù)據(jù)結(jié)構(gòu)之圖的兩種搜索算法詳解

    在很多情況下,我們需要遍歷圖,得到圖的一些性質(zhì)。有關(guān)圖的搜索,最經(jīng)典的算法有深度優(yōu)先搜索和廣度優(yōu)先搜索,接下來(lái)我們分別講解這兩種搜索算法,需要的可以參考一下
    2022-11-11
  • SpringBoot 如何使用RestTemplate來(lái)調(diào)用接口

    SpringBoot 如何使用RestTemplate來(lái)調(diào)用接口

    這篇文章主要介紹了SpringBoot 如何使用RestTemplate來(lái)調(diào)用接口方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-10-10
  • Kotlin 基本語(yǔ)法實(shí)例詳解

    Kotlin 基本語(yǔ)法實(shí)例詳解

    這篇文章主要介紹了Kotlin 基本語(yǔ)法實(shí)例詳解的相關(guān)資料,需要的朋友可以參考下
    2017-06-06
  • java按豎線分割的實(shí)現(xiàn)

    java按豎線分割的實(shí)現(xiàn)

    在Java中,我們可以使用split()方法按豎線分割字符串,本文將介紹如何使用Java中的字符串分割方法來(lái)按豎線進(jìn)行分割,同時(shí)提供代碼示例來(lái)幫助讀者理解,感興趣的可以了解一下
    2024-01-01
  • Java創(chuàng)建可執(zhí)行JAR文件的多種方式

    Java創(chuàng)建可執(zhí)行JAR文件的多種方式

    本文主要介紹了Java創(chuàng)建可執(zhí)行JAR文件的多種方式,使用JDK的jar工具、IDE、Maven和Gradle來(lái)創(chuàng)建和配置可執(zhí)行JAR文件,具有一定的參考價(jià)值,感興趣的可以了解一下
    2024-07-07

最新評(píng)論