Java concurrency集合之ConcurrentLinkedQueue_動力節(jié)點Java學院整理
ConcurrentLinkedQueue介紹
ConcurrentLinkedQueue是線程安全的隊列,它適用于“高并發(fā)”的場景。
它是一個基于鏈接節(jié)點的無界線程安全隊列,按照 FIFO(先進先出)原則對元素進行排序。隊列元素中不可以放置null元素(內(nèi)部實現(xiàn)的特殊節(jié)點除外)。
ConcurrentLinkedQueue原理和數(shù)據(jù)結構
ConcurrentLinkedQueue的數(shù)據(jù)結構,如下圖所示:
說明:
1. ConcurrentLinkedQueue繼承于AbstractQueue。
2. ConcurrentLinkedQueue內(nèi)部是通過鏈表來實現(xiàn)的。它同時包含鏈表的頭節(jié)點head和尾節(jié)點tail。ConcurrentLinkedQueue按照 FIFO(先進先出)原則對元素進行排序。元素都是從尾部插入到鏈表,從頭部開始返回。
3. ConcurrentLinkedQueue的鏈表Node中的next的類型是volatile,而且鏈表數(shù)據(jù)item的類型也是volatile。關于volatile,我們知道它的語義包含:“即對一個volatile變量的讀,總是能看到(任意線程)對這個volatile變量最后的寫入”。ConcurrentLinkedQueue就是通過volatile來實現(xiàn)多線程對競爭資源的互斥訪問的。
ConcurrentLinkedQueue函數(shù)列表
// 創(chuàng)建一個最初為空的 ConcurrentLinkedQueue。 ConcurrentLinkedQueue() // 創(chuàng)建一個最初包含給定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 迭代器的遍歷順序來添加元素。 ConcurrentLinkedQueue(Collection<? extends E> c) // 將指定元素插入此隊列的尾部。 boolean add(E e) // 如果此隊列包含指定元素,則返回 true。 boolean contains(Object o) // 如果此隊列不包含任何元素,則返回 true。 boolean isEmpty() // 返回在此隊列元素上以恰當順序進行迭代的迭代器。 Iterator<E> iterator() // 將指定元素插入此隊列的尾部。 boolean offer(E e) // 獲取但不移除此隊列的頭;如果此隊列為空,則返回 null。 E peek() // 獲取并移除此隊列的頭,如果此隊列為空,則返回 null。 E poll() // 從隊列中移除指定元素的單個實例(如果存在)。 boolean remove(Object o) // 返回此隊列中的元素數(shù)量。 int size() // 返回以恰當順序包含此隊列所有元素的數(shù)組。 Object[] toArray() // 返回以恰當順序包含此隊列所有元素的數(shù)組;返回數(shù)組的運行時類型是指定數(shù)組的運行時類型。 <T> T[] toArray(T[] a)
下面從ConcurrentLinkedQueue的創(chuàng)建,添加,刪除這幾個方面對它進行分析。
1 創(chuàng)建
下面以ConcurrentLinkedQueue()來進行說明。
public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); }
說明:在構造函數(shù)中,新建了一個“內(nèi)容為null的節(jié)點”,并設置表頭head和表尾tail的值為新節(jié)點。
head和tail的定義如下:
private transient volatile Node<E> head; private transient volatile Node<E> tail;
head和tail都是volatile類型,他們具有volatile賦予的含義:“即對一個volatile變量的讀,總是能看到(任意線程)對這個volatile變量最后的寫入”。
Node的聲明如下:
private static class Node<E> { volatile E item; volatile Node<E> next; Node(E item) { UNSAFE.putObject(this, itemOffset, item); } boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
說明:
Node是個單向鏈表節(jié)點,next用于指向下一個Node,item用于存儲數(shù)據(jù)。Node中操作節(jié)點數(shù)據(jù)的API,都是通過Unsafe機制的CAS函數(shù)實現(xiàn)的;例如casNext()是通過CAS函數(shù)“比較并設置節(jié)點的下一個節(jié)點”。
2. 添加
下面以add(E e)為例對ConcurrentLinkedQueue中的添加進行說明。
public boolean add(E e) { return offer(e); }
說明:add()實際上是調(diào)用的offer()來完成添加操作的。
offer()的源碼如下:
public boolean offer(E e) { // 檢查e是不是null,是的話拋出NullPointerException異常。 checkNotNull(e); // 創(chuàng)建新的節(jié)點 final Node<E> newNode = new Node<E>(e); // 將“新的節(jié)點”添加到鏈表的末尾。 for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; // 情況1:q為空 if (q == null) { // CAS操作:如果“p的下一個節(jié)點為null”(即p為尾節(jié)點),則設置p的下一個節(jié)點為newNode。 // 如果該CAS操作成功的話,則比較“p和t”(若p不等于t,則設置newNode為新的尾節(jié)點),然后返回true。 // 如果該CAS操作失敗,這意味著“其它線程對尾節(jié)點進行了修改”,則重新循環(huán)。 if (p.casNext(null, newNode)) { if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. return true; } } // 情況2:p和q相等 else if (p == q) p = (t != (t = tail)) ? t : head; // 情況3:其它 else p = (p != t && t != (t = tail)) ? t : q; } }
說明:offer(E e)的作用就是將元素e添加到鏈表的末尾。offer()比較的地方是理解for循環(huán),下面區(qū)分3種情況對for進行分析。
情況1 -- q為空。這意味著q是尾節(jié)點的下一個節(jié)點。此時,通過p.casNext(null, newNode)將“p的下一個節(jié)點設為newNode”,若設置成功的話,則比較“p和t”(若p不等于t,則設置newNode為新的尾節(jié)點),然后返回true。否則的話(意味著“其它線程對尾節(jié)點進行了修改”),什么也不做,繼續(xù)進行for循環(huán)。
p.casNext(null, newNode),是調(diào)用CAS對p進行操作。若“p的下一個節(jié)點等于null”,則設置“p的下一個節(jié)點等于newNode”;設置成功的話,返回true,失敗的話返回false。
情況2 -- p和q相等。這種情況什么時候會發(fā)生呢?通過“情況3”,我們知道,經(jīng)過“情況3”的處理后,p的值可能等于q。
此時,若尾節(jié)點沒有發(fā)生變化的話,那么,應該是頭節(jié)點發(fā)生了變化,則設置p為頭節(jié)點,然后重新遍歷鏈表;否則(尾節(jié)點變化的話),則設置p為尾節(jié)點。
情況3 -- 其它。
我們將p = (p != t && t != (t = tail)) ? t : q;轉(zhuǎn)換成如下代碼。
if (p==t) { p = q; } else { Node<E> tmp=t; t = tail; if (tmp==t) { p=q; } else { p=t; } }
如果p和t相等,則設置p為q。否則的話,判斷“尾節(jié)點是否發(fā)生變化”,沒有變化的話,則設置p為q;否則,設置p為尾節(jié)點。
checkNotNull()的源碼如下:
private static void checkNotNull(Object v) { if (v == null) throw new NullPointerException(); }
3. 刪除
下面以poll()為例對ConcurrentLinkedQueue中的刪除進行說明。
public E poll() { // 設置“標記” restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; // 情況1 // 表頭的數(shù)據(jù)不為null,并且“設置表頭的數(shù)據(jù)為null”這個操作成功的話; // 則比較“p和h”(若p!=h,即表頭發(fā)生了變化,則更新表頭,即設置表頭為p),然后返回原表頭的item值。 if (item != null && p.casItem(item, null)) { if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } // 情況2 // 表頭的下一個節(jié)點為null,即鏈表只有一個“內(nèi)容為null的表頭節(jié)點”。則更新表頭為p,并返回null。 else if ((q = p.next) == null) { updateHead(h, p); return null; } // 情況3 // 這可能到由于“情況4”的發(fā)生導致p=q,在該情況下跳轉(zhuǎn)到restartFromHead標記重新操作。 else if (p == q) continue restartFromHead; // 情況4 // 設置p為q else p = q; } } }
說明:poll()的作用就是刪除鏈表的表頭節(jié)點,并返回被刪節(jié)點對應的值。poll()的實現(xiàn)原理和offer()比較類似,下面根將or循環(huán)劃分為4種情況進行分析。
情況1:“表頭節(jié)點的數(shù)據(jù)”不為null,并且“設置表頭節(jié)點的數(shù)據(jù)為null”這個操作成功。
p.casItem(item, null) -- 調(diào)用CAS函數(shù),比較“節(jié)點p的數(shù)據(jù)值”與item是否相等,是的話,設置節(jié)點p的數(shù)據(jù)值為null。
在情況1發(fā)生時,先比較“p和h”,若p!=h,即表頭發(fā)生了變化,則調(diào)用updateHead()更新表頭;然后返回刪除節(jié)點的item值。
updateHead()的源碼如下:
final void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) h.lazySetNext(h); }
說明:updateHead()的最終目的是更新表頭為p,并設置h的下一個節(jié)點為h本身。
casHead(h,p)是通過CAS函數(shù)設置表頭,若表頭等于h的話,則設置表頭為p。
lazySetNext()的源碼如下:
void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); }
putOrderedObject()函數(shù),我們在前面一章“TODO”中介紹過。h.lazySetNext(h)的作用是通過CAS函數(shù)設置h的下一個節(jié)點為h自身,該設置可能會延遲執(zhí)行。
情況2:如果表頭的下一個節(jié)點為null,即鏈表只有一個“內(nèi)容為null的表頭節(jié)點”。
則調(diào)用updateHead(h, p),將表頭更新p;然后返回null。
情況3:p=q
在“情況4”的發(fā)生后,會導致p=q;此時,“情況3”就會發(fā)生。當“情況3”發(fā)生后,它會跳轉(zhuǎn)到restartFromHead標記重新操作。
情況4:其它情況。
設置p=q。
ConcurrentLinkedQueue示例
import java.util.*; import java.util.concurrent.*; /* * ConcurrentLinkedQueue是“線程安全”的隊列,而LinkedList是非線程安全的。 * * 下面是“多個線程同時操作并且遍歷queue”的示例 * (01) 當queue是ConcurrentLinkedQueue對象時,程序能正常運行。 * (02) 當queue是LinkedList對象時,程序會產(chǎn)生ConcurrentModificationException異常。 * * */ public class ConcurrentLinkedQueueDemo1 { // TODO: queue是LinkedList對象時,程序會出錯。 //private static Queue<String> queue = new LinkedList<String>(); private static Queue<String> queue = new ConcurrentLinkedQueue<String>(); public static void main(String[] args) { // 同時啟動兩個線程對queue進行操作! new MyThread("ta").start(); new MyThread("tb").start(); } private static void printAll() { String value; Iterator iter = queue.iterator(); while(iter.hasNext()) { value = (String)iter.next(); System.out.print(value+", "); } System.out.println(); } private static class MyThread extends Thread { MyThread(String name) { super(name); } @Override public void run() { int i = 0; while (i++ < 6) { // “線程名” + "-" + "序號" String val = Thread.currentThread().getName()+i; queue.add(val); // 通過“Iterator”遍歷queue。 printAll(); } } } }
(某一次)運行結果:
ta1, ta1, tb1, tb1, ta1, ta1, tb1, tb1, ta2, ta2, tb2, tb2, ta1, ta1, tb1, tb1, ta2, ta2, tb2, tb2, ta3, tb3, ta3, ta1, tb3, tb1, ta4, ta2, ta1, tb2, tb1, ta3, ta2, tb3, tb2, ta4, ta3, tb4, tb3, ta1, ta4, tb1, tb4, ta2, ta5, tb2, ta1, ta3, tb1, tb3, ta2, ta4, tb2, tb4, ta3, ta5, tb3, tb5, ta4, ta1, tb4, tb1, ta5, ta2, tb5, tb2, ta6, ta3, ta1, tb3, tb1, ta4, ta2, tb4, tb2, ta5, ta3, tb5, tb3, ta6, ta4, tb6, tb4, ta5, tb5, ta6, tb6,
結果說明:如果將源碼中的queue改成LinkedList對象時,程序會產(chǎn)生ConcurrentModificationException異常。
相關文章
SpringBoot使用@Async注解實現(xiàn)異步調(diào)用
這篇文章主要介紹了SpringBoot使用@Async注解實現(xiàn)異步調(diào)用,異步調(diào)用是相對于同步調(diào)用而言的,同步調(diào)用是指程序按預定順序一步步執(zhí)行,每一步必須等到上一步執(zhí)行完后才能執(zhí)行,異步調(diào)用則無需等待,程序執(zhí)行完即可執(zhí)行,可以減少程序執(zhí)行時間,需要的朋友可以參考下2023-10-10Java Comparable 和 Comparator 的詳解及區(qū)別
這篇文章主要介紹了Java Comparable 和 Comparator 的詳解及區(qū)別的相關資料,Comparable 自然排序和Comparator 定制排序的實例,需要的朋友可以參考下2016-12-12spring聲明式事務@Transactional開發(fā)常犯的幾個錯誤及最新解決方案
使用聲明式事務@Transactional進行事務一致性的管理,在開發(fā)過程中,發(fā)現(xiàn)很多開發(fā)同學都用錯了spring聲明式事務@Transactional或使用不規(guī)范,導致出現(xiàn)各種事務問題,這篇文章主要介紹了spring聲明式事務@Transactional開發(fā)常犯的幾個錯誤及解決辦法,需要的朋友可以參考下2024-02-02Java中char數(shù)組(字符數(shù)組)與字符串String類型的轉(zhuǎn)換方法
這篇文章主要介紹了Java中char數(shù)組(字符數(shù)組)與字符串String類型的轉(zhuǎn)換方法,涉及Java中toCharArray與valueOf方法的使用技巧,需要的朋友可以參考下2015-12-12SpringBoot3整合Druid監(jiān)控功能的項目實踐
Druid連接池作為一款強大的數(shù)據(jù)庫連接池,提供了豐富的監(jiān)控和管理功能,成為很多Java項目的首選,本文主要介紹了SpringBoot3整合Druid監(jiān)控功能的項目實踐,感興趣的可以了解一下2024-01-01Java之Error與Exception的區(qū)別案例詳解
這篇文章主要介紹了Java之Error與Exception的區(qū)別案例詳解,本篇文章通過簡要的案例,講解了該項技術的了解與使用,以下就是詳細內(nèi)容,需要的朋友可以參考下2021-09-09