Java的非阻塞隊列ConcurrentLinkedQueue解讀
前言
在并發(fā)編程中,有時候需要使用線程安全的隊列。如果要實現(xiàn)一個線程安全的隊列有兩種方式:一種是使用阻塞算法,另一種是使用非阻塞算法。
使用阻塞算法的隊列可以用一個鎖(入隊和出隊用同一把鎖)或兩個鎖(入隊和出隊用不同的鎖)等方式來實現(xiàn)。非阻塞的實現(xiàn)方式則可以使用循環(huán) CAS 的方式來實現(xiàn)。
ConcurrentLinkedQueue 是一個基于鏈接節(jié)點的無界線程安全隊列,它采用先進先出的規(guī)則對節(jié)點進行排序,當我們添加一個元素的時候,它會添加到隊列的尾部;當我們獲取一個元素時,它會返回隊列頭部的元素。它采用了“wait-free”算法(即 CAS 算法) 來實現(xiàn),該算法在 Michael&Scott 算法上進行了一些修改。
ConcurrentLinkedQueue 的結(jié)構(gòu)
ConcurrentLinkedQueue 由 head 節(jié)點和 tail 節(jié)點組成,每個節(jié)點(Node)由節(jié)點元素 (item)和指向下一個節(jié)點(next)的引用組成,節(jié)點與節(jié)點之間就是通過這個 next 關(guān)聯(lián)起來,從而組成一張鏈表結(jié)構(gòu)的隊列。默認情況下 head 節(jié)點存儲的元素為空,tail 節(jié) 點等于 head 節(jié)點。
入隊列
1) 入隊列的過程
每添加一個節(jié)點就做了一個隊列的快照圖
- 添加元素 1。隊列更新 head 節(jié)點的 next 節(jié)點為元素 1 節(jié)點。又因為 tail 節(jié)點默認 情況下等于 head 節(jié)點,所以它們的 next 節(jié)點都指向元素 1 節(jié)點。
- 添加元素 2。隊列首先設(shè)置元素 1 節(jié)點的 next 節(jié)點為元素 2 節(jié)點,然后更新 tail 節(jié)點指向元素 2 節(jié)點。
- 添加元素 3,設(shè)置 tail 節(jié)點的 next 節(jié)點為元素 3 節(jié)點。
- 添加元素 4,設(shè)置元素 3 的 next 節(jié)點為元素 4 節(jié)點,然后將 tail 節(jié)點指向元素 4 節(jié)點。

