Java中的LinkedBlockingQueue源碼解析
基本認(rèn)識(shí)
LinkedBlockingQueue可以指定容量,內(nèi)部維持一個(gè)隊(duì)列,所以有一個(gè)頭節(jié)點(diǎn)head和一個(gè)尾節(jié)點(diǎn)last,內(nèi)部維持兩把鎖,一個(gè)用于入隊(duì),一個(gè)用于出隊(duì),還有鎖關(guān)聯(lián)的Condition對(duì)象。主要對(duì)象的定義如下:
//容量,如果沒(méi)有指定,該值為Integer.MAX_VALUE;
private final int capacity;
//當(dāng)前隊(duì)列中的元素
private final AtomicInteger count = new AtomicInteger();
//隊(duì)列頭節(jié)點(diǎn),始終滿足head.item==null
transient Node<E> head;
//隊(duì)列的尾節(jié)點(diǎn),始終滿足last.next==null
private transient Node<E> last;
//用于出隊(duì)的鎖
private final ReentrantLock takeLock = new ReentrantLock();
//當(dāng)隊(duì)列為空時(shí),保存執(zhí)行出隊(duì)的線程
private final Condition notEmpty = takeLock.newCondition();
//用于入隊(duì)的鎖
private final ReentrantLock putLock = new ReentrantLock();
//當(dāng)隊(duì)列滿時(shí),保存執(zhí)行入隊(duì)的線程
private final Condition notFull = putLock.newCondition();構(gòu)造方法
LinkedBlockingQueue的構(gòu)造方法有三個(gè),分別如下:
從構(gòu)造方法中可以得出3點(diǎn)結(jié)論:
1. 當(dāng)調(diào)用無(wú)參的構(gòu)造方法時(shí),容量是int的最大值
2. 隊(duì)列中至少包含一個(gè)節(jié)點(diǎn),哪怕隊(duì)列對(duì)外表現(xiàn)為空
3. LinkedBlockingQueue不支持null元素
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);//last和head在隊(duì)列為空時(shí)都存在,所以隊(duì)列中至少有一個(gè)節(jié)點(diǎn)
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}put(E e)方法
首先獲得入隊(duì)的鎖putLock,判斷隊(duì)列是否已滿:count == capacity
- 未滿:將節(jié)點(diǎn)鏈入尾部,元素?cái)?shù)量+1,此時(shí)如果發(fā)現(xiàn)隊(duì)列還沒(méi)滿還可以生產(chǎn),就喚醒其他生產(chǎn)線程notFull.signal也進(jìn)行生產(chǎn),生產(chǎn)一個(gè)后,如果此時(shí)隊(duì)列是有空變?yōu)榉强盏模ㄗC明此時(shí)所有的消費(fèi)者都在阻塞notEmpty.await(),此時(shí)必須由生產(chǎn)者進(jìn)行喚醒,不然無(wú)法向下進(jìn)行,也就是從空到非空這個(gè)時(shí)候必須由生產(chǎn)者喚醒消費(fèi)者,之后的就是消費(fèi)者喚醒自己的兄弟姐妹們),就喚醒消費(fèi)者隊(duì)列notEmpty的頭結(jié)點(diǎn)notEmpty.signal,通知消費(fèi)這消費(fèi)這個(gè)消費(fèi)者消費(fèi)后發(fā)現(xiàn)還有可以消費(fèi)的元素,就通知notEmpty隊(duì)列里的頭結(jié)點(diǎn),就這樣notEmpty隊(duì)列一次被喚醒了,notEmpty只在第一次是被生產(chǎn)者喚醒的。
- 已滿:就調(diào)用notFull.await阻塞,釋放鎖,從AQS隊(duì)列移除,將生產(chǎn)者加入到notFull條件隊(duì)列尾部,等待著被喚醒后繼續(xù)生產(chǎn)
public void put(E e) throws InterruptedException {
//不允許元素為null
if (e == null) throw new NullPointerException();
int c = -1;
//以當(dāng)前元素新建一個(gè)節(jié)點(diǎn)
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//獲得入隊(duì)的鎖
putLock.lockInterruptibly();
try {
//如果隊(duì)列已滿,那么將該線程加入到Condition的等待隊(duì)列中
while (count.get() == capacity) {
notFull.await();
}
//將節(jié)點(diǎn)入隊(duì)
enqueue(node);
//得到插入之前隊(duì)列的元素個(gè)數(shù)
c = count.getAndIncrement();
//如果還可以插入元素,那么釋放等待的入隊(duì)線程
if (c + 1 < capacity)
notFull.signal();
} finally {
//解鎖
putLock.unlock();
}
//通知出隊(duì)線程隊(duì)列非空
if (c == 0)
signalNotEmpty();
} private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
//獲取takeLock
takeLock.lock();
try {
//釋放notEmpty條件隊(duì)列中的第一個(gè)等待線程
notEmpty.signal();
} finally {
takeLock.unlock();
}
}E take()方法
首先獲取takeLodck,判斷隊(duì)列是否可以消費(fèi):count.get() == 0
- 可以消費(fèi):返回頭節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)(頭結(jié)點(diǎn)不存數(shù)據(jù),是永遠(yuǎn)存在的一個(gè)節(jié)點(diǎn)),并移除此節(jié)點(diǎn),元素?cái)?shù)量-1,如果此時(shí)發(fā)現(xiàn)隊(duì)列中還有元素可以消費(fèi),就喚醒其他消費(fèi)者notEmpty.signal,進(jìn)行消費(fèi)。如果此時(shí)隊(duì)列是從滿變?yōu)槲礉M的(證明此時(shí)所有的生產(chǎn)者都在阻塞,此時(shí)必須有消費(fèi)者喚醒生產(chǎn)者的第一個(gè)節(jié)點(diǎn),之后就是生產(chǎn)者喚醒自己的兄弟姐妹了),
- 不可以消費(fèi):調(diào)用notEmpty.awati進(jìn)行阻塞,釋放鎖,從AQS隊(duì)列移除,進(jìn)入NotEmpty隊(duì)列尾部,等待被喚醒
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//獲取takeLock鎖
takeLock.lockInterruptibly();
try {
//如果隊(duì)列為空,那么加入到notEmpty條件的等待隊(duì)列中
while (count.get() == 0) {
notEmpty.await();
}
//得到隊(duì)頭元素
x = dequeue();
//得到取走一個(gè)元素之前隊(duì)列的元素個(gè)數(shù)
c = count.getAndDecrement();
//如果隊(duì)列中還有數(shù)據(jù)可取,釋放notEmpty條件等待隊(duì)列中的第一個(gè)線程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//如果隊(duì)列中的元素從滿到非滿,通知put線程
if (c == capacity)
signalNotFull();
return x;
}private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}remove()方法
remove(Object)操作會(huì)從隊(duì)列的頭遍歷到尾,用到了隊(duì)列的兩端,所以需要對(duì)兩端加鎖,而對(duì)兩端加鎖就需要獲取兩把鎖;
remove()是從頭結(jié)點(diǎn)刪除,所以這個(gè)方法只需要獲取take鎖。
public boolean remove(Object o) {
//因?yàn)殛?duì)列不包含null元素,返回false
if (o == null) return false;
//獲取兩把鎖
fullyLock();
try {
//從頭的下一個(gè)節(jié)點(diǎn)開(kāi)始遍歷
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
//如果匹配,那么將節(jié)點(diǎn)從隊(duì)列中移除,trail表示前驅(qū)節(jié)點(diǎn)
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
//釋放兩把鎖
fullyUnlock();
}
}size()方法
由于count是一個(gè)AtomicInteger的變量,所以該方法是一個(gè)原子性的操作,是線程安全的。
public int size() {
return count.get();
}LinkedBlockingDeque
從上面的字段,可以得到LinkedBlockingDeque內(nèi)部只有一把鎖以及該鎖上關(guān)聯(lián)的兩個(gè)條件,同一時(shí)刻只有一個(gè)線程可以在隊(duì)頭或者隊(duì)尾執(zhí)行入隊(duì)或出隊(duì)操作。可以發(fā)現(xiàn)這點(diǎn)和LinkedBlockingQueue不同,LinkedBlockingQueue可以同時(shí)有兩個(gè)線程在兩端執(zhí)行操作。
LinkedBlockingQueue實(shí)現(xiàn)總結(jié)
LinkedBlockingQueue底層是一個(gè)鏈表(可以指定容量,默認(rèn)是Integer.MAX_VALUE),維持了兩把鎖,一把鎖用于入隊(duì),一把鎖用于出隊(duì),并且使用一個(gè)AtomicInterger類(lèi)型的變量保證線程安全,AtomicInterger:表示當(dāng)前隊(duì)列中含有的元素個(gè)數(shù):
- 生產(chǎn)者不斷進(jìn)行生產(chǎn)會(huì)向鏈表尾部不斷鏈入元素,直到達(dá)到容量后,此時(shí)所有生產(chǎn)者依次進(jìn)入notFull條件隊(duì)列進(jìn)行阻塞,此時(shí)如果任意一個(gè)消費(fèi)者消費(fèi)了一個(gè)元素,就會(huì)通知notFull隊(duì)列第一個(gè)節(jié)點(diǎn)進(jìn)行生產(chǎn),notFull第一個(gè)節(jié)點(diǎn)生產(chǎn)完畢后發(fā)現(xiàn)還有位置可以生產(chǎn)就會(huì)喚醒notFull的第二個(gè)節(jié)點(diǎn),notFull第二個(gè)節(jié)點(diǎn)生產(chǎn)后發(fā)現(xiàn)還有位置則喚醒notFull第三個(gè)節(jié)點(diǎn),就這樣就可以喚醒notFull里的所有生產(chǎn)者
- 消費(fèi)從鏈表頭部開(kāi)始向后消費(fèi),只要還有元素就可以不斷消費(fèi),消費(fèi)完所有的元素后,此時(shí)所有消費(fèi)者依次進(jìn)入notEmpty條件隊(duì)列進(jìn)行阻塞, 這個(gè)時(shí)候一旦生產(chǎn)者生產(chǎn)了一個(gè)元素,就會(huì)喚醒notEmpty的第一個(gè)節(jié)點(diǎn),而這個(gè)節(jié)點(diǎn)消費(fèi)完后如果發(fā)現(xiàn)還有元素可以消費(fèi),就會(huì)喚醒自己的兄弟姐妹(notEmpty的第二個(gè)節(jié)點(diǎn)),notEmpty的第二個(gè)節(jié)點(diǎn)消費(fèi)完后如果發(fā)現(xiàn)還有元素可以消費(fèi)就會(huì)再喚醒notEmpty的第三個(gè)節(jié)點(diǎn),就這樣就喚醒了notEmpty里的所有的消費(fèi)者
- 消費(fèi)者一直在砍頭,生產(chǎn)者一直在添尾
到此這篇關(guān)于Java中的LinkedBlockingQueue源碼解析的文章就介紹到這了,更多相關(guān)LinkedBlockingQueue源碼解析內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java 輕松實(shí)現(xiàn)二維數(shù)組與稀疏數(shù)組互轉(zhuǎn)
在某些應(yīng)用場(chǎng)景中需要大量的二維數(shù)組來(lái)進(jìn)行數(shù)據(jù)存儲(chǔ),但是二維數(shù)組中卻有著大量的無(wú)用的位置占據(jù)著內(nèi)存空間,稀疏數(shù)組就是為了優(yōu)化二維數(shù)組,節(jié)省內(nèi)存空間2022-04-04
Java結(jié)構(gòu)型設(shè)計(jì)模式之享元模式示例詳解
享元模式(FlyWeight?Pattern),也叫蠅量模式,運(yùn)用共享技術(shù),有效的支持大量細(xì)粒度的對(duì)象,享元模式就是池技術(shù)的重要實(shí)現(xiàn)方式。本文將通過(guò)示例詳細(xì)講解享元模式,感興趣的可以了解一下2022-09-09
解決@RequestBody接收json對(duì)象報(bào)錯(cuò)415的問(wèn)題
這篇文章主要介紹了解決@RequestBody接收json對(duì)象報(bào)錯(cuò)415的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06
解決IDEA錯(cuò)誤 Cause: java.sql.SQLException: The server time zone
這篇文章主要介紹了解決IDEA錯(cuò)誤 Cause: java.sql.SQLException: The server time zone value的問(wèn)題,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-08-08
Maven構(gòu)建SpringBoot集成MyBatis過(guò)程
文章介紹了如何使用Maven創(chuàng)建一個(gè)Spring Boot項(xiàng)目,并集成MyBatis,首先,配置Maven環(huán)境,然后在pom.xml中添加MyBatis依賴,配置數(shù)據(jù)庫(kù)連接信息,最后生成Mapper XML文件和DAO層代碼2024-11-11
微服務(wù)領(lǐng)域Spring Boot自動(dòng)伸縮的實(shí)現(xiàn)方法
這篇文章主要給大家介紹了關(guān)于微服務(wù)領(lǐng)域Spring Boot自動(dòng)伸縮的實(shí)現(xiàn)方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用spring boot具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2018-10-10
SpringBoot actuator 健康檢查不通過(guò)的解決方案
這篇文章主要介紹了SpringBoot actuator 健康檢查不通過(guò)的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07
Spring Boot中RabbitMQ自動(dòng)配置的介紹、原理和使用方法
本文介紹了Spring Boot中RabbitMQ自動(dòng)配置的介紹、原理和使用方法,通過(guò)本文的介紹,我們希望讀者能夠更好地理解Spring Boot中RabbitMQ的使用方法,并在項(xiàng)目中更加靈活地應(yīng)用,感興趣的朋友跟隨小編一起看看吧2023-07-07

