Java并發(fā)編程之ConcurrentLinkedQueue隊列詳情
ConcurrentLinkedQueue
JDK中提供了一系列場景的并發(fā)安全隊列??偟膩碚f,按照實現(xiàn)方式的不同可分為阻塞隊列和非阻塞隊列,前者使用鎖實現(xiàn),而后則使用CAS非阻塞算法實現(xiàn)。
ConcurrentLinkedQueue
內(nèi)部的隊列使用單向鏈表方式實現(xiàn),其中有兩個volatile
類型的 Node 節(jié)點(diǎn)分別用來存放隊列的首、尾節(jié)點(diǎn)。從下面的無參構(gòu)造函數(shù)可知,默認(rèn)頭、尾節(jié)點(diǎn)都是指向 item 為null 的哨兵節(jié)點(diǎn)。新元素會被插入隊列末尾,出隊時從隊列頭部獲取一個元素。
public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); }
在 Node 節(jié)點(diǎn)內(nèi)部則維護(hù)一個使用volatile 修飾的變量 item,用來存放節(jié)點(diǎn)的值;next用來存放鏈表的下一個節(jié)點(diǎn),從而鏈接為一個單向無界鏈表。其內(nèi)部則使用 UNSafe 工具類提供的CAS 算法來保證出入隊時操作鏈表的原子性。
下面通過介紹ConcurrentLinkedQueue
的幾個方法來介紹其實現(xiàn)原理。
offer操作: offer操作是在隊列末尾添加一個元素,如果傳遞的參數(shù)是null則拋出NPE異常,否則由于ConcurrentLinkedQueue
是無界隊列,該方法一直會返回true。另外,由于使用CAS無阻塞算法,因此該方法不會阻塞掛起調(diào)用線程。下面具體看下實現(xiàn)原理。
public boolean offer(E e) { //(1)e為null這拋出空指針異常 checkNotNull(e); //(2)構(gòu)造Node節(jié)點(diǎn),在構(gòu)造函數(shù)內(nèi)部調(diào)用unsafe.putObject final Node<E> newNode = new Node<E>(e); //(3) 從尾節(jié)點(diǎn)插入 for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; //(4) 如果q==null說明p是尾節(jié)點(diǎn),則執(zhí)行插入 if (q == null) { // p is last node //(5)使用CAS設(shè)置p節(jié)點(diǎn)的next節(jié)點(diǎn) if (p.casNext(null, newNode)) { // Successful CAS is the linearization point // for e to become an element of this queue, // and for newNode to become "live". //(6)CAS成功,則說明新增節(jié)點(diǎn)已經(jīng)放入鏈表,然后設(shè)置當(dāng)前尾巴節(jié)點(diǎn) if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. return true; } // Lost CAS race to another thread; re-read next } else if (p == q) // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. p = (t != (t = tail)) ? t : head; else // Check for tail updates after two hops. p = (p != t && t != (t = tail)) ? t : q; } }
- 首先看當(dāng)一個線程調(diào)用
offer
(item)時的情況。首先代碼(1)對傳參進(jìn)行空檢查, 由于使用 如果為null 則拋出NPE 異常,否則執(zhí)行代碼(2)并使用item作為構(gòu)造函數(shù)參數(shù)創(chuàng)建一 個新的節(jié)點(diǎn),然后代碼(3)從隊列尾部節(jié)點(diǎn)開始循環(huán),打算從隊列尾部添加元素。這時候節(jié)點(diǎn)p、t、head、tail同時指向了item為null的哨兵節(jié)點(diǎn),由于哨兵節(jié)點(diǎn)的next 節(jié)點(diǎn)為null,所以這里q也指向null。代碼(4)發(fā)現(xiàn)q->null則執(zhí)行代碼(5),通過CAS 原子操作判斷p節(jié)點(diǎn)的next節(jié)點(diǎn)是否為null,如果為null 則使用節(jié)點(diǎn)newNode替換p的next節(jié)點(diǎn),然后執(zhí)行代碼(6),這里由于p=t所以沒有設(shè)置尾部節(jié)點(diǎn),然后退出 offer方法。 - 上面是一個線程調(diào)用offer方法的情況,如果多個線程同時調(diào)用,就會存在多個線程同時執(zhí)行到代碼(5)的情況。假設(shè)線程A調(diào)用offer(item1),線程B調(diào)用 ofer(item2),同時執(zhí)行到代碼(5)p.casNext(null, newNode)。由于CAS的比較設(shè)置操作是原子性的,所以這里假設(shè)線程A先執(zhí)行了比較設(shè)置操作,發(fā)現(xiàn)當(dāng)前p的 next 節(jié)點(diǎn)確實是null,則會原子性地更新next節(jié)點(diǎn)為iteml,這時候線程B也會判斷p的next節(jié)點(diǎn)是否為null,結(jié)果發(fā)現(xiàn)不是null(因為線程A已經(jīng)設(shè)置了p的next節(jié)點(diǎn)為iteml),則會跳到代碼(3),然后執(zhí)行到代碼(4)。
可見,offer 操作中的關(guān)鍵步驟是代碼(5),通過原子CAS 操作來控制某時只有一個線程可以追加元素到隊列末尾。進(jìn)行CAS 競爭失敗的線程會通過循環(huán)一次次嘗試進(jìn)行 CAS操作,直到CAS 成功才會返回,也就是通過使用無限循環(huán)不斷進(jìn)行 CAS 嘗試方式來替代阻塞算法掛起調(diào)用線程。相比阻塞算法,這是使用CPU資源換取阻塞所帶來的開銷。
add操作:
add操作是在鏈表尾部添加一個元素,其實在內(nèi)部調(diào)用的還是offer操作。
public boolean add(E e) { return offer(e); }
poll操作:
poll操作是在隊列頭部獲取并移除一個元素,如果隊列為空則返回null。
public E poll() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; if (item != null && p.casItem(item, null)) { // Successful CAS is the linearization point // for item to be removed from this queue. if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } else if ((q = p.next) == null) { updateHead(h, p); return null; } else if (p == q) continue restartFromHead; else p = q; } } }
poll方法在移除一個元素時,只是簡單地使用 CAS操作把當(dāng)前節(jié)點(diǎn)的item值設(shè)置為null,然后通過重新設(shè)置頭節(jié)點(diǎn)將該元素從隊列里面移除,被移除的節(jié)點(diǎn)就成了孤立節(jié)點(diǎn),這個節(jié)點(diǎn)會在垃圾回收時被回收掉。另外,如果在執(zhí)行分支中發(fā)現(xiàn)頭節(jié)點(diǎn)被修改了,要跳到外層循環(huán)重新獲取新的頭節(jié)點(diǎn)。
peak操作:
peak操作是獲取隊列頭部獲一個元素,如果隊列為空則返回null。
public E peek() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; //注釋 if (item != null || (q = p.next) == null) { updateHead(h, p); return item; } else if (p == q) continue restartFromHead; else p = q; } } }
Peek操作的代碼結(jié)構(gòu)與poll操作類似,不同之處在于我們在代碼中標(biāo)記注釋的地方中少了castItem操作。其實這很正常,因為peek只是獲取隊列頭元素值,并不清空其值。根據(jù)前面的介紹我們知道第一次執(zhí)行offer后head指向的是哨兵節(jié)點(diǎn)(也就是item為null的節(jié)點(diǎn)),那么第一次執(zhí)行peek時在注釋處會發(fā)現(xiàn)item==null,然后執(zhí)行q=p.next,這時候q節(jié)點(diǎn)指向的才是隊列里面第一個真正的元素,或者如果隊列為 null 則 q 指向 null。
到此這篇關(guān)于Java并發(fā)編程之ConcurrentLinkedQueue隊列詳情的文章就介紹到這了,更多相關(guān)Java并發(fā)編程 ConcurrentLinkedQueue 內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
深入分析Android系統(tǒng)中SparseArray的源碼
這篇文章主要介紹了深入分析Android系統(tǒng)中SparseArray的源碼,SparseArray為Java實現(xiàn),需要的朋友可以參考下2015-07-07啟用springboot security后登錄web頁面需要用戶名和密碼的解決方法
這篇文章主要介紹了啟用springboot security后登錄web頁面需要用戶名和密碼的解決方法,也就是使用默認(rèn)用戶和密碼登錄的操作方法,本文結(jié)合實例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2023-02-02idea輸入sout無法自動補(bǔ)全System.out.println()的問題
這篇文章主要介紹了idea輸入sout無法自動補(bǔ)全System.out.println()的問題,本文給大家分享解決方案,供大家參考,需要的朋友可以參考下2020-07-07SpringBoot根據(jù)各地區(qū)時間設(shè)置接口有效時間的實現(xiàn)方式
這篇文章給大家介紹了SpringBoot根據(jù)各地區(qū)時間設(shè)置接口有效時間的實現(xiàn)方式,文中通過代碼示例給大家講解的非常詳細(xì),對大家的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下2024-01-01深入了解Maven Settings.xml文件的結(jié)構(gòu)和功能
這篇文章主要為大家介紹了Maven Settings.xml文件基本結(jié)構(gòu)和功能詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-11-11springboot基于Redis發(fā)布訂閱集群下WebSocket的解決方案
這篇文章主要介紹了springboot基于Redis發(fā)布訂閱集群下WebSocket的解決方案,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01