入隊主要做兩件事情:
第一是將入隊節(jié)點設(shè)置成當前隊列尾節(jié)點的下一個節(jié)點;
第二是更新 tail 節(jié)點,如果 tail 節(jié)點的 next 節(jié)點不為空,則將入隊節(jié)點設(shè)置成 tail 節(jié)點,如果 tail 節(jié)點的 next 節(jié)點為 空,則將入隊節(jié)點設(shè)置成 tail 的 next 節(jié)點,所以 tail 節(jié)點不總是尾節(jié)點
/*在此隊列的尾部插入指定元素。由于隊列是無界的,這個方法永遠不會返回false 。
返回值:
true (由Queue.offer指定)
拋出:
NullPointerException – 如果指定元素為 null*/
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
// 入隊前,創(chuàng)建一個入隊節(jié)點
Node<E> n = new Node<E>(e);
retry:
// 死循環(huán),入隊不成功反復入隊。
for (; ; ) {
// 創(chuàng)建一個指向 tail 節(jié)點的引用
Node<E> t = tail;
// p 用來表示隊列的尾節(jié)點,默認情況下等于 tail 節(jié)點。
Node<E> p = t;
for (int hops = 0; ; hops++) { // 獲得 p 節(jié)點的下一個節(jié)點。
Node<E> next = succ(p);
// next 節(jié)點不為空,說明 p 不是尾節(jié)點,需要更新 p 后在將它指向 next 節(jié)點
if (next != null) {
// 循環(huán)了兩次及其以上,并且當前節(jié)點還是不等于尾節(jié)點
if (hops > HOPS && t != tail) continue retry;
p = next;
}
// 如果 p 是尾節(jié)點,則設(shè)置 p 節(jié)點的 next 節(jié)點為入隊節(jié)點。
else if (p.casNext(null, n)) {
/*如果 tail 節(jié)點有大于等于 1 個 next 節(jié)點,則將入隊節(jié)點設(shè)置成 tail 節(jié)點,
更新失敗了也沒關(guān)系,因為失敗了表示有其他線程成功更新了 tail 節(jié)點*/
if (hops >= HOPS)
casTail(t, n); // 更新 tail 節(jié)點,允許失敗
return true;
}
// p 有 next 節(jié)點,表示 p 的 next 節(jié)點是尾節(jié)點,則重新設(shè)置 p 節(jié)點
else {
p = succ(p);
}
}
}
}整個入隊過程主要做兩件事情:第一是定位出尾節(jié)點;第二是 使用 CAS 算法將入隊節(jié)點設(shè)置成尾節(jié)點的 next 節(jié)點,如不成功則重試。
2) 定位尾節(jié)點
tail 節(jié)點并不總是尾節(jié)點,所以每次入隊都必須先通過 tail 節(jié)點來找到尾節(jié)點。尾節(jié)點可能是 tail 節(jié)點,也可能是 tail 節(jié)點的 next 節(jié)點。代碼中循環(huán)體中的第一個 if 就是判 斷 tail 是否有 next 節(jié)點,有則表示 next 節(jié)點可能是尾節(jié)點。獲取 tail 節(jié)點的 next 節(jié)點需要注意的是 p 節(jié)點等于 p 的 next 節(jié)點的情況,只有一種可能就是 p 節(jié)點和 p 的 next 節(jié)點 都等于空,表示這個隊列剛初始化,正準備添加節(jié)點,所以需要返回 head 節(jié)點。
final Node<E> succ(Node<E> p) {
Node<E> next = p.getNext();
return (p == next) head:next;
}3) 設(shè)置入隊節(jié)點為尾節(jié)點
p.casNext(null,n)方法用于將入隊節(jié)點設(shè)置為當前隊列尾節(jié)點的 next 節(jié)點,如果 p.next 是 null,表示 p 是當前隊列的尾節(jié)點,如果不為 null,表示有其他線程更新了尾節(jié)點, 則需要重新獲取當前隊列的尾節(jié)點。
4) HOPS 的設(shè)計意圖
/*讓 tail 節(jié)點永遠作為隊列的尾節(jié)點,每次都需要使用循環(huán) CAS 更新 tail 節(jié)點。如果能減少 CAS
更新 tail 節(jié)點的次數(shù),就能提高入隊的效率*/
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
Node<E> n = new Node<E>(e);
for (; ; ) {
Node<E> t = tail;
if (t.casNext(null, n) && casTail(t, n)) {
return true;
}
}
}使用 hops 變量來控制并減少 tail 節(jié)點的更新頻率,并不是每次節(jié)點入隊后都將 tail 節(jié)點更新成尾節(jié)點,而是當 tail 節(jié) 點和尾節(jié)點的距離大于等于常量 HOPS 的值(默認等于 1)時才更新 tail 節(jié)點,tail 和尾節(jié)點的距離越長,使用 CAS 更新 tail 節(jié)點的次數(shù)就會越少,但是距離越長帶來的負面效果就是每次入隊時定位尾節(jié)點的時間就越長,因為循環(huán)體需要多循環(huán)一次來定位出尾節(jié) 點,但是這樣仍然能提高入隊的效率,因為從本質(zhì)上來看它通過增加對 volatile 變量的讀 操作來減少對 volatile 變量的寫操作,而對 volatile 變量的寫操作開銷要遠遠大于讀操作,所以入隊效率會有所提升。 private static final int HOPS = 1;
出隊列
出隊列的就是從隊列里返回一個節(jié)點元素,并清空該節(jié)點對元素的引用。
并不是每次出隊時都更新 head 節(jié)點,當 head 節(jié)點里有元素時,直接彈出 head 節(jié)點里的元素,而不會更新 head 節(jié)點。只有當 head 節(jié)點里沒有元素時,出隊操作才會更新 head 節(jié)點。這種做法也是通過 hops 變量來減少使用 CAS 更新 head 節(jié)點的 消耗,從而提高出隊效率。

