Java中的LinkedBlockingQueue源碼解析
基本認識
LinkedBlockingQueue可以指定容量,內(nèi)部維持一個隊列,所以有一個頭節(jié)點head和一個尾節(jié)點last,內(nèi)部維持兩把鎖,一個用于入隊,一個用于出隊,還有鎖關(guān)聯(lián)的Condition對象。主要對象的定義如下:
//容量,如果沒有指定,該值為Integer.MAX_VALUE; private final int capacity; //當前隊列中的元素 private final AtomicInteger count = new AtomicInteger(); //隊列頭節(jié)點,始終滿足head.item==null transient Node<E> head; //隊列的尾節(jié)點,始終滿足last.next==null private transient Node<E> last; //用于出隊的鎖 private final ReentrantLock takeLock = new ReentrantLock(); //當隊列為空時,保存執(zhí)行出隊的線程 private final Condition notEmpty = takeLock.newCondition(); //用于入隊的鎖 private final ReentrantLock putLock = new ReentrantLock(); //當隊列滿時,保存執(zhí)行入隊的線程 private final Condition notFull = putLock.newCondition();
構(gòu)造方法
LinkedBlockingQueue的構(gòu)造方法有三個,分別如下:
從構(gòu)造方法中可以得出3點結(jié)論:
1. 當調(diào)用無參的構(gòu)造方法時,容量是int的最大值
2. 隊列中至少包含一個節(jié)點,哪怕隊列對外表現(xiàn)為空
3. LinkedBlockingQueue不支持null元素
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null);//last和head在隊列為空時都存在,所以隊列中至少有一個節(jié)點 } public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
put(E e)方法
首先獲得入隊的鎖putLock,判斷隊列是否已滿:count == capacity
- 未滿:將節(jié)點鏈入尾部,元素數(shù)量+1,此時如果發(fā)現(xiàn)隊列還沒滿還可以生產(chǎn),就喚醒其他生產(chǎn)線程notFull.signal也進行生產(chǎn),生產(chǎn)一個后,如果此時隊列是有空變?yōu)榉强盏模ㄗC明此時所有的消費者都在阻塞notEmpty.await(),此時必須由生產(chǎn)者進行喚醒,不然無法向下進行,也就是從空到非空這個時候必須由生產(chǎn)者喚醒消費者,之后的就是消費者喚醒自己的兄弟姐妹們),就喚醒消費者隊列notEmpty的頭結(jié)點notEmpty.signal,通知消費這消費這個消費者消費后發(fā)現(xiàn)還有可以消費的元素,就通知notEmpty隊列里的頭結(jié)點,就這樣notEmpty隊列一次被喚醒了,notEmpty只在第一次是被生產(chǎn)者喚醒的。
- 已滿:就調(diào)用notFull.await阻塞,釋放鎖,從AQS隊列移除,將生產(chǎn)者加入到notFull條件隊列尾部,等待著被喚醒后繼續(xù)生產(chǎn)
public void put(E e) throws InterruptedException { //不允許元素為null if (e == null) throw new NullPointerException(); int c = -1; //以當前元素新建一個節(jié)點 Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; //獲得入隊的鎖 putLock.lockInterruptibly(); try { //如果隊列已滿,那么將該線程加入到Condition的等待隊列中 while (count.get() == capacity) { notFull.await(); } //將節(jié)點入隊 enqueue(node); //得到插入之前隊列的元素個數(shù) c = count.getAndIncrement(); //如果還可以插入元素,那么釋放等待的入隊線程 if (c + 1 < capacity) notFull.signal(); } finally { //解鎖 putLock.unlock(); } //通知出隊線程隊列非空 if (c == 0) signalNotEmpty(); }
private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; //獲取takeLock takeLock.lock(); try { //釋放notEmpty條件隊列中的第一個等待線程 notEmpty.signal(); } finally { takeLock.unlock(); } }
E take()方法
首先獲取takeLodck,判斷隊列是否可以消費:count.get() == 0
- 可以消費:返回頭節(jié)點的下一個節(jié)點(頭結(jié)點不存數(shù)據(jù),是永遠存在的一個節(jié)點),并移除此節(jié)點,元素數(shù)量-1,如果此時發(fā)現(xiàn)隊列中還有元素可以消費,就喚醒其他消費者notEmpty.signal,進行消費。如果此時隊列是從滿變?yōu)槲礉M的(證明此時所有的生產(chǎn)者都在阻塞,此時必須有消費者喚醒生產(chǎn)者的第一個節(jié)點,之后就是生產(chǎn)者喚醒自己的兄弟姐妹了),
- 不可以消費:調(diào)用notEmpty.awati進行阻塞,釋放鎖,從AQS隊列移除,進入NotEmpty隊列尾部,等待被喚醒
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; //獲取takeLock鎖 takeLock.lockInterruptibly(); try { //如果隊列為空,那么加入到notEmpty條件的等待隊列中 while (count.get() == 0) { notEmpty.await(); } //得到隊頭元素 x = dequeue(); //得到取走一個元素之前隊列的元素個數(shù) c = count.getAndDecrement(); //如果隊列中還有數(shù)據(jù)可取,釋放notEmpty條件等待隊列中的第一個線程 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } //如果隊列中的元素從滿到非滿,通知put線程 if (c == capacity) signalNotFull(); return x; }
private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
remove()方法
remove(Object)操作會從隊列的頭遍歷到尾,用到了隊列的兩端,所以需要對兩端加鎖,而對兩端加鎖就需要獲取兩把鎖;
remove()是從頭結(jié)點刪除,所以這個方法只需要獲取take鎖。
public boolean remove(Object o) { //因為隊列不包含null元素,返回false if (o == null) return false; //獲取兩把鎖 fullyLock(); try { //從頭的下一個節(jié)點開始遍歷 for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { //如果匹配,那么將節(jié)點從隊列中移除,trail表示前驅(qū)節(jié)點 if (o.equals(p.item)) { unlink(p, trail); return true; } } return false; } finally { //釋放兩把鎖 fullyUnlock(); } }
size()方法
由于count是一個AtomicInteger的變量,所以該方法是一個原子性的操作,是線程安全的。
public int size() { return count.get(); }
LinkedBlockingDeque
從上面的字段,可以得到LinkedBlockingDeque內(nèi)部只有一把鎖以及該鎖上關(guān)聯(lián)的兩個條件,同一時刻只有一個線程可以在隊頭或者隊尾執(zhí)行入隊或出隊操作??梢园l(fā)現(xiàn)這點和LinkedBlockingQueue不同,LinkedBlockingQueue可以同時有兩個線程在兩端執(zhí)行操作。
LinkedBlockingQueue實現(xiàn)總結(jié)
LinkedBlockingQueue底層是一個鏈表(可以指定容量,默認是Integer.MAX_VALUE),維持了兩把鎖,一把鎖用于入隊,一把鎖用于出隊,并且使用一個AtomicInterger類型的變量保證線程安全,AtomicInterger:表示當前隊列中含有的元素個數(shù):
- 生產(chǎn)者不斷進行生產(chǎn)會向鏈表尾部不斷鏈入元素,直到達到容量后,此時所有生產(chǎn)者依次進入notFull條件隊列進行阻塞,此時如果任意一個消費者消費了一個元素,就會通知notFull隊列第一個節(jié)點進行生產(chǎn),notFull第一個節(jié)點生產(chǎn)完畢后發(fā)現(xiàn)還有位置可以生產(chǎn)就會喚醒notFull的第二個節(jié)點,notFull第二個節(jié)點生產(chǎn)后發(fā)現(xiàn)還有位置則喚醒notFull第三個節(jié)點,就這樣就可以喚醒notFull里的所有生產(chǎn)者
- 消費從鏈表頭部開始向后消費,只要還有元素就可以不斷消費,消費完所有的元素后,此時所有消費者依次進入notEmpty條件隊列進行阻塞, 這個時候一旦生產(chǎn)者生產(chǎn)了一個元素,就會喚醒notEmpty的第一個節(jié)點,而這個節(jié)點消費完后如果發(fā)現(xiàn)還有元素可以消費,就會喚醒自己的兄弟姐妹(notEmpty的第二個節(jié)點),notEmpty的第二個節(jié)點消費完后如果發(fā)現(xiàn)還有元素可以消費就會再喚醒notEmpty的第三個節(jié)點,就這樣就喚醒了notEmpty里的所有的消費者
- 消費者一直在砍頭,生產(chǎn)者一直在添尾
到此這篇關(guān)于Java中的LinkedBlockingQueue源碼解析的文章就介紹到這了,更多相關(guān)LinkedBlockingQueue源碼解析內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java 輕松實現(xiàn)二維數(shù)組與稀疏數(shù)組互轉(zhuǎn)
在某些應用場景中需要大量的二維數(shù)組來進行數(shù)據(jù)存儲,但是二維數(shù)組中卻有著大量的無用的位置占據(jù)著內(nèi)存空間,稀疏數(shù)組就是為了優(yōu)化二維數(shù)組,節(jié)省內(nèi)存空間2022-04-04Java結(jié)構(gòu)型設(shè)計模式之享元模式示例詳解
享元模式(FlyWeight?Pattern),也叫蠅量模式,運用共享技術(shù),有效的支持大量細粒度的對象,享元模式就是池技術(shù)的重要實現(xiàn)方式。本文將通過示例詳細講解享元模式,感興趣的可以了解一下2022-09-09解決@RequestBody接收json對象報錯415的問題
這篇文章主要介紹了解決@RequestBody接收json對象報錯415的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06解決IDEA錯誤 Cause: java.sql.SQLException: The server time zone
這篇文章主要介紹了解決IDEA錯誤 Cause: java.sql.SQLException: The server time zone value的問題,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-08-08Maven構(gòu)建SpringBoot集成MyBatis過程
文章介紹了如何使用Maven創(chuàng)建一個Spring Boot項目,并集成MyBatis,首先,配置Maven環(huán)境,然后在pom.xml中添加MyBatis依賴,配置數(shù)據(jù)庫連接信息,最后生成Mapper XML文件和DAO層代碼2024-11-11微服務領(lǐng)域Spring Boot自動伸縮的實現(xiàn)方法
這篇文章主要給大家介紹了關(guān)于微服務領(lǐng)域Spring Boot自動伸縮的實現(xiàn)方法,文中通過示例代碼介紹的非常詳細,對大家學習或者使用spring boot具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2018-10-10SpringBoot actuator 健康檢查不通過的解決方案
這篇文章主要介紹了SpringBoot actuator 健康檢查不通過的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07Spring Boot中RabbitMQ自動配置的介紹、原理和使用方法
本文介紹了Spring Boot中RabbitMQ自動配置的介紹、原理和使用方法,通過本文的介紹,我們希望讀者能夠更好地理解Spring Boot中RabbitMQ的使用方法,并在項目中更加靈活地應用,感興趣的朋友跟隨小編一起看看吧2023-07-07