Java中的BlockingQueue阻塞隊列原理以及實現(xiàn)詳解
一,BlockingQueue
在最常見的使用到這個阻塞隊列的地方,就是我們耳熟能詳?shù)木€程池里面了,作為我們線程池的一大最大參與者,也是AQS的一個具體實現(xiàn),因此可以好好的深入了解一下這個BlockingQueue阻塞隊列。
用一句話描述這個阻塞隊列就是:它是線程的一個通信工具,在任意時刻,不管并發(fā)有多高,在單jvm進程上,同一時間永遠(yuǎn)只有一個線程能夠?qū)﹃犃羞M行入隊和出隊的操作,它的特性是在任意時刻只有一個線程可以進行take或者put操作。因此這個隊列是一個線程安全的隊列。
比較適用于生產(chǎn)者和消費者的場景,因此適用的應(yīng)用場景如下 線程池,springCloud-Eureka的三級緩存,Nacos,Netty,RakectMq等
所有的阻塞隊列都都實現(xiàn)了對這個BlockingQueue接口
public interface BlockingQueue<E> extends Queue<E>
1,主要常用的隊列有如下
ArrayBlockingQueue: 由數(shù)組支持的有界隊列
LinkedBlockingQueue: 由鏈接節(jié)點支持的可選有界隊列
PriorityBlockingQueue: 由優(yōu)先級堆支持的無界優(yōu)先級隊列
DelayQueue: 由優(yōu)先級堆支持的、基于時間的調(diào)度隊列
2,基本工作原理實現(xiàn)如下
1,以一個有界隊列為例,首先消費者這邊獲取到鎖,然后會生產(chǎn)商品,然后會往隊列中填滿數(shù)據(jù),隊列填滿之后,生產(chǎn)者端會進行阻塞,同時會釋放這把鎖,并且會通知這個消費者趕緊去消費。當(dāng)然內(nèi)部也做了很多事情,不一定就是說一定要阻塞隊列滿了之后才會去喚醒生產(chǎn)者去消費,而是消費者那邊也會有一個監(jiān)聽事件,只有隊列不為空,就會有這個消費者來消費。
2,消費者在接收到生產(chǎn)者的通知之后呢,就會先去獲取到這把鎖,然后對里面的產(chǎn)品進行消費,當(dāng)隊列里面的產(chǎn)品都被消費完成之后,消費者這邊又會釋放這把鎖,然后將自身阻塞,并同時去喚醒這個生產(chǎn)者繼續(xù)生產(chǎn)產(chǎn)品。
3,生產(chǎn)者又獲取到鎖,然后重復(fù)執(zhí)行第一步。
3,基本api使用如下
二,源碼剖析
在了解過一定的工作原理之后,接下來可以對源碼分析一波。
2.1,ArrayBlockingQueue
這里主要通過這個ArrayBlockingQueue為例,來描述一下這個阻塞隊列的工作流程
BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(BOUND);
這個構(gòu)造方法里面有如下參數(shù)
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); //非公平鎖 notEmpty = lock.newCondition(); //條件對象,用于喚醒指定線程 notFull = lock.newCondition(); //條件對象 }
生產(chǎn)者會向隊列中put產(chǎn)品,生產(chǎn)者后會持有鎖,此時會向隊列中存放產(chǎn)品,如果隊列滿了,則會阻塞自己,并且在最后會釋放鎖。
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; //生產(chǎn)者加鎖 lock.lockInterruptibly(); try { while (count == items.length) //如果隊列滿了,則會阻塞 notFull.await(); enqueue(e); } finally { lock.unlock(); //釋放鎖 } }
既然涉及到ReentrantLock,那么就用從之前的AQS里面講起了,這里面這要是一個CLH同步等待隊列,由一個雙向鏈表和一個同步阻塞器組成,同步阻塞器會有一個state和一個exclusiveOwnerThread狀態(tài)組成,state=0表示當(dāng)前沒有對象獲取到鎖,可以來競爭鎖。每個結(jié)點由一個前驅(qū)指針和一個后繼指針,并且里面有一個waitStatus等待狀態(tài),該狀態(tài)主要表示下一個結(jié)點的存活狀態(tài)。
這里的話不會像之前一樣使用這個CLH同步等待隊列,而是加入了一種新的Condition條件等待隊列,如下圖。由firstWaiter和nextWaiter組成的單向鏈表隊列,里面的waitStatus為CONDITION:-2 。也就是說如果當(dāng)前生產(chǎn)者結(jié)點后面的結(jié)點又是一個生產(chǎn)者節(jié)點,因為期間可能存在多個生產(chǎn)者的線程,而為了喚醒接下來的消費者,就會創(chuàng)建一個條件等待隊列,去存儲后面的生產(chǎn)者結(jié)點。 就是說在CLH同步等待隊列中,當(dāng)前結(jié)點為生產(chǎn)者的話,在阻塞隊列滿了之后,如果CLH中的下一個節(jié)點還是生產(chǎn)者,則會將waitStatus的狀態(tài)設(shè)置成-2,并將下一個節(jié)點移動到這個條件等待隊列里面并進行排隊,如果下一個結(jié)點還是,又會將下一個結(jié)點移動到這個條件等待隊列里面并進行排隊。知道下一個結(jié)點是消費者為止。
await()釋放鎖的流程如下
public final void await() throws InterruptedException { //線程是否被中斷,如果被中斷,直接拋異常 if (Thread.interrupted()) throw new InterruptedException(); //條件等待隊列,會構(gòu)建一個新的隊列 Node node = addConditionWaiter(); //釋放鎖,并對對應(yīng)的結(jié)點進行喚醒操作 int savedState = fullyRelease(node); int interruptMode = 0; //判斷當(dāng)前結(jié)點是在條件隊列里面還是在同步隊列里面 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
構(gòu)建條件等待隊列如下
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
出隊,消費者在獲取產(chǎn)品時,產(chǎn)品就會出隊,與此同時,在隊列出隊成功之后,隊列中就會有一個空位,會調(diào)用notFull.signal()方法,通知生產(chǎn)者可以去生產(chǎn)產(chǎn)品了。并將這個條件等待隊列放回這個CLH隊列里面,只有在CLH隊列里面才會獲取鎖。最后在CLH中才能進行unPark釋放鎖的操作。
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); //隊列中有空位,通知生產(chǎn)者生產(chǎn)產(chǎn)品 notFull.signal(); return x; }
消費者獲取產(chǎn)品
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
三,總結(jié)
BlockingQueue也是基于這個AQS的方式實現(xiàn)的,主要是利用這個生產(chǎn)者和消費者這個模型來實現(xiàn)。
通過這個AQS中的CLH同步隊列來對節(jié)點的鎖的阻塞和釋放,期間利用了這個條件等待隊列來實現(xiàn),如果存在多個生產(chǎn)者的線程的情況下,就會將這些線程加入到一個條件等待的隊列里面。
并將這個節(jié)點的狀態(tài)改為-2,condition狀態(tài)。
在全部進入條件等待隊列之后,這個鎖還在并沒有釋放,因此最后又需要將這個條件等待隊列里面的結(jié)點加回到CLH同步隊列中,再進行排隊的釋放這個鎖。結(jié)點出隊的時候,然后生產(chǎn)者會通過一個singal監(jiān)聽這個消費者,每當(dāng)這個阻塞隊列里面出隊,有一個位置的的時候,生產(chǎn)者就會生產(chǎn)這個產(chǎn)品。
消費者也會監(jiān)聽這個隊列,隊列中只要不為空,就回去消費隊列中的產(chǎn)品。
獲取鎖的條件 只有在CLH隊列里等待的Node結(jié)點并且前驅(qū)結(jié)點的 waitStatus 為sinal = -1的可被喚醒的結(jié)點。
條件隊列里面的這些節(jié)點是不能獲取到鎖的。
到此這篇關(guān)于Java中的BlockingQueue阻塞隊列原理以及實現(xiàn)詳解的文章就介紹到這了,更多相關(guān)BlockingQueue阻塞隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Springboot+MDC+traceId日志中打印唯一traceId
本文主要介紹了Springboot+MDC+traceId日志中打印唯一traceId,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-10-10FeignClientFactoryBean創(chuàng)建動態(tài)代理詳細(xì)解讀
這篇文章主要介紹了FeignClientFactoryBean創(chuàng)建動態(tài)代理詳細(xì)解讀,當(dāng)直接進去注冊的方法中,一步步放下走,都是直接放bean的定義信息中放入值,然后轉(zhuǎn)成BeanDefinitionHolder,最后在注冊到IOC容器中,需要的朋友可以參考下2023-11-11springmvc的validator數(shù)據(jù)校驗的實現(xiàn)示例代碼
這篇文章主要介紹了springmvc的數(shù)據(jù)校驗的實現(xiàn)示例代碼, 具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-07-07spring boot 項目中使用thymeleaf模板的案例分析
這篇文章主要介紹了spring boot 項目中使用thymeleaf模板的案例分析,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-09-09