public E poll() {
Node<E> h = head;
// p 表示頭節(jié)點,需要出隊的節(jié)點
Node<E> p = h;
for (int hops = 0; ; hops++) {
// 獲取 p 節(jié)點的元素
E item = p.getItem();
// 如果 p 節(jié)點的元素不為空,使用 CAS 設(shè)置 p 節(jié)點引用的元素為 null,
// 如果成功則返回 p 節(jié)點的元素。
if (item != null && p.casItem(item, null)) {
if (hops >= HOPS) {
// 將 p 節(jié)點下一個節(jié)點設(shè)置成 head 節(jié)點
Node<E> q = p.getNext();
updateHead(h, (q != null)q :p);
}
return item;
}
// 如果頭節(jié)點的元素為空或頭節(jié)點發(fā)生了變化,這說明頭節(jié)點已經(jīng)被另外
// 一個線程修改了。那么獲取 p 節(jié)點的下一個節(jié)點
Node<E> next = succ(p);
// 如果 p 的下一個節(jié)點也為空,說明這個隊列已經(jīng)空了
if (next == null) { // 更新頭節(jié)點。
updateHead(h, p);
break;
}
// 如果下一個元素不為空,則將頭節(jié)點的下一個節(jié)點設(shè)置成頭節(jié)點
p = next;
}
return null;
}首先獲取頭節(jié)點的元素,然后判斷頭節(jié)點元素是否為空,如果為空,表示另外一個線程已經(jīng)進行了一次出隊操作將該節(jié)點的元素取走,如果不為空,則使用 CAS 的方式將頭節(jié)點的引用設(shè)置成 null,如果 CAS 成功,則直接返回頭節(jié)點的元素,如果不成功,表示另外一個線程已經(jīng)進行了一次出隊操作更新了 head 節(jié)點,導致元素發(fā)生了變化,需要 重新獲取頭節(jié)點。
到此這篇關(guān)于Java的非阻塞隊列ConcurrentLinkedQueue解讀的文章就介紹到這了,更多相關(guān)非阻塞隊列ConcurrentLinkedQueue內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot+dynamicDataSource動態(tài)添加切換數(shù)據(jù)源方式
這篇文章主要介紹了springboot+dynamicDataSource動態(tài)添加切換數(shù)據(jù)源方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-01-01
springmvc實現(xiàn)導出數(shù)據(jù)信息為excle表格示例代碼
本篇文章主要介紹了springmvc實現(xiàn)導出數(shù)據(jù)信息為excle表格,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧。2017-01-01
springboot創(chuàng)建的web項目整合Quartz框架的項目實踐
本文主要介紹了springboot創(chuàng)建的web項目整合Quartz框架的項目實踐,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2022-06-06
詳解Spring Boot 項目啟動時執(zhí)行特定方法
這篇文章主要介紹了詳解Spring Boot 項目啟動時執(zhí)行特定方法,Springboot給我們提供了兩種“開機啟動”某些方法的方式:ApplicationRunner和CommandLineRunner。感興趣的小伙伴們可以參考一下2018-06-06
springboot+thymeleaf整合阿里云OOS對象存儲圖片的實現(xiàn)
本文主要介紹了springboot+thymeleaf整合阿里云OOS對象存儲圖片的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2022-05-05

