詳細分析Java并發(fā)集合LinkedBlockingQueue的用法
在上一章我們講解了ArrayBlockingQueue,用數(shù)組形式實現(xiàn)的阻塞隊列。
數(shù)組的長度在創(chuàng)建時就必須確定,如果數(shù)組長度小了,那么ArrayBlockingQueue隊列很容易就被阻塞,如果數(shù)組長度大了,就容易浪費內(nèi)存。
而隊列這個數(shù)據(jù)結(jié)構(gòu)天然適合用鏈表這個形式,而LinkedBlockingQueue就是使用鏈表方式實現(xiàn)的阻塞隊列。
一. 鏈表實現(xiàn)
1.1 Node內(nèi)部類
/**
* 鏈表的節(jié)點,同時也是通過它來實現(xiàn)一個單向鏈表
*/
static class Node<E> {
E item;
// 指向鏈表的下一個節(jié)點
Node<E> next;
Node(E x) { item = x; }
}
有一個變量e儲存數(shù)據(jù),有一個變量next指向下一個節(jié)點的引用。它可以實現(xiàn)最簡單地單向列表。
1.2 怎樣實現(xiàn)鏈表
/** * 它的next指向隊列頭,這個節(jié)點不存儲數(shù)據(jù) */ transient Node<E> head; /** * 隊列尾節(jié)點 */ private transient Node<E> last;
要實現(xiàn)鏈表,必須有兩個變量,一個表示鏈表頭head,一個表示鏈表尾last。head和last都會在LinkedBlockingQueue對象創(chuàng)建的時候被初始化。
last = head = new Node<E>(null);
注意這里用了一個小技巧,鏈表頭head節(jié)點并沒有存放數(shù)據(jù),它指向的下一個節(jié)點,才真正存儲了鏈表中第一個數(shù)據(jù)。而鏈表尾last的確儲存了鏈表最后一個數(shù)據(jù)。
1.3 插入和刪除節(jié)點
/**
* 向隊列尾插入節(jié)點
*/
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread(); // 當前線程肯定獲取了putLock鎖
// 將原隊列尾節(jié)點的next引用指向新節(jié)點node,然后再將node節(jié)點設(shè)置成隊列尾節(jié)點last
// 這樣就實現(xiàn)了向隊列尾插入節(jié)點
last = last.next = node;
}
在鏈表尾插入節(jié)點很簡單,將原隊列尾last的下一個節(jié)點next指向新節(jié)點node,再將新節(jié)點node賦值給隊列尾last節(jié)點。這樣就實現(xiàn)了插入一個新節(jié)點。
// 移除隊列頭節(jié)點,并返回被刪除的節(jié)點數(shù)據(jù)
private E dequeue() {
// assert takeLock.isHeldByCurrentThread(); // 當前線程肯定獲取了takeLock鎖
// assert head.item == null;
Node<E> h = head;
// first節(jié)點中才存儲了隊列中第一個元素的數(shù)據(jù)
Node<E> first = h.next;
h.next = h; // help GC
// 設(shè)置新的head值,相當于刪除了first節(jié)點。因為head節(jié)點本身不儲存數(shù)據(jù)
head = first;
// 隊列頭的數(shù)據(jù)
E x = first.item;
// 移除原先的數(shù)據(jù)
first.item = null;
return x;
}
要注意head并不是鏈表頭,它的next才是指向鏈表頭,所以刪除鏈表頭也很簡單,就是將head.next賦值給head,然后返回原先head.next節(jié)點的數(shù)據(jù)。
刪除的時候,就要注意鏈表為空的情況。head.next的值使用enqueue方法添加的。當head.next==last的時候,表示已經(jīng)刪除到最后一個元素了,當head.next==null的時候,就不能刪除了,因為鏈表已經(jīng)為空了。這里沒有做判斷,是因為在調(diào)用dequeue方法的地方已經(jīng)做過判斷了。
二. 同步鎖ReentrantLock和條件Condition
因為阻塞隊列在隊列為空和隊列已滿的情況下,都必須阻塞等待,那么就天然需要兩個條件。而為了保證多線程并發(fā)安全,又需要一個同步鎖。這個在ArrayBlockingQueue中已經(jīng)說過了。
這里我們來說說LinkedBlockingQueue不一樣的地方。
/** 獨占鎖,用于處理插入隊列操作的并發(fā)問題,即put與offer操作 */ private final ReentrantLock putLock = new ReentrantLock(); /** 隊列不滿的條件Condition,它是由putLock鎖生成的 */ private final Condition notFull = putLock.newCondition(); /** 獨占鎖,用于處理刪除隊列頭操作的并發(fā)問題,即take與poll操作 */ private final ReentrantLock takeLock = new ReentrantLock(); /** 隊列不為空的條件Condition, 它使用takeLock鎖生成的 */ private final Condition notEmpty = takeLock.newCondition();
2.1 putLock與takeLock
我們發(fā)現(xiàn)使用了兩把鎖:
- putLock 同步所有插入元素的操作,即put與offer系列方法的操作。
- takeLock 同步刪除和獲取元素的操作,即take與poll系列方法操作。
按照上面的說法,可能會出現(xiàn)同時插入元素和刪除元素的操作,那么就不會出現(xiàn)問題么?
我們來具體分析一個下,對于隊列來說操作分為三種:
- 在隊列尾插入元素。即put與offer系列方法,它們會涉及兩個變量,隊列中元素個數(shù)count,和隊列尾節(jié)點last。(不會涉及head節(jié)點)
- 移除隊列頭元素。即take與poll系列方法,它們會涉及兩個變量,隊列中元素個數(shù)count,和head節(jié)點。(不會涉及隊列尾節(jié)點last)
- 查看隊列頭元素。即 element()與peek()方法,它們會涉及兩個變量,隊列中元素個數(shù)count,和head節(jié)點。(不會涉及隊列尾節(jié)點last)
因此使用putLock鎖來保持last變量的安全,使用takeLock鎖來保持head變量的安全。
對于都涉及了隊列中元素個數(shù)count變量,所以使用AtomicInteger來保證并發(fā)安全問題。
/** 隊列中元素的個數(shù),這里使用AtomicInteger變量,保證并發(fā)安全問題 */ private final AtomicInteger count = new AtomicInteger();
2.2 notFull與notEmpty
- notFull 是由putLock鎖生成的,因為當插入元素時,我們要判斷隊列是不是已滿。
- notEmpty 是由takeLock鎖生成的,因為當刪除元素時,我們要判斷隊列是不是為空。
2.3 控制流程
當插入元素時:
- 先使用putLock.lockInterruptibly()保證只有一個線程進行插入操作.
- 然后利用count變量,判斷隊列是否已滿.
- 滿了就調(diào)用notFull.await()方法,讓當前線程等待。因為notFull是由putLock產(chǎn)生的,這里已經(jīng)獲取到鎖了,所以可以調(diào)用await方法。
- 沒滿就調(diào)用 enqueue方法,向隊列尾插入新元素。
- 調(diào)用count.getAndIncrement()方法,將隊列中元素個數(shù)加一,并保證多線程并發(fā)安全。
- 調(diào)用signalNotEmpty方法,喚醒正在等待獲取元素的線程。
當刪除元素時:
- 先使用takeLock.lockInterruptibly()保證只有一個線程進行刪除操作.
- 然后利用count變量,判斷隊列是否為空.
- 隊列為空就調(diào)用notEmpty.await()方法,讓當前線程等待。因為notEmpty是由takeLock產(chǎn)生的,這里已經(jīng)獲取到鎖了,所以可以調(diào)用await方法。
- 沒滿就調(diào)用 dequeue方法,刪除隊列頭元素。
- 調(diào)用count.getAndDecrement()方法,將隊列中元素個數(shù)減一,并保證多線程并發(fā)安全。
- 調(diào)用signalNotFull方法,喚醒正在等待插入元素的線程。
還要注意一下,Condition的signal和await方法必須在獲取鎖的情況下調(diào)用。因此就有了signalNotEmpty和signalNotFull方法:
/**
* 喚醒在notEmpty條件下等待的線程,即移除隊列頭時,發(fā)現(xiàn)隊列為空而被迫等待的線程。
* 注意,因為要調(diào)用Condition的signal方法,必須獲取對應(yīng)的鎖,所以這里調(diào)用了takeLock.lock()方法。
* 當隊列中插入元素(即put或offer操作),那么隊列肯定不為空,就會調(diào)用這個方法。
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
/**
* 喚醒在notFull條件下等待的線程,即隊列尾添加元素時,發(fā)現(xiàn)隊列已滿而被迫等待的線程。
* 注意,因為要調(diào)用Condition的signal方法,必須獲取對應(yīng)的鎖,所以這里調(diào)用了putLock.lock()方法
* 當隊列中刪除元素(即take或poll操作),那么隊列肯定不滿,就會調(diào)用這個方法。
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
三. 插入元素方法
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// 記錄插入操作前元素的個數(shù)
int c = -1;
// 創(chuàng)建新節(jié)點node
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//表示隊列已滿,那么就要調(diào)用notFull.await方法,讓當前線程等待
while (count.get() == capacity) {
notFull.await();
}
// 向隊列尾插入新元素
enqueue(node);
// 將當前隊列元素個數(shù)加1,并返回加1之前的元素個數(shù)。
c = count.getAndIncrement();
// c + 1 < capacity表示隊列未滿,就喚醒可能等待插入操作的線程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// c == 0表示插入之前,隊列是空的。隊列從空到放入一個元素時,
// 才喚醒等待刪除的線程
// 防止頻繁獲取takeLock鎖,消耗性能
if (c == 0)
signalNotEmpty();
}
以put方法為例,大體流程與我們前面介紹一樣,這里有一個非常怪異的代碼,當插入完元素時,如果發(fā)現(xiàn)隊列未滿,那么調(diào)用notFull.signal()喚醒等待插入的線程。
大家就很疑惑了,一般來說,這個方法應(yīng)該放在刪除元素(take系列的方法里),因為當我們刪除一個元素,那么隊列肯定是未滿的,那么調(diào)用notFull.signal()方法,喚醒等待插入的線程。
這里這么做主要是因為調(diào)用signal方法,必須先獲取對應(yīng)的鎖,而在take系列的方法里使用的鎖是takeLock,那么想調(diào)用notFull.signal()方法,必須先獲取putLock鎖,這樣的話會性能就會下降,所以用了另一種方式。
- 首先我們應(yīng)該知道signal方法,當有線程在這個條件下等待時,才會喚醒其中一個線程,當沒有線程等待時,這個方法相當于什么都沒做。所以這個方法的意義是可能會喚醒等待的一個線程。
- 當隊列未滿時,我們都調(diào)用notFull.signal()嘗試去喚醒一個等待插入線程。而且這里已經(jīng)獲取putLock鎖了,所以不耗時。
- 但是有一個問題,當隊列已滿的時候,所有插入操作的線程,都會等待,就沒有機會調(diào)用notFull.signal()方法,那么喚醒這些等待線程呢?
- 喚醒這些線程的啟動條件,必須是由刪除元素操作觸發(fā)的,因為只有刪除隊列才會不滿。因為在take方法中 if (c == capacity) signalNotFull();
四. 刪除隊列頭元素
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//表示隊列為空,那么就要調(diào)用notEmpty.await方法,讓當前線程等待
while (count.get() == 0) {
notEmpty.await();
}
// 刪除隊列頭元素,并返回它
x = dequeue();
// 返回當前隊列個數(shù),然后將隊列個數(shù)減一
c = count.getAndDecrement();
// c > 1表示隊列不為空,就喚醒可能等待刪除操作的線程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
/**
* c == capacity表示刪除操作之前,隊列是滿的。只有從滿隊列中刪除一個元素時,
* 才喚醒等待插入的線程
* 防止頻繁獲取putLock鎖,消耗性能
*/
if (c == capacity)
signalNotFull();
return x;
}
為什么調(diào)用notEmpty.signal()方法原因,對比一下我們在插入元素方法中的解釋。
五. 查看隊列頭元素
// 查看隊列頭元素
public E peek() {
// 隊列為空,返回null
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 獲取隊列頭節(jié)點first
Node<E> first = head.next;
// first == null表示隊列為空,返回null
if (first == null)
return null;
else
// 返回隊列頭元素
return first.item;
} finally {
takeLock.unlock();
}
}
查看隊列頭元素,涉及到head節(jié)點,所以必須使用takeLock鎖。
六. 其他重要方法
6.1 remove(Object o)方法
// 從隊列中刪除指定元素o
public boolean remove(Object o) {
if (o == null) return false;
// 因為不是刪除列表頭元素,所以就涉及到head和last兩個變量,
// putLock與takeLock都要加鎖
fullyLock();
try {
// 遍歷整個隊列,p表示當前節(jié)點,trail表示當前節(jié)點的前一個節(jié)點
// 因為是單向鏈表,所以需要記錄兩個節(jié)點
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
// 如果找到了指定元素,那么刪除節(jié)點p
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
從列表中刪除指定元素,因為刪除的元素不一定在列表頭,所以可能會head和last兩個變量,所以必須同時使用putLock與takeLock兩把鎖。因為是單向鏈表,需要一個輔助變量trail來記錄前一個節(jié)點,這樣才能刪除當前節(jié)點p。
6.2 unlink(Node<E> p, Node<E> trail) 方法
// 刪除當前節(jié)點p,trail代表p的前一個節(jié)點
void unlink(Node<E> p, Node<E> trail) {
// 將當前節(jié)點的數(shù)據(jù)設(shè)置為null
p.item = null;
// 這樣就在鏈表中刪除了節(jié)點p
trail.next = p.next;
// 如果節(jié)點p是隊列尾last,而它被刪除了,那么就要將trail設(shè)置為last
if (last == p)
last = trail;
// 將元素個數(shù)減一,如果原隊列是滿的,那么就調(diào)用notFull.signal()方法
// 其實這個不用判斷直接調(diào)用的,因為這里肯定獲取了putLock鎖
if (count.getAndDecrement() == capacity)
notFull.signal();
}
要在鏈表中刪除一個節(jié)點p,只需要將p的前一個節(jié)點trail的next指向節(jié)點p的下一個節(jié)點next。
以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
聊聊Java 成員變量賦值和構(gòu)造方法誰先執(zhí)行的問題
這篇文章主要介紹了聊聊Java 成員變量賦值和構(gòu)造方法誰先執(zhí)行的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-10-10
java網(wǎng)絡(luò)通信技術(shù)之簡單聊天小程序
這篇文章主要為大家詳細介紹了java網(wǎng)絡(luò)通信技術(shù)之簡單聊天小程序,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-07-07

