Java并發(fā)編程中的ConcurrentLinkedQueue詳解
ConcurrentLinkedQueue 詳解
ConcurrentLinkedQueue示例 下面通過(guò)一個(gè)示例來(lái)了解ConcurrentLinkedQueue的使用
import java.util.concurrent.ConcurrentLinkedQueue;
class PutThread extends Thread {
private ConcurrentLinkedQueue<Integer> clq;
public PutThread(ConcurrentLinkedQueue<Integer> clq) {
this.clq = clq;
}
public void run() {
for (int i = 0; i < 10; i++) {
try {
System.out.println("add " + i);
clq.add(i);
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class GetThread extends Thread {
private ConcurrentLinkedQueue<Integer> clq;
public GetThread(ConcurrentLinkedQueue<Integer> clq) {
this.clq = clq;
}
public void run() {
for (int i = 0; i < 10; i++) {
try {
System.out.println("poll " + clq.poll());
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class ConcurrentLinkedQueueDemo {
public static void main(String[] args) {
ConcurrentLinkedQueue<Integer> clq = new ConcurrentLinkedQueue<Integer>();
PutThread p1 = new PutThread(clq);
GetThread g1 = new GetThread(clq);
p1.start();
g1.start();
}
}運(yùn)行結(jié)果: add 0
poll null
add 1
poll 0
add 2
poll 1
add 3
poll 2
add 4
poll 3
add 5
poll 4
poll 5
add 6
add 7
poll 6
poll 7
add 8
add 9
poll 8
說(shuō)明: GetThread線程不會(huì)因?yàn)镃oncurrentLinkedQueue隊(duì)列為空而等待,而是直接返回null,所以當(dāng)實(shí)現(xiàn)隊(duì)列不空時(shí),等待時(shí),則需要用戶自己實(shí)現(xiàn)等待邏輯。
ConcurrentLinkedQueue數(shù)據(jù)結(jié)構(gòu) ConcurrentLinkedQueue的數(shù)據(jù)結(jié)構(gòu)與LinkedBlockingQueue的數(shù)據(jù)結(jié)構(gòu)相同,都是使用的鏈表結(jié)構(gòu)。ConcurrentLinkedQueue的數(shù)據(jù)結(jié)構(gòu)如下:

ConcurrentLinkedQueue采用的鏈表結(jié)構(gòu),并且包含有一個(gè)頭節(jié)點(diǎn)和一個(gè)尾結(jié)點(diǎn)
核心函數(shù)分析
offer函數(shù)
public boolean offer(E e) {
// 元素不為null
checkNotNull(e);
// 新生一個(gè)結(jié)點(diǎn)
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) { // 無(wú)限循環(huán)
// q為p結(jié)點(diǎn)的下一個(gè)結(jié)點(diǎn)
Node<E> q = p.next;
if (q == null) { // q結(jié)點(diǎn)為null
// p is last node
if (p.casNext(null, newNode)) { // 比較并進(jìn)行替換p結(jié)點(diǎn)的next域
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // p不等于t結(jié)點(diǎn),不一致 // hop two nodes at a time
// 比較并替換尾結(jié)點(diǎn)
casTail(t, newNode); // Failure is OK.
// 返回
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q) // p結(jié)點(diǎn)等于q結(jié)點(diǎn)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
// 原來(lái)的尾結(jié)點(diǎn)與現(xiàn)在的尾結(jié)點(diǎn)是否相等,若相等,則p賦值為head,否則,賦值為現(xiàn)在的尾結(jié)點(diǎn)
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
// 重新賦值p結(jié)點(diǎn)
p = (p != t && t != (t = tail)) ? t : q;
}
}offer函數(shù)用于將指定元素插入此隊(duì)列的尾部。下面模擬offer函數(shù)的操作,隊(duì)列狀態(tài)的變化(假設(shè)單線程添加元素,連續(xù)添加10、20兩個(gè)元素)。

若ConcurrentLinkedQueue的初始狀態(tài)如上圖所示,即隊(duì)列為空。單線程添加元素,此時(shí),添加元素10,則狀態(tài)如下所示

如上圖所示,添加元素10后,tail沒(méi)有變化,還是指向之前的結(jié)點(diǎn),繼續(xù)添加元素20,則狀態(tài)如下所示

如上圖所示,添加元素20后,tail指向了最新添加的結(jié)點(diǎn)。
poll函數(shù)
public E poll() {
restartFromHead:
for (;;) { // 無(wú)限循環(huán)
for (Node<E> h = head, p = h, q;;) { // 保存頭節(jié)點(diǎn)
// item項(xiàng)
E item = p.item;
if (item != null && p.casItem(item, null)) { // item不為null并且比較并替換item成功
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // p不等于h // hop two nodes at a time
// 更新頭節(jié)點(diǎn)
updateHead(h, ((q = p.next) != null) ? q : p);
// 返回item
return item;
}
else if ((q = p.next) == null) { // q結(jié)點(diǎn)為null
// 更新頭節(jié)點(diǎn)
updateHead(h, p);
return null;
}
else if (p == q) // p等于q
// 繼續(xù)循環(huán)
continue restartFromHead;
else
// p賦值為q
p = q;
}
}
}此函數(shù)用于獲取并移除此隊(duì)列的頭,如果此隊(duì)列為空,則返回null。下面模擬poll函數(shù)的操作,隊(duì)列狀態(tài)的變化(假設(shè)單線程操作,狀態(tài)為之前offer10、20后的狀態(tài),poll兩次)。

隊(duì)列初始狀態(tài)如上圖所示,在poll操作后,隊(duì)列的狀態(tài)如下圖所示

如上圖可知,poll操作后,head改變了,并且head所指向的結(jié)點(diǎn)的item變?yōu)榱薾ull。再進(jìn)行一次poll操作,隊(duì)列的狀態(tài)如下圖所示。

如上圖可知,poll操作后,head結(jié)點(diǎn)沒(méi)有變化,只是指示的結(jié)點(diǎn)的item域變成了null。
remove函數(shù)
public boolean remove(Object o) {
// 元素為null,返回
if (o == null) return false;
Node<E> pred = null;
for (Node<E> p = first(); p != null; p = succ(p)) { // 獲取第一個(gè)存活的結(jié)點(diǎn)
// 第一個(gè)存活結(jié)點(diǎn)的item值
E item = p.item;
if (item != null &&
o.equals(item) &&
p.casItem(item, null)) { // 找到item相等的結(jié)點(diǎn),并且將該結(jié)點(diǎn)的item設(shè)置為null
// p的后繼結(jié)點(diǎn)
Node<E> next = succ(p);
if (pred != null && next != null) // pred不為null并且next不為null
// 比較并替換next域
pred.casNext(p, next);
return true;
}
// pred賦值為p
pred = p;
}
return false;
}此函數(shù)用于從隊(duì)列中移除指定元素的單個(gè)實(shí)例(如果存在)。其中,會(huì)調(diào)用到first函數(shù)和succ函數(shù),first函數(shù)的源碼如下
Node<E> first() {
restartFromHead:
for (;;) { // 無(wú)限循環(huán),確保成功
for (Node<E> h = head, p = h, q;;) {
// p結(jié)點(diǎn)的item域是否為null
boolean hasItem = (p.item != null);
if (hasItem || (q = p.next) == null) { // item不為null或者next域?yàn)閚ull
// 更新頭節(jié)點(diǎn)
updateHead(h, p);
// 返回結(jié)點(diǎn)
return hasItem ? p : null;
}
else if (p == q) // p等于q
// 繼續(xù)從頭節(jié)點(diǎn)開(kāi)始
continue restartFromHead;
else
// p賦值為q
p = q;
}
}
}first函數(shù)用于找到鏈表中第一個(gè)存活的結(jié)點(diǎn)。succ函數(shù)源碼如下
final Node<E> succ(Node<E> p) {
// p結(jié)點(diǎn)的next域
Node<E> next = p.next;
// 如果next域?yàn)樽陨?,則返回頭節(jié)點(diǎn),否則,返回next
return (p == next) ? head : next;
}succ用于獲取結(jié)點(diǎn)的下一個(gè)結(jié)點(diǎn)。如果結(jié)點(diǎn)的next域指向自身,則返回head頭節(jié)點(diǎn),否則,返回next結(jié)點(diǎn)。下面模擬remove函數(shù)的操作,隊(duì)列狀態(tài)的變化(假設(shè)單線程操作,狀態(tài)為之前offer10、20后的狀態(tài),執(zhí)行remove(10)、remove(20)操作)。

如上圖所示,為ConcurrentLinkedQueue的初始狀態(tài),remove(10)后的狀態(tài)如下圖所示

如上圖所示,當(dāng)執(zhí)行remove(10)后,head指向了head結(jié)點(diǎn)之前指向的結(jié)點(diǎn)的下一個(gè)結(jié)點(diǎn),并且head結(jié)點(diǎn)的item域置為null。繼續(xù)執(zhí)行remove(20),狀態(tài)如下圖所示

如上圖所示,執(zhí)行remove(20)后,head與tail指向同一個(gè)結(jié)點(diǎn),item域?yàn)閚ull。
size函數(shù)
public int size() {
// 計(jì)數(shù)
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p)) // 從第一個(gè)存活的結(jié)點(diǎn)開(kāi)始往后遍歷
if (p.item != null) // 結(jié)點(diǎn)的item域不為null
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE) // 增加計(jì)數(shù),若達(dá)到最大值,則跳出循環(huán)
break;
// 返回大小
return count;
}此函數(shù)用于返回ConcurrenLinkedQueue的大小,從第一個(gè)存活的結(jié)點(diǎn)(first)開(kāi)始,往后遍歷鏈表,當(dāng)結(jié)點(diǎn)的item域不為null時(shí),增加計(jì)數(shù),之后返回大小。
HOPS(延遲更新的策略)的設(shè)計(jì) 通過(guò)上面對(duì)offer和poll方法的分析,我們發(fā)現(xiàn)tail和head是延遲更新的,兩者更新觸發(fā)時(shí)機(jī)為:
tail更新觸發(fā)時(shí)機(jī):當(dāng)tail指向的節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)不為null的時(shí)候,會(huì)執(zhí)行定位隊(duì)列真正的隊(duì)尾節(jié)點(diǎn)的操作,找到隊(duì)尾節(jié)點(diǎn)后完成插入之后才會(huì)通過(guò)casTail進(jìn)行tail更新;當(dāng)tail指向的節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)為null的時(shí)候,只插入節(jié)點(diǎn)不更新tail。
head更新觸發(fā)時(shí)機(jī):當(dāng)head指向的節(jié)點(diǎn)的item域?yàn)閚ull的時(shí)候,會(huì)執(zhí)行定位隊(duì)列真正的隊(duì)頭節(jié)點(diǎn)的操作,找到隊(duì)頭節(jié)點(diǎn)后完成刪除之后才會(huì)通過(guò)updateHead進(jìn)行head更新;當(dāng)head指向的節(jié)點(diǎn)的item域不為null的時(shí)候,只刪除節(jié)點(diǎn)不更新head。
并且在更新操作時(shí),源碼中會(huì)有注釋為:hop two nodes at a time。所以這種延遲更新的策略就被叫做HOPS的大概原因是這個(gè)(猜的),從上面更新時(shí)的狀態(tài)圖可以看出,head和tail的更新是“跳著的”即中間總是間隔了一個(gè)。那么這樣設(shè)計(jì)的意圖是什么呢?
如果讓tail永遠(yuǎn)作為隊(duì)列的隊(duì)尾節(jié)點(diǎn),實(shí)現(xiàn)的代碼量會(huì)更少,而且邏輯更易懂。但是,這樣做有一個(gè)缺點(diǎn),如果大量的入隊(duì)操作,每次都要執(zhí)行CAS進(jìn)行tail的更新,匯總起來(lái)對(duì)性能也會(huì)是大大的損耗。如果能減少CAS更新的操作,無(wú)疑可以大大提升入隊(duì)的操作效率,所以doug lea大師每間隔1次(tail和隊(duì)尾節(jié)點(diǎn)的距離為1)進(jìn)行才利用CAS更新tail。對(duì)head的更新也是同樣的道理,雖然,這樣設(shè)計(jì)會(huì)多出在循環(huán)中定位隊(duì)尾節(jié)點(diǎn),但總體來(lái)說(shuō)讀的操作效率要遠(yuǎn)遠(yuǎn)高于寫的性能,因此,多出來(lái)的在循環(huán)中定位尾節(jié)點(diǎn)的操作的性能損耗相對(duì)而言是很小的。
ConcurrentLinkedQueue適合的場(chǎng)景
ConcurrentLinkedQueue通過(guò)無(wú)鎖來(lái)做到了更高并發(fā)量,是個(gè)高性能的隊(duì)列,但是使用場(chǎng)景相對(duì)不如阻塞隊(duì)列常見(jiàn),畢竟取數(shù)據(jù)也要不停的去循環(huán),不如阻塞的邏輯好設(shè)計(jì),但是在并發(fā)量特別大的情況下,是個(gè)不錯(cuò)的選擇,性能上好很多,而且這個(gè)隊(duì)列的設(shè)計(jì)也是特別費(fèi)力,尤其的使用的改良算法和對(duì)哨兵的處理。
整體的思路都是比較嚴(yán)謹(jǐn)?shù)?,這個(gè)也是使用了無(wú)鎖造成的,我們自己使用無(wú)鎖的條件的話,這個(gè)隊(duì)列是個(gè)不錯(cuò)的參考。
到此這篇關(guān)于Java并發(fā)編程中的ConcurrentLinkedQueue詳解的文章就介紹到這了,更多相關(guān)ConcurrentLinkedQueue詳解內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java與JSON數(shù)據(jù)的轉(zhuǎn)換實(shí)例詳解
這篇文章主要介紹了java與JSON數(shù)據(jù)的轉(zhuǎn)換實(shí)例詳解的相關(guān)資料,需要的朋友可以參考下2017-03-03
Java 數(shù)據(jù)結(jié)構(gòu)與算法系列精講之背包問(wèn)題
背包問(wèn)題是一個(gè)非常典型的考察動(dòng)態(tài)規(guī)劃應(yīng)用的題目,對(duì)其加上不同的限制和條件,可以衍生出諸多變種,若要全面理解動(dòng)態(tài)規(guī)劃,就必須對(duì)背包問(wèn)題了如指掌2022-02-02
JAVA項(xiàng)目如何打包部署到Linux服務(wù)器上
本文詳細(xì)介紹了在服務(wù)器上部署環(huán)境包括JDK、MySQL、Tomcat的設(shè)置,以及使用Idea-Maven-SpringBoot進(jìn)行jar包打包部署的流程,內(nèi)容涵蓋了MySQL配置注意事項(xiàng)、pom.xml配置、打包命令等關(guān)鍵步驟,同時(shí),也提供了如何將jar包上傳到Linux服務(wù)器并運(yùn)行的具體方法2024-10-10
java+selenium實(shí)現(xiàn)滑塊驗(yàn)證
現(xiàn)在越來(lái)越多的網(wǎng)站都使用采用滑塊驗(yàn)證來(lái)作為驗(yàn)證機(jī)制,用于判斷用戶是否為人類而不是機(jī)器人,本文就將利用java和selenium實(shí)現(xiàn)滑塊驗(yàn)證,希望對(duì)大家有所幫助2023-12-12
Spring事務(wù)失效場(chǎng)景原理及解決方案
這篇文章主要介紹了Spring事務(wù)失效場(chǎng)景原理及解決方案,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-09-09
手把手帶你分析SpringBoot自動(dòng)裝配完成了Ribbon哪些核心操作
這篇文章主要介紹了詳解Spring Boot自動(dòng)裝配Ribbon哪些核心操作的哪些操作,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-08-08

