JUC并發(fā)編程LinkedBlockingQueue隊(duì)列深入分析源碼
LinkedBlockingQueue介紹
在JUC包下關(guān)于線程安全的隊(duì)列實(shí)現(xiàn)有很多,那么此篇文章講解LinkedBlockingQueue的實(shí)現(xiàn)原理,相信各位讀者在線程池中能看到LinkedBlockingQueue或者SynchronousQueue隊(duì)列來作為儲存任務(wù)和消費(fèi)任務(wù)的通道。一個并發(fā)安全的隊(duì)列,在多線程中充當(dāng)著安全的傳輸任務(wù)的責(zé)任。
既然是介紹LinkedBlockingQueue,那么從構(gòu)造方法入手最合適不過。
public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; // 初始化一個偽節(jié)點(diǎn),讓head和last都指向這個偽節(jié)點(diǎn) // 為什么需要偽節(jié)點(diǎn)的存在? // 因?yàn)榭梢员WC不會發(fā)生極端情況(假設(shè)沒有偽節(jié)點(diǎn),并且只存在一個節(jié)點(diǎn)的情況下,生產(chǎn)者和消費(fèi)者并發(fā)執(zhí)行就可能出現(xiàn)極端情況) last = head = new Node<E>(null); }
為什么需要存在偽節(jié)點(diǎn),因?yàn)榭梢员WC不會發(fā)生極端情況(假設(shè)沒有偽節(jié)點(diǎn),并且只存在一個節(jié)點(diǎn)的情況下,生產(chǎn)者和消費(fèi)者并發(fā)執(zhí)行就可能出現(xiàn)極端情況,用偽節(jié)點(diǎn)就能很好的解決這個極端問題)
/** * 因?yàn)槭顷?duì)列,用鏈表實(shí)現(xiàn),所以頭尾指針肯定不可少。 * */ transient Node<E> head; private transient Node<E> last; /** * 我們可以很清楚的看到,這里使用了2套ReentrantLock和對應(yīng)的condition條件等待隊(duì)列。 * 目的也很明顯,讓生產(chǎn)者和消費(fèi)者并行。 * */ private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition();
2套ReentrantLock和對應(yīng)的condition條件等待隊(duì)列,很明顯目的是為了讓生產(chǎn)者和消費(fèi)者并行,所以就需要一個偽節(jié)點(diǎn)處理極端并發(fā)情況。
為了,一些沒有接觸過隊(duì)列的讀者,所以這里還是介紹一下API把
API | 用途 | 注意事項(xiàng) |
offer | 生產(chǎn)者 | 不會阻塞,如果插入失敗,或者隊(duì)列已經(jīng)滿了,直接返回 |
poll | 消費(fèi)者 | 不會阻塞,如果消費(fèi)失敗,或者隊(duì)列當(dāng)前為空,直接返回 |
put | 生產(chǎn)者 | 會阻塞,如果插入失敗或者隊(duì)列已經(jīng)滿了,阻塞直到插入成功 |
take | 消費(fèi)者 | 會阻塞,如果消費(fèi)失敗或者當(dāng)前隊(duì)列為空,阻塞直到消費(fèi)成功 |
put方法-生產(chǎn)者
public void put(E e) throws InterruptedException { // 不能插入null if (e == null) throw new NullPointerException(); int c = -1; // 創(chuàng)建插入的節(jié)點(diǎn)。 Node<E> node = new Node<E>(e); // 拿到生產(chǎn)者的鎖對象 final ReentrantLock putLock = this.putLock; // 拿到全局計數(shù)器,注意這里用的是AtomicInteger,所以自增的原子性已經(jīng)保證。 final AtomicInteger count = this.count; // 上的是可響應(yīng)中斷鎖。 putLock.lockInterruptibly(); try { // 如果當(dāng)前隊(duì)列已經(jīng)滿了,此時我們就要去阻塞,等待隊(duì)列被消費(fèi),我們要被喚醒,醒來生產(chǎn)節(jié)點(diǎn)。 while (count.get() == capacity) { // 進(jìn)入條件等待隊(duì)列阻塞。 // 注意,只要阻塞,是會釋放鎖的,其他生產(chǎn)者線程可以搶到鎖。 notFull.await(); } // 插入到隊(duì)列尾部 enqueue(node); // 因?yàn)椴迦肓斯?jié)點(diǎn),所以全局計數(shù)需要+1 // 但是這里請注意細(xì)節(jié),getAndIncrement方法返回的是舊值。 c = count.getAndIncrement(); // 這里是一個很sao的點(diǎn) // 注意,這里只要當(dāng)前隊(duì)列沒滿,喚醒的是生產(chǎn)者的條件等待隊(duì)列。 // 為什么要這么做? // 很簡單,首先需要考慮,生產(chǎn)者和消費(fèi)者是并發(fā)執(zhí)行了。 // 其次,只要隊(duì)列沒滿就能一直生產(chǎn),那么隊(duì)列一旦滿了后,后來的線程就都去條件隊(duì)列阻塞,所以線程生產(chǎn)完一個節(jié)點(diǎn)就有必要去喚醒等待的同胞(不管有沒有同胞在阻塞,這是義務(wù)) if (c + 1 < capacity) // 喚醒條件等待隊(duì)列中頭部節(jié)點(diǎn)。 notFull.signal(); } finally { putLock.unlock(); } // 這里也是一個很sao的點(diǎn) // 再次強(qiáng)調(diào),getAndIncrement方法是返回的舊值 // 所以當(dāng)前生產(chǎn)者如果生產(chǎn)的是第一個節(jié)點(diǎn),那么c ==0 // 而隊(duì)列中沒有節(jié)點(diǎn),消費(fèi)者是要阻塞的 // 也即,這里給隊(duì)列生產(chǎn)了一個節(jié)點(diǎn),要喚醒消費(fèi)者去消費(fèi)節(jié)點(diǎn)。 if (c == 0) signalNotEmpty(); } // 插入到隊(duì)列尾部 // 因?yàn)镽eentrantLock保證了整體的原子性,所以這里細(xì)節(jié)部分不需要保證原子性了。 private void enqueue(Node<E> node) { // 插入到尾部 last = last.next = node; }
第一次看到這個代碼難免會發(fā)生震撼,為什么在生產(chǎn)者代碼里面喚醒生產(chǎn)者?不是正常寫的生產(chǎn)者消費(fèi)者模型,不都是生產(chǎn)者生產(chǎn)一個喚醒消費(fèi)者消費(fèi)嗎?怎么這里不一樣??????
因?yàn)檫@里生產(chǎn)者和消費(fèi)者并行處理,當(dāng)隊(duì)列滿了以后,后來的生產(chǎn)者線程都會去阻塞,所以生產(chǎn)者線程生產(chǎn)完一個節(jié)點(diǎn)就有必要去喚醒等待的同胞(不管有沒有同胞在阻塞,這是義務(wù))
大致流程如下:
- 創(chuàng)建Node節(jié)點(diǎn)
- 上生產(chǎn)者鎖
- 如果隊(duì)列已經(jīng)滿了,就去生產(chǎn)者條件隊(duì)列阻塞
- 如果沒滿,或者喚醒后,就插入到last指針的后面
- 全局節(jié)點(diǎn)計數(shù)器+1
- 如果當(dāng)前隊(duì)列還有空間,就喚醒在阻塞的同胞。
- 釋放鎖
- 如果在生產(chǎn)之前隊(duì)列為空,本次生產(chǎn)后就需要喚醒在阻塞的消費(fèi)者線程,讓他們醒來消費(fèi)我剛生產(chǎn)的節(jié)點(diǎn)
take方法-消費(fèi)者
public E take() throws InterruptedException { E x; int c = -1; // 全局計數(shù)器 final AtomicInteger count = this.count; // 消費(fèi)者的鎖對象 final ReentrantLock takeLock = this.takeLock; // 可響應(yīng)中斷鎖。 takeLock.lockInterruptibly(); try { // 如果當(dāng)前隊(duì)列中沒有節(jié)點(diǎn),此時消費(fèi)者需要去阻塞,因?yàn)椴蛔枞粫速M(fèi)CPU性能,又消費(fèi)不到節(jié)點(diǎn)。 while (count.get() == 0) { // 去消費(fèi)者的條件隊(duì)列阻塞。 notEmpty.await(); } // 醒來后,去消費(fèi)節(jié)點(diǎn)。 x = dequeue(); // 給全局計數(shù)器-1,但是這里也要注意,返回的是舊值 c = count.getAndDecrement(); // 如果隊(duì)列中還有節(jié)點(diǎn)就喚醒其他消費(fèi)者去消費(fèi)節(jié)點(diǎn)。 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } // 這里也是一個sao點(diǎn) // 請注意,這里的c是舊值,因?yàn)間etAndDecrement返回的是舊值 // 所以,如果當(dāng)前消費(fèi)線程消費(fèi)節(jié)點(diǎn)之前隊(duì)列是滿的,當(dāng)消費(fèi)完畢后,我有必要去喚醒因?yàn)殛?duì)列滿了而阻塞等待的生產(chǎn)者,因?yàn)楫?dāng)前已經(jīng)空出一個空間了。 if (c == capacity) // 喚醒生產(chǎn)者 signalNotFull(); return x; } // 消費(fèi)者消費(fèi)節(jié)點(diǎn) // 所以需要HelpGC // 不過這里要注意,head都是指向偽節(jié)點(diǎn)。 private E dequeue() { // 拿到頭節(jié)點(diǎn), Node<E> h = head; // 拿到頭節(jié)點(diǎn)的next節(jié)點(diǎn),next節(jié)點(diǎn)作為下一個head節(jié)點(diǎn)。 // 因?yàn)閔ead節(jié)點(diǎn)是指向偽節(jié)點(diǎn),所以head.next節(jié)點(diǎn)就是當(dāng)前要消費(fèi)的節(jié)點(diǎn)。 Node<E> first = h.next; // 將當(dāng)前的頭結(jié)點(diǎn)的next指向自己。 h.next = h; // help GC // 設(shè)置新的頭結(jié)點(diǎn),也即把當(dāng)前消費(fèi)的節(jié)點(diǎn)做為下次的偽節(jié)點(diǎn) // head節(jié)點(diǎn)指向的都是偽節(jié)點(diǎn) head = first; // 拿到當(dāng)前消費(fèi)者想要的數(shù)據(jù) E x = first.item; first.item = null; return x; }
這里跟put生產(chǎn)者基本思想一致,只不過這里是消費(fèi)者,因?yàn)槭巧a(chǎn)者消費(fèi)者并行,所以這里也是喚醒同胞,因?yàn)楫?dāng)隊(duì)列為空所有的消費(fèi)者都會阻塞,所以每次消費(fèi)者線程消費(fèi)完節(jié)點(diǎn)后 ,有義務(wù)喚醒同胞。
大致流程如下:
- 拿到全局計數(shù)器
- 上消費(fèi)者鎖
- 如果當(dāng)前隊(duì)列為空,當(dāng)前消費(fèi)者線程就要去阻塞
- 如果不為空,或者被喚醒以后消費(fèi)節(jié)點(diǎn),把消費(fèi)的節(jié)點(diǎn)作為下一次的偽節(jié)點(diǎn),也即作為head節(jié)點(diǎn)
- 全局計數(shù)器-1
- 喚醒同胞
- 釋放鎖
- 如果在消費(fèi)之前隊(duì)列已經(jīng)滿了,那么可能會有生產(chǎn)者線程在阻塞,所以我有義務(wù)去喚醒他們
總結(jié)
尾插頭拿,生產(chǎn)者和消費(fèi)者并行執(zhí)行,隊(duì)列滿了生產(chǎn)者阻塞,隊(duì)列為空消費(fèi)者阻塞。消費(fèi)者有義務(wù)喚醒生產(chǎn)者,生產(chǎn)者有義務(wù)喚醒消費(fèi)者。
到此這篇關(guān)于JUC并發(fā)編程LinkedBlockingQueue隊(duì)列深入分析源碼的文章就介紹到這了,更多相關(guān)JUC LinkedBlockingQueue內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot中Shiro緩存使用Redis、Ehcache的方法
這篇文章主要介紹了SpringBoot中Shiro緩存使用Redis、Ehcache的方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09Lock、Condition實(shí)現(xiàn)簡單的生產(chǎn)者消費(fèi)者模式示例
這篇文章主要介紹了Lock、Condition實(shí)現(xiàn)簡單的生產(chǎn)者消費(fèi)者模式示例,需要的朋友可以參考下2014-04-04Java程序連接數(shù)據(jù)庫的常用的類和接口介紹
這篇文章主要介紹了Java程序連接數(shù)據(jù)庫的常用的類和接口,包括Connection類和Statement類等,需要的朋友可以參考下2015-10-10Java代理模式實(shí)例詳解【靜態(tài)代理與動態(tài)代理】
這篇文章主要介紹了Java代理模式,結(jié)合實(shí)例形式詳細(xì)分析了java靜態(tài)代理與動態(tài)代理模式相關(guān)概念、原理、操作技巧與注意事項(xiàng),需要的朋友可以參考下2019-09-09Java實(shí)現(xiàn)查找當(dāng)前字符串最大回文串代碼分享
本文給大家介紹的是如何使用Java實(shí)現(xiàn)查找當(dāng)前字符串最大回文串代碼,非常的簡單實(shí)用,有需要的小伙伴可以參考下2016-07-07Java設(shè)計通用的返回數(shù)據(jù)格式過程講解
現(xiàn)在很多的項(xiàng)目server端返回client端的數(shù)據(jù)多數(shù)以JSON格式返回,同時結(jié)合其它需要,通常加一下狀態(tài)碼和信息之類,給前端處理帶來很大的方便,這篇文章就用Java設(shè)計了通用的返回數(shù)據(jù)格式,感興趣的同學(xué)可以參考下文2023-05-05