Java concurrency集合之ConcurrentLinkedQueue_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
ConcurrentLinkedQueue介紹
ConcurrentLinkedQueue是線程安全的隊(duì)列,它適用于“高并發(fā)”的場(chǎng)景。
它是一個(gè)基于鏈接節(jié)點(diǎn)的無(wú)界線程安全隊(duì)列,按照 FIFO(先進(jìn)先出)原則對(duì)元素進(jìn)行排序。隊(duì)列元素中不可以放置null元素(內(nèi)部實(shí)現(xiàn)的特殊節(jié)點(diǎn)除外)。
ConcurrentLinkedQueue原理和數(shù)據(jù)結(jié)構(gòu)
ConcurrentLinkedQueue的數(shù)據(jù)結(jié)構(gòu),如下圖所示:
說(shuō)明:
1. ConcurrentLinkedQueue繼承于AbstractQueue。
2. ConcurrentLinkedQueue內(nèi)部是通過(guò)鏈表來(lái)實(shí)現(xiàn)的。它同時(shí)包含鏈表的頭節(jié)點(diǎn)head和尾節(jié)點(diǎn)tail。ConcurrentLinkedQueue按照 FIFO(先進(jìn)先出)原則對(duì)元素進(jìn)行排序。元素都是從尾部插入到鏈表,從頭部開始返回。
3. ConcurrentLinkedQueue的鏈表Node中的next的類型是volatile,而且鏈表數(shù)據(jù)item的類型也是volatile。關(guān)于volatile,我們知道它的語(yǔ)義包含:“即對(duì)一個(gè)volatile變量的讀,總是能看到(任意線程)對(duì)這個(gè)volatile變量最后的寫入”。ConcurrentLinkedQueue就是通過(guò)volatile來(lái)實(shí)現(xiàn)多線程對(duì)競(jìng)爭(zhēng)資源的互斥訪問(wèn)的。
ConcurrentLinkedQueue函數(shù)列表
// 創(chuàng)建一個(gè)最初為空的 ConcurrentLinkedQueue。 ConcurrentLinkedQueue() // 創(chuàng)建一個(gè)最初包含給定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 迭代器的遍歷順序來(lái)添加元素。 ConcurrentLinkedQueue(Collection<? extends E> c) // 將指定元素插入此隊(duì)列的尾部。 boolean add(E e) // 如果此隊(duì)列包含指定元素,則返回 true。 boolean contains(Object o) // 如果此隊(duì)列不包含任何元素,則返回 true。 boolean isEmpty() // 返回在此隊(duì)列元素上以恰當(dāng)順序進(jìn)行迭代的迭代器。 Iterator<E> iterator() // 將指定元素插入此隊(duì)列的尾部。 boolean offer(E e) // 獲取但不移除此隊(duì)列的頭;如果此隊(duì)列為空,則返回 null。 E peek() // 獲取并移除此隊(duì)列的頭,如果此隊(duì)列為空,則返回 null。 E poll() // 從隊(duì)列中移除指定元素的單個(gè)實(shí)例(如果存在)。 boolean remove(Object o) // 返回此隊(duì)列中的元素?cái)?shù)量。 int size() // 返回以恰當(dāng)順序包含此隊(duì)列所有元素的數(shù)組。 Object[] toArray() // 返回以恰當(dāng)順序包含此隊(duì)列所有元素的數(shù)組;返回?cái)?shù)組的運(yùn)行時(shí)類型是指定數(shù)組的運(yùn)行時(shí)類型。 <T> T[] toArray(T[] a)
下面從ConcurrentLinkedQueue的創(chuàng)建,添加,刪除這幾個(gè)方面對(duì)它進(jìn)行分析。
1 創(chuàng)建
下面以ConcurrentLinkedQueue()來(lái)進(jìn)行說(shuō)明。
public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); }
說(shuō)明:在構(gòu)造函數(shù)中,新建了一個(gè)“內(nèi)容為null的節(jié)點(diǎn)”,并設(shè)置表頭head和表尾tail的值為新節(jié)點(diǎn)。
head和tail的定義如下:
private transient volatile Node<E> head; private transient volatile Node<E> tail;
head和tail都是volatile類型,他們具有volatile賦予的含義:“即對(duì)一個(gè)volatile變量的讀,總是能看到(任意線程)對(duì)這個(gè)volatile變量最后的寫入”。
Node的聲明如下:
private static class Node<E> { volatile E item; volatile Node<E> next; Node(E item) { UNSAFE.putObject(this, itemOffset, item); } boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
說(shuō)明:
Node是個(gè)單向鏈表節(jié)點(diǎn),next用于指向下一個(gè)Node,item用于存儲(chǔ)數(shù)據(jù)。Node中操作節(jié)點(diǎn)數(shù)據(jù)的API,都是通過(guò)Unsafe機(jī)制的CAS函數(shù)實(shí)現(xiàn)的;例如casNext()是通過(guò)CAS函數(shù)“比較并設(shè)置節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)”。
2. 添加
下面以add(E e)為例對(duì)ConcurrentLinkedQueue中的添加進(jìn)行說(shuō)明。
public boolean add(E e) { return offer(e); }
說(shuō)明:add()實(shí)際上是調(diào)用的offer()來(lái)完成添加操作的。
offer()的源碼如下:
public boolean offer(E e) { // 檢查e是不是null,是的話拋出NullPointerException異常。 checkNotNull(e); // 創(chuàng)建新的節(jié)點(diǎn) final Node<E> newNode = new Node<E>(e); // 將“新的節(jié)點(diǎn)”添加到鏈表的末尾。 for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; // 情況1:q為空 if (q == null) { // CAS操作:如果“p的下一個(gè)節(jié)點(diǎn)為null”(即p為尾節(jié)點(diǎn)),則設(shè)置p的下一個(gè)節(jié)點(diǎn)為newNode。 // 如果該CAS操作成功的話,則比較“p和t”(若p不等于t,則設(shè)置newNode為新的尾節(jié)點(diǎn)),然后返回true。 // 如果該CAS操作失敗,這意味著“其它線程對(duì)尾節(jié)點(diǎn)進(jìn)行了修改”,則重新循環(huán)。 if (p.casNext(null, newNode)) { if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. return true; } } // 情況2:p和q相等 else if (p == q) p = (t != (t = tail)) ? t : head; // 情況3:其它 else p = (p != t && t != (t = tail)) ? t : q; } }
說(shuō)明:offer(E e)的作用就是將元素e添加到鏈表的末尾。offer()比較的地方是理解for循環(huán),下面區(qū)分3種情況對(duì)for進(jìn)行分析。
情況1 -- q為空。這意味著q是尾節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)。此時(shí),通過(guò)p.casNext(null, newNode)將“p的下一個(gè)節(jié)點(diǎn)設(shè)為newNode”,若設(shè)置成功的話,則比較“p和t”(若p不等于t,則設(shè)置newNode為新的尾節(jié)點(diǎn)),然后返回true。否則的話(意味著“其它線程對(duì)尾節(jié)點(diǎn)進(jìn)行了修改”),什么也不做,繼續(xù)進(jìn)行for循環(huán)。
p.casNext(null, newNode),是調(diào)用CAS對(duì)p進(jìn)行操作。若“p的下一個(gè)節(jié)點(diǎn)等于null”,則設(shè)置“p的下一個(gè)節(jié)點(diǎn)等于newNode”;設(shè)置成功的話,返回true,失敗的話返回false。
情況2 -- p和q相等。這種情況什么時(shí)候會(huì)發(fā)生呢?通過(guò)“情況3”,我們知道,經(jīng)過(guò)“情況3”的處理后,p的值可能等于q。
此時(shí),若尾節(jié)點(diǎn)沒有發(fā)生變化的話,那么,應(yīng)該是頭節(jié)點(diǎn)發(fā)生了變化,則設(shè)置p為頭節(jié)點(diǎn),然后重新遍歷鏈表;否則(尾節(jié)點(diǎn)變化的話),則設(shè)置p為尾節(jié)點(diǎn)。
情況3 -- 其它。
我們將p = (p != t && t != (t = tail)) ? t : q;轉(zhuǎn)換成如下代碼。
if (p==t) { p = q; } else { Node<E> tmp=t; t = tail; if (tmp==t) { p=q; } else { p=t; } }
如果p和t相等,則設(shè)置p為q。否則的話,判斷“尾節(jié)點(diǎn)是否發(fā)生變化”,沒有變化的話,則設(shè)置p為q;否則,設(shè)置p為尾節(jié)點(diǎn)。
checkNotNull()的源碼如下:
private static void checkNotNull(Object v) { if (v == null) throw new NullPointerException(); }
3. 刪除
下面以poll()為例對(duì)ConcurrentLinkedQueue中的刪除進(jìn)行說(shuō)明。
public E poll() { // 設(shè)置“標(biāo)記” restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; // 情況1 // 表頭的數(shù)據(jù)不為null,并且“設(shè)置表頭的數(shù)據(jù)為null”這個(gè)操作成功的話; // 則比較“p和h”(若p!=h,即表頭發(fā)生了變化,則更新表頭,即設(shè)置表頭為p),然后返回原表頭的item值。 if (item != null && p.casItem(item, null)) { if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } // 情況2 // 表頭的下一個(gè)節(jié)點(diǎn)為null,即鏈表只有一個(gè)“內(nèi)容為null的表頭節(jié)點(diǎn)”。則更新表頭為p,并返回null。 else if ((q = p.next) == null) { updateHead(h, p); return null; } // 情況3 // 這可能到由于“情況4”的發(fā)生導(dǎo)致p=q,在該情況下跳轉(zhuǎn)到restartFromHead標(biāo)記重新操作。 else if (p == q) continue restartFromHead; // 情況4 // 設(shè)置p為q else p = q; } } }
說(shuō)明:poll()的作用就是刪除鏈表的表頭節(jié)點(diǎn),并返回被刪節(jié)點(diǎn)對(duì)應(yīng)的值。poll()的實(shí)現(xiàn)原理和offer()比較類似,下面根將or循環(huán)劃分為4種情況進(jìn)行分析。
情況1:“表頭節(jié)點(diǎn)的數(shù)據(jù)”不為null,并且“設(shè)置表頭節(jié)點(diǎn)的數(shù)據(jù)為null”這個(gè)操作成功。
p.casItem(item, null) -- 調(diào)用CAS函數(shù),比較“節(jié)點(diǎn)p的數(shù)據(jù)值”與item是否相等,是的話,設(shè)置節(jié)點(diǎn)p的數(shù)據(jù)值為null。
在情況1發(fā)生時(shí),先比較“p和h”,若p!=h,即表頭發(fā)生了變化,則調(diào)用updateHead()更新表頭;然后返回刪除節(jié)點(diǎn)的item值。
updateHead()的源碼如下:
final void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) h.lazySetNext(h); }
說(shuō)明:updateHead()的最終目的是更新表頭為p,并設(shè)置h的下一個(gè)節(jié)點(diǎn)為h本身。
casHead(h,p)是通過(guò)CAS函數(shù)設(shè)置表頭,若表頭等于h的話,則設(shè)置表頭為p。
lazySetNext()的源碼如下:
void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); }
putOrderedObject()函數(shù),我們?cè)谇懊嬉徽隆癟ODO”中介紹過(guò)。h.lazySetNext(h)的作用是通過(guò)CAS函數(shù)設(shè)置h的下一個(gè)節(jié)點(diǎn)為h自身,該設(shè)置可能會(huì)延遲執(zhí)行。
情況2:如果表頭的下一個(gè)節(jié)點(diǎn)為null,即鏈表只有一個(gè)“內(nèi)容為null的表頭節(jié)點(diǎn)”。
則調(diào)用updateHead(h, p),將表頭更新p;然后返回null。
情況3:p=q
在“情況4”的發(fā)生后,會(huì)導(dǎo)致p=q;此時(shí),“情況3”就會(huì)發(fā)生。當(dāng)“情況3”發(fā)生后,它會(huì)跳轉(zhuǎn)到restartFromHead標(biāo)記重新操作。
情況4:其它情況。
設(shè)置p=q。
ConcurrentLinkedQueue示例
import java.util.*; import java.util.concurrent.*; /* * ConcurrentLinkedQueue是“線程安全”的隊(duì)列,而LinkedList是非線程安全的。 * * 下面是“多個(gè)線程同時(shí)操作并且遍歷queue”的示例 * (01) 當(dāng)queue是ConcurrentLinkedQueue對(duì)象時(shí),程序能正常運(yùn)行。 * (02) 當(dāng)queue是LinkedList對(duì)象時(shí),程序會(huì)產(chǎn)生ConcurrentModificationException異常。 * * */ public class ConcurrentLinkedQueueDemo1 { // TODO: queue是LinkedList對(duì)象時(shí),程序會(huì)出錯(cuò)。 //private static Queue<String> queue = new LinkedList<String>(); private static Queue<String> queue = new ConcurrentLinkedQueue<String>(); public static void main(String[] args) { // 同時(shí)啟動(dòng)兩個(gè)線程對(duì)queue進(jìn)行操作! new MyThread("ta").start(); new MyThread("tb").start(); } private static void printAll() { String value; Iterator iter = queue.iterator(); while(iter.hasNext()) { value = (String)iter.next(); System.out.print(value+", "); } System.out.println(); } private static class MyThread extends Thread { MyThread(String name) { super(name); } @Override public void run() { int i = 0; while (i++ < 6) { // “線程名” + "-" + "序號(hào)" String val = Thread.currentThread().getName()+i; queue.add(val); // 通過(guò)“Iterator”遍歷queue。 printAll(); } } } }
(某一次)運(yùn)行結(jié)果:
ta1, ta1, tb1, tb1, ta1, ta1, tb1, tb1, ta2, ta2, tb2, tb2, ta1, ta1, tb1, tb1, ta2, ta2, tb2, tb2, ta3, tb3, ta3, ta1, tb3, tb1, ta4, ta2, ta1, tb2, tb1, ta3, ta2, tb3, tb2, ta4, ta3, tb4, tb3, ta1, ta4, tb1, tb4, ta2, ta5, tb2, ta1, ta3, tb1, tb3, ta2, ta4, tb2, tb4, ta3, ta5, tb3, tb5, ta4, ta1, tb4, tb1, ta5, ta2, tb5, tb2, ta6, ta3, ta1, tb3, tb1, ta4, ta2, tb4, tb2, ta5, ta3, tb5, tb3, ta6, ta4, tb6, tb4, ta5, tb5, ta6, tb6,
結(jié)果說(shuō)明:如果將源碼中的queue改成LinkedList對(duì)象時(shí),程序會(huì)產(chǎn)生ConcurrentModificationException異常。
相關(guān)文章
SpringBoot使用@Async注解實(shí)現(xiàn)異步調(diào)用
這篇文章主要介紹了SpringBoot使用@Async注解實(shí)現(xiàn)異步調(diào)用,異步調(diào)用是相對(duì)于同步調(diào)用而言的,同步調(diào)用是指程序按預(yù)定順序一步步執(zhí)行,每一步必須等到上一步執(zhí)行完后才能執(zhí)行,異步調(diào)用則無(wú)需等待,程序執(zhí)行完即可執(zhí)行,可以減少程序執(zhí)行時(shí)間,需要的朋友可以參考下2023-10-10Java Comparable 和 Comparator 的詳解及區(qū)別
這篇文章主要介紹了Java Comparable 和 Comparator 的詳解及區(qū)別的相關(guān)資料,Comparable 自然排序和Comparator 定制排序的實(shí)例,需要的朋友可以參考下2016-12-12mybatis實(shí)現(xiàn)mapper代理模式的方式
本文向大家講解mybatis的mapper代理模式,以根據(jù)ide值查詢單條數(shù)據(jù)為例編寫xml文件,通過(guò)mapper代理的方式進(jìn)行講解增刪改查,分步驟給大家講解的很詳細(xì),對(duì)mybatis mapper代理模式相關(guān)知識(shí)感興趣的朋友一起看看吧2021-06-06spring聲明式事務(wù)@Transactional開發(fā)常犯的幾個(gè)錯(cuò)誤及最新解決方案
使用聲明式事務(wù)@Transactional進(jìn)行事務(wù)一致性的管理,在開發(fā)過(guò)程中,發(fā)現(xiàn)很多開發(fā)同學(xué)都用錯(cuò)了spring聲明式事務(wù)@Transactional或使用不規(guī)范,導(dǎo)致出現(xiàn)各種事務(wù)問(wèn)題,這篇文章主要介紹了spring聲明式事務(wù)@Transactional開發(fā)常犯的幾個(gè)錯(cuò)誤及解決辦法,需要的朋友可以參考下2024-02-02Java簡(jiǎn)單幾步實(shí)現(xiàn)一個(gè)二叉搜索樹
二叉樹包含了根節(jié)點(diǎn),孩子節(jié)點(diǎn),葉節(jié)點(diǎn),每一個(gè)二叉樹只有一個(gè)根節(jié)點(diǎn),每一個(gè)結(jié)點(diǎn)最多只有兩個(gè)節(jié)點(diǎn),左子樹的鍵值小于根的鍵值,右子樹的鍵值大于根的鍵值,下面這篇文章主要給大家介紹了關(guān)于如何在Java中實(shí)現(xiàn)二叉搜索樹的相關(guān)資料,需要的朋友可以參考下2023-02-02Java中char數(shù)組(字符數(shù)組)與字符串String類型的轉(zhuǎn)換方法
這篇文章主要介紹了Java中char數(shù)組(字符數(shù)組)與字符串String類型的轉(zhuǎn)換方法,涉及Java中toCharArray與valueOf方法的使用技巧,需要的朋友可以參考下2015-12-12SpringBoot3整合Druid監(jiān)控功能的項(xiàng)目實(shí)踐
Druid連接池作為一款強(qiáng)大的數(shù)據(jù)庫(kù)連接池,提供了豐富的監(jiān)控和管理功能,成為很多Java項(xiàng)目的首選,本文主要介紹了SpringBoot3整合Druid監(jiān)控功能的項(xiàng)目實(shí)踐,感興趣的可以了解一下2024-01-01Java之Error與Exception的區(qū)別案例詳解
這篇文章主要介紹了Java之Error與Exception的區(qū)別案例詳解,本篇文章通過(guò)簡(jiǎn)要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下2021-09-09