Java并發(fā)編程中的ConcurrentLinkedQueue詳解
ConcurrentLinkedQueue 詳解
ConcurrentLinkedQueue示例 下面通過一個(gè)示例來了解ConcurrentLinkedQueue的使用
import java.util.concurrent.ConcurrentLinkedQueue; class PutThread extends Thread { private ConcurrentLinkedQueue<Integer> clq; public PutThread(ConcurrentLinkedQueue<Integer> clq) { this.clq = clq; } public void run() { for (int i = 0; i < 10; i++) { try { System.out.println("add " + i); clq.add(i); Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } } class GetThread extends Thread { private ConcurrentLinkedQueue<Integer> clq; public GetThread(ConcurrentLinkedQueue<Integer> clq) { this.clq = clq; } public void run() { for (int i = 0; i < 10; i++) { try { System.out.println("poll " + clq.poll()); Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } } public class ConcurrentLinkedQueueDemo { public static void main(String[] args) { ConcurrentLinkedQueue<Integer> clq = new ConcurrentLinkedQueue<Integer>(); PutThread p1 = new PutThread(clq); GetThread g1 = new GetThread(clq); p1.start(); g1.start(); } }
運(yùn)行結(jié)果: add 0
poll null
add 1
poll 0
add 2
poll 1
add 3
poll 2
add 4
poll 3
add 5
poll 4
poll 5
add 6
add 7
poll 6
poll 7
add 8
add 9
poll 8
說明: GetThread線程不會因?yàn)镃oncurrentLinkedQueue隊(duì)列為空而等待,而是直接返回null,所以當(dāng)實(shí)現(xiàn)隊(duì)列不空時(shí),等待時(shí),則需要用戶自己實(shí)現(xiàn)等待邏輯。
ConcurrentLinkedQueue數(shù)據(jù)結(jié)構(gòu) ConcurrentLinkedQueue的數(shù)據(jù)結(jié)構(gòu)與LinkedBlockingQueue的數(shù)據(jù)結(jié)構(gòu)相同,都是使用的鏈表結(jié)構(gòu)。ConcurrentLinkedQueue的數(shù)據(jù)結(jié)構(gòu)如下:
ConcurrentLinkedQueue采用的鏈表結(jié)構(gòu),并且包含有一個(gè)頭節(jié)點(diǎn)和一個(gè)尾結(jié)點(diǎn)
核心函數(shù)分析
offer函數(shù)
public boolean offer(E e) { // 元素不為null checkNotNull(e); // 新生一個(gè)結(jié)點(diǎn) final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) { // 無限循環(huán) // q為p結(jié)點(diǎn)的下一個(gè)結(jié)點(diǎn) Node<E> q = p.next; if (q == null) { // q結(jié)點(diǎn)為null // p is last node if (p.casNext(null, newNode)) { // 比較并進(jìn)行替換p結(jié)點(diǎn)的next域 // Successful CAS is the linearization point // for e to become an element of this queue, // and for newNode to become "live". if (p != t) // p不等于t結(jié)點(diǎn),不一致 // hop two nodes at a time // 比較并替換尾結(jié)點(diǎn) casTail(t, newNode); // Failure is OK. // 返回 return true; } // Lost CAS race to another thread; re-read next } else if (p == q) // p結(jié)點(diǎn)等于q結(jié)點(diǎn) // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. // 原來的尾結(jié)點(diǎn)與現(xiàn)在的尾結(jié)點(diǎn)是否相等,若相等,則p賦值為head,否則,賦值為現(xiàn)在的尾結(jié)點(diǎn) p = (t != (t = tail)) ? t : head; else // Check for tail updates after two hops. // 重新賦值p結(jié)點(diǎn) p = (p != t && t != (t = tail)) ? t : q; } }
offer函數(shù)用于將指定元素插入此隊(duì)列的尾部。下面模擬offer函數(shù)的操作,隊(duì)列狀態(tài)的變化(假設(shè)單線程添加元素,連續(xù)添加10、20兩個(gè)元素)。
若ConcurrentLinkedQueue的初始狀態(tài)如上圖所示,即隊(duì)列為空。單線程添加元素,此時(shí),添加元素10,則狀態(tài)如下所示
如上圖所示,添加元素10后,tail沒有變化,還是指向之前的結(jié)點(diǎn),繼續(xù)添加元素20,則狀態(tài)如下所示
如上圖所示,添加元素20后,tail指向了最新添加的結(jié)點(diǎn)。
poll函數(shù)
public E poll() { restartFromHead: for (;;) { // 無限循環(huán) for (Node<E> h = head, p = h, q;;) { // 保存頭節(jié)點(diǎn) // item項(xiàng) E item = p.item; if (item != null && p.casItem(item, null)) { // item不為null并且比較并替換item成功 // Successful CAS is the linearization point // for item to be removed from this queue. if (p != h) // p不等于h // hop two nodes at a time // 更新頭節(jié)點(diǎn) updateHead(h, ((q = p.next) != null) ? q : p); // 返回item return item; } else if ((q = p.next) == null) { // q結(jié)點(diǎn)為null // 更新頭節(jié)點(diǎn) updateHead(h, p); return null; } else if (p == q) // p等于q // 繼續(xù)循環(huán) continue restartFromHead; else // p賦值為q p = q; } } }
此函數(shù)用于獲取并移除此隊(duì)列的頭,如果此隊(duì)列為空,則返回null。下面模擬poll函數(shù)的操作,隊(duì)列狀態(tài)的變化(假設(shè)單線程操作,狀態(tài)為之前offer10、20后的狀態(tài),poll兩次)。
隊(duì)列初始狀態(tài)如上圖所示,在poll操作后,隊(duì)列的狀態(tài)如下圖所示
如上圖可知,poll操作后,head改變了,并且head所指向的結(jié)點(diǎn)的item變?yōu)榱薾ull。再進(jìn)行一次poll操作,隊(duì)列的狀態(tài)如下圖所示。
如上圖可知,poll操作后,head結(jié)點(diǎn)沒有變化,只是指示的結(jié)點(diǎn)的item域變成了null。
remove函數(shù)
public boolean remove(Object o) { // 元素為null,返回 if (o == null) return false; Node<E> pred = null; for (Node<E> p = first(); p != null; p = succ(p)) { // 獲取第一個(gè)存活的結(jié)點(diǎn) // 第一個(gè)存活結(jié)點(diǎn)的item值 E item = p.item; if (item != null && o.equals(item) && p.casItem(item, null)) { // 找到item相等的結(jié)點(diǎn),并且將該結(jié)點(diǎn)的item設(shè)置為null // p的后繼結(jié)點(diǎn) Node<E> next = succ(p); if (pred != null && next != null) // pred不為null并且next不為null // 比較并替換next域 pred.casNext(p, next); return true; } // pred賦值為p pred = p; } return false; }
此函數(shù)用于從隊(duì)列中移除指定元素的單個(gè)實(shí)例(如果存在)。其中,會調(diào)用到first函數(shù)和succ函數(shù),first函數(shù)的源碼如下
Node<E> first() { restartFromHead: for (;;) { // 無限循環(huán),確保成功 for (Node<E> h = head, p = h, q;;) { // p結(jié)點(diǎn)的item域是否為null boolean hasItem = (p.item != null); if (hasItem || (q = p.next) == null) { // item不為null或者next域?yàn)閚ull // 更新頭節(jié)點(diǎn) updateHead(h, p); // 返回結(jié)點(diǎn) return hasItem ? p : null; } else if (p == q) // p等于q // 繼續(xù)從頭節(jié)點(diǎn)開始 continue restartFromHead; else // p賦值為q p = q; } } }
first函數(shù)用于找到鏈表中第一個(gè)存活的結(jié)點(diǎn)。succ函數(shù)源碼如下
final Node<E> succ(Node<E> p) { // p結(jié)點(diǎn)的next域 Node<E> next = p.next; // 如果next域?yàn)樽陨?,則返回頭節(jié)點(diǎn),否則,返回next return (p == next) ? head : next; }
succ用于獲取結(jié)點(diǎn)的下一個(gè)結(jié)點(diǎn)。如果結(jié)點(diǎn)的next域指向自身,則返回head頭節(jié)點(diǎn),否則,返回next結(jié)點(diǎn)。下面模擬remove函數(shù)的操作,隊(duì)列狀態(tài)的變化(假設(shè)單線程操作,狀態(tài)為之前offer10、20后的狀態(tài),執(zhí)行remove(10)、remove(20)操作)。
如上圖所示,為ConcurrentLinkedQueue的初始狀態(tài),remove(10)后的狀態(tài)如下圖所示
如上圖所示,當(dāng)執(zhí)行remove(10)后,head指向了head結(jié)點(diǎn)之前指向的結(jié)點(diǎn)的下一個(gè)結(jié)點(diǎn),并且head結(jié)點(diǎn)的item域置為null。繼續(xù)執(zhí)行remove(20),狀態(tài)如下圖所示
如上圖所示,執(zhí)行remove(20)后,head與tail指向同一個(gè)結(jié)點(diǎn),item域?yàn)閚ull。
size函數(shù)
public int size() { // 計(jì)數(shù) int count = 0; for (Node<E> p = first(); p != null; p = succ(p)) // 從第一個(gè)存活的結(jié)點(diǎn)開始往后遍歷 if (p.item != null) // 結(jié)點(diǎn)的item域不為null // Collection.size() spec says to max out if (++count == Integer.MAX_VALUE) // 增加計(jì)數(shù),若達(dá)到最大值,則跳出循環(huán) break; // 返回大小 return count; }
此函數(shù)用于返回ConcurrenLinkedQueue的大小,從第一個(gè)存活的結(jié)點(diǎn)(first)開始,往后遍歷鏈表,當(dāng)結(jié)點(diǎn)的item域不為null時(shí),增加計(jì)數(shù),之后返回大小。
HOPS(延遲更新的策略)的設(shè)計(jì) 通過上面對offer和poll方法的分析,我們發(fā)現(xiàn)tail和head是延遲更新的,兩者更新觸發(fā)時(shí)機(jī)為:
tail更新觸發(fā)時(shí)機(jī):當(dāng)tail指向的節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)不為null的時(shí)候,會執(zhí)行定位隊(duì)列真正的隊(duì)尾節(jié)點(diǎn)的操作,找到隊(duì)尾節(jié)點(diǎn)后完成插入之后才會通過casTail進(jìn)行tail更新;當(dāng)tail指向的節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)為null的時(shí)候,只插入節(jié)點(diǎn)不更新tail。
head更新觸發(fā)時(shí)機(jī):當(dāng)head指向的節(jié)點(diǎn)的item域?yàn)閚ull的時(shí)候,會執(zhí)行定位隊(duì)列真正的隊(duì)頭節(jié)點(diǎn)的操作,找到隊(duì)頭節(jié)點(diǎn)后完成刪除之后才會通過updateHead進(jìn)行head更新;當(dāng)head指向的節(jié)點(diǎn)的item域不為null的時(shí)候,只刪除節(jié)點(diǎn)不更新head。
并且在更新操作時(shí),源碼中會有注釋為:hop two nodes at a time。所以這種延遲更新的策略就被叫做HOPS的大概原因是這個(gè)(猜的),從上面更新時(shí)的狀態(tài)圖可以看出,head和tail的更新是“跳著的”即中間總是間隔了一個(gè)。那么這樣設(shè)計(jì)的意圖是什么呢?
如果讓tail永遠(yuǎn)作為隊(duì)列的隊(duì)尾節(jié)點(diǎn),實(shí)現(xiàn)的代碼量會更少,而且邏輯更易懂。但是,這樣做有一個(gè)缺點(diǎn),如果大量的入隊(duì)操作,每次都要執(zhí)行CAS進(jìn)行tail的更新,匯總起來對性能也會是大大的損耗。如果能減少CAS更新的操作,無疑可以大大提升入隊(duì)的操作效率,所以doug lea大師每間隔1次(tail和隊(duì)尾節(jié)點(diǎn)的距離為1)進(jìn)行才利用CAS更新tail。對head的更新也是同樣的道理,雖然,這樣設(shè)計(jì)會多出在循環(huán)中定位隊(duì)尾節(jié)點(diǎn),但總體來說讀的操作效率要遠(yuǎn)遠(yuǎn)高于寫的性能,因此,多出來的在循環(huán)中定位尾節(jié)點(diǎn)的操作的性能損耗相對而言是很小的。
ConcurrentLinkedQueue適合的場景
ConcurrentLinkedQueue通過無鎖來做到了更高并發(fā)量,是個(gè)高性能的隊(duì)列,但是使用場景相對不如阻塞隊(duì)列常見,畢竟取數(shù)據(jù)也要不停的去循環(huán),不如阻塞的邏輯好設(shè)計(jì),但是在并發(fā)量特別大的情況下,是個(gè)不錯(cuò)的選擇,性能上好很多,而且這個(gè)隊(duì)列的設(shè)計(jì)也是特別費(fèi)力,尤其的使用的改良算法和對哨兵的處理。
整體的思路都是比較嚴(yán)謹(jǐn)?shù)?,這個(gè)也是使用了無鎖造成的,我們自己使用無鎖的條件的話,這個(gè)隊(duì)列是個(gè)不錯(cuò)的參考。
到此這篇關(guān)于Java并發(fā)編程中的ConcurrentLinkedQueue詳解的文章就介紹到這了,更多相關(guān)ConcurrentLinkedQueue詳解內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java與JSON數(shù)據(jù)的轉(zhuǎn)換實(shí)例詳解
這篇文章主要介紹了java與JSON數(shù)據(jù)的轉(zhuǎn)換實(shí)例詳解的相關(guān)資料,需要的朋友可以參考下2017-03-03Java 數(shù)據(jù)結(jié)構(gòu)與算法系列精講之背包問題
背包問題是一個(gè)非常典型的考察動態(tài)規(guī)劃應(yīng)用的題目,對其加上不同的限制和條件,可以衍生出諸多變種,若要全面理解動態(tài)規(guī)劃,就必須對背包問題了如指掌2022-02-02JAVA項(xiàng)目如何打包部署到Linux服務(wù)器上
本文詳細(xì)介紹了在服務(wù)器上部署環(huán)境包括JDK、MySQL、Tomcat的設(shè)置,以及使用Idea-Maven-SpringBoot進(jìn)行jar包打包部署的流程,內(nèi)容涵蓋了MySQL配置注意事項(xiàng)、pom.xml配置、打包命令等關(guān)鍵步驟,同時(shí),也提供了如何將jar包上傳到Linux服務(wù)器并運(yùn)行的具體方法2024-10-10java+selenium實(shí)現(xiàn)滑塊驗(yàn)證
現(xiàn)在越來越多的網(wǎng)站都使用采用滑塊驗(yàn)證來作為驗(yàn)證機(jī)制,用于判斷用戶是否為人類而不是機(jī)器人,本文就將利用java和selenium實(shí)現(xiàn)滑塊驗(yàn)證,希望對大家有所幫助2023-12-12手把手帶你分析SpringBoot自動裝配完成了Ribbon哪些核心操作
這篇文章主要介紹了詳解Spring Boot自動裝配Ribbon哪些核心操作的哪些操作,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-08-08