Java concurrency集合之ConcurrentLinkedQueue_動力節(jié)點Java學院整理
ConcurrentLinkedQueue介紹
ConcurrentLinkedQueue是線程安全的隊列,它適用于“高并發(fā)”的場景。
它是一個基于鏈接節(jié)點的無界線程安全隊列,按照 FIFO(先進先出)原則對元素進行排序。隊列元素中不可以放置null元素(內部實現的特殊節(jié)點除外)。
ConcurrentLinkedQueue原理和數據結構
ConcurrentLinkedQueue的數據結構,如下圖所示:

說明:
1. ConcurrentLinkedQueue繼承于AbstractQueue。
2. ConcurrentLinkedQueue內部是通過鏈表來實現的。它同時包含鏈表的頭節(jié)點head和尾節(jié)點tail。ConcurrentLinkedQueue按照 FIFO(先進先出)原則對元素進行排序。元素都是從尾部插入到鏈表,從頭部開始返回。
3. ConcurrentLinkedQueue的鏈表Node中的next的類型是volatile,而且鏈表數據item的類型也是volatile。關于volatile,我們知道它的語義包含:“即對一個volatile變量的讀,總是能看到(任意線程)對這個volatile變量最后的寫入”。ConcurrentLinkedQueue就是通過volatile來實現多線程對競爭資源的互斥訪問的。
ConcurrentLinkedQueue函數列表
// 創(chuàng)建一個最初為空的 ConcurrentLinkedQueue。 ConcurrentLinkedQueue() // 創(chuàng)建一個最初包含給定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 迭代器的遍歷順序來添加元素。 ConcurrentLinkedQueue(Collection<? extends E> c) // 將指定元素插入此隊列的尾部。 boolean add(E e) // 如果此隊列包含指定元素,則返回 true。 boolean contains(Object o) // 如果此隊列不包含任何元素,則返回 true。 boolean isEmpty() // 返回在此隊列元素上以恰當順序進行迭代的迭代器。 Iterator<E> iterator() // 將指定元素插入此隊列的尾部。 boolean offer(E e) // 獲取但不移除此隊列的頭;如果此隊列為空,則返回 null。 E peek() // 獲取并移除此隊列的頭,如果此隊列為空,則返回 null。 E poll() // 從隊列中移除指定元素的單個實例(如果存在)。 boolean remove(Object o) // 返回此隊列中的元素數量。 int size() // 返回以恰當順序包含此隊列所有元素的數組。 Object[] toArray() // 返回以恰當順序包含此隊列所有元素的數組;返回數組的運行時類型是指定數組的運行時類型。 <T> T[] toArray(T[] a)
下面從ConcurrentLinkedQueue的創(chuàng)建,添加,刪除這幾個方面對它進行分析。
1 創(chuàng)建
下面以ConcurrentLinkedQueue()來進行說明。
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
說明:在構造函數中,新建了一個“內容為null的節(jié)點”,并設置表頭head和表尾tail的值為新節(jié)點。
head和tail的定義如下:
private transient volatile Node<E> head; private transient volatile Node<E> tail;
head和tail都是volatile類型,他們具有volatile賦予的含義:“即對一個volatile變量的讀,總是能看到(任意線程)對這個volatile變量最后的寫入”。
Node的聲明如下:
private static class Node<E> {
volatile E item;
volatile Node<E> next;
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
說明:
Node是個單向鏈表節(jié)點,next用于指向下一個Node,item用于存儲數據。Node中操作節(jié)點數據的API,都是通過Unsafe機制的CAS函數實現的;例如casNext()是通過CAS函數“比較并設置節(jié)點的下一個節(jié)點”。
2. 添加
下面以add(E e)為例對ConcurrentLinkedQueue中的添加進行說明。
public boolean add(E e) {
return offer(e);
}
說明:add()實際上是調用的offer()來完成添加操作的。
offer()的源碼如下:
public boolean offer(E e) {
// 檢查e是不是null,是的話拋出NullPointerException異常。
checkNotNull(e);
// 創(chuàng)建新的節(jié)點
final Node<E> newNode = new Node<E>(e);
// 將“新的節(jié)點”添加到鏈表的末尾。
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
// 情況1:q為空
if (q == null) {
// CAS操作:如果“p的下一個節(jié)點為null”(即p為尾節(jié)點),則設置p的下一個節(jié)點為newNode。
// 如果該CAS操作成功的話,則比較“p和t”(若p不等于t,則設置newNode為新的尾節(jié)點),然后返回true。
// 如果該CAS操作失敗,這意味著“其它線程對尾節(jié)點進行了修改”,則重新循環(huán)。
if (p.casNext(null, newNode)) {
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
}
// 情況2:p和q相等
else if (p == q)
p = (t != (t = tail)) ? t : head;
// 情況3:其它
else
p = (p != t && t != (t = tail)) ? t : q;
}
}
說明:offer(E e)的作用就是將元素e添加到鏈表的末尾。offer()比較的地方是理解for循環(huán),下面區(qū)分3種情況對for進行分析。
情況1 -- q為空。這意味著q是尾節(jié)點的下一個節(jié)點。此時,通過p.casNext(null, newNode)將“p的下一個節(jié)點設為newNode”,若設置成功的話,則比較“p和t”(若p不等于t,則設置newNode為新的尾節(jié)點),然后返回true。否則的話(意味著“其它線程對尾節(jié)點進行了修改”),什么也不做,繼續(xù)進行for循環(huán)。
p.casNext(null, newNode),是調用CAS對p進行操作。若“p的下一個節(jié)點等于null”,則設置“p的下一個節(jié)點等于newNode”;設置成功的話,返回true,失敗的話返回false。
情況2 -- p和q相等。這種情況什么時候會發(fā)生呢?通過“情況3”,我們知道,經過“情況3”的處理后,p的值可能等于q。
此時,若尾節(jié)點沒有發(fā)生變化的話,那么,應該是頭節(jié)點發(fā)生了變化,則設置p為頭節(jié)點,然后重新遍歷鏈表;否則(尾節(jié)點變化的話),則設置p為尾節(jié)點。
情況3 -- 其它。
我們將p = (p != t && t != (t = tail)) ? t : q;轉換成如下代碼。
if (p==t) {
p = q;
} else {
Node<E> tmp=t;
t = tail;
if (tmp==t) {
p=q;
} else {
p=t;
}
}
如果p和t相等,則設置p為q。否則的話,判斷“尾節(jié)點是否發(fā)生變化”,沒有變化的話,則設置p為q;否則,設置p為尾節(jié)點。
checkNotNull()的源碼如下:
private static void checkNotNull(Object v) {
if (v == null)
throw new NullPointerException();
}
3. 刪除
下面以poll()為例對ConcurrentLinkedQueue中的刪除進行說明。
public E poll() {
// 設置“標記”
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
// 情況1
// 表頭的數據不為null,并且“設置表頭的數據為null”這個操作成功的話;
// 則比較“p和h”(若p!=h,即表頭發(fā)生了變化,則更新表頭,即設置表頭為p),然后返回原表頭的item值。
if (item != null && p.casItem(item, null)) {
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 情況2
// 表頭的下一個節(jié)點為null,即鏈表只有一個“內容為null的表頭節(jié)點”。則更新表頭為p,并返回null。
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// 情況3
// 這可能到由于“情況4”的發(fā)生導致p=q,在該情況下跳轉到restartFromHead標記重新操作。
else if (p == q)
continue restartFromHead;
// 情況4
// 設置p為q
else
p = q;
}
}
}
說明:poll()的作用就是刪除鏈表的表頭節(jié)點,并返回被刪節(jié)點對應的值。poll()的實現原理和offer()比較類似,下面根將or循環(huán)劃分為4種情況進行分析。
情況1:“表頭節(jié)點的數據”不為null,并且“設置表頭節(jié)點的數據為null”這個操作成功。
p.casItem(item, null) -- 調用CAS函數,比較“節(jié)點p的數據值”與item是否相等,是的話,設置節(jié)點p的數據值為null。
在情況1發(fā)生時,先比較“p和h”,若p!=h,即表頭發(fā)生了變化,則調用updateHead()更新表頭;然后返回刪除節(jié)點的item值。
updateHead()的源碼如下:
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
說明:updateHead()的最終目的是更新表頭為p,并設置h的下一個節(jié)點為h本身。
casHead(h,p)是通過CAS函數設置表頭,若表頭等于h的話,則設置表頭為p。
lazySetNext()的源碼如下:
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
putOrderedObject()函數,我們在前面一章“TODO”中介紹過。h.lazySetNext(h)的作用是通過CAS函數設置h的下一個節(jié)點為h自身,該設置可能會延遲執(zhí)行。
情況2:如果表頭的下一個節(jié)點為null,即鏈表只有一個“內容為null的表頭節(jié)點”。
則調用updateHead(h, p),將表頭更新p;然后返回null。
情況3:p=q
在“情況4”的發(fā)生后,會導致p=q;此時,“情況3”就會發(fā)生。當“情況3”發(fā)生后,它會跳轉到restartFromHead標記重新操作。
情況4:其它情況。
設置p=q。
ConcurrentLinkedQueue示例
import java.util.*;
import java.util.concurrent.*;
/*
* ConcurrentLinkedQueue是“線程安全”的隊列,而LinkedList是非線程安全的。
*
* 下面是“多個線程同時操作并且遍歷queue”的示例
* (01) 當queue是ConcurrentLinkedQueue對象時,程序能正常運行。
* (02) 當queue是LinkedList對象時,程序會產生ConcurrentModificationException異常。
*
*
*/
public class ConcurrentLinkedQueueDemo1 {
// TODO: queue是LinkedList對象時,程序會出錯。
//private static Queue<String> queue = new LinkedList<String>();
private static Queue<String> queue = new ConcurrentLinkedQueue<String>();
public static void main(String[] args) {
// 同時啟動兩個線程對queue進行操作!
new MyThread("ta").start();
new MyThread("tb").start();
}
private static void printAll() {
String value;
Iterator iter = queue.iterator();
while(iter.hasNext()) {
value = (String)iter.next();
System.out.print(value+", ");
}
System.out.println();
}
private static class MyThread extends Thread {
MyThread(String name) {
super(name);
}
@Override
public void run() {
int i = 0;
while (i++ < 6) {
// “線程名” + "-" + "序號"
String val = Thread.currentThread().getName()+i;
queue.add(val);
// 通過“Iterator”遍歷queue。
printAll();
}
}
}
}
(某一次)運行結果:
ta1, ta1, tb1, tb1, ta1, ta1, tb1, tb1, ta2, ta2, tb2, tb2, ta1, ta1, tb1, tb1, ta2, ta2, tb2, tb2, ta3, tb3, ta3, ta1, tb3, tb1, ta4, ta2, ta1, tb2, tb1, ta3, ta2, tb3, tb2, ta4, ta3, tb4, tb3, ta1, ta4, tb1, tb4, ta2, ta5, tb2, ta1, ta3, tb1, tb3, ta2, ta4, tb2, tb4, ta3, ta5, tb3, tb5, ta4, ta1, tb4, tb1, ta5, ta2, tb5, tb2, ta6, ta3, ta1, tb3, tb1, ta4, ta2, tb4, tb2, ta5, ta3, tb5, tb3, ta6, ta4, tb6, tb4, ta5, tb5, ta6, tb6,
結果說明:如果將源碼中的queue改成LinkedList對象時,程序會產生ConcurrentModificationException異常。
相關文章
Java Comparable 和 Comparator 的詳解及區(qū)別
這篇文章主要介紹了Java Comparable 和 Comparator 的詳解及區(qū)別的相關資料,Comparable 自然排序和Comparator 定制排序的實例,需要的朋友可以參考下2016-12-12
spring聲明式事務@Transactional開發(fā)常犯的幾個錯誤及最新解決方案
使用聲明式事務@Transactional進行事務一致性的管理,在開發(fā)過程中,發(fā)現很多開發(fā)同學都用錯了spring聲明式事務@Transactional或使用不規(guī)范,導致出現各種事務問題,這篇文章主要介紹了spring聲明式事務@Transactional開發(fā)常犯的幾個錯誤及解決辦法,需要的朋友可以參考下2024-02-02
Java中char數組(字符數組)與字符串String類型的轉換方法
這篇文章主要介紹了Java中char數組(字符數組)與字符串String類型的轉換方法,涉及Java中toCharArray與valueOf方法的使用技巧,需要的朋友可以參考下2015-12-12
SpringBoot3整合Druid監(jiān)控功能的項目實踐
Druid連接池作為一款強大的數據庫連接池,提供了豐富的監(jiān)控和管理功能,成為很多Java項目的首選,本文主要介紹了SpringBoot3整合Druid監(jiān)控功能的項目實踐,感興趣的可以了解一下2024-01-01
Java之Error與Exception的區(qū)別案例詳解
這篇文章主要介紹了Java之Error與Exception的區(qū)別案例詳解,本篇文章通過簡要的案例,講解了該項技術的了解與使用,以下就是詳細內容,需要的朋友可以參考下2021-09-09

