詳解Java如何實現(xiàn)一個BlockingQueue
前幾篇文章,主要討論了一下關(guān)于互斥的相關(guān)內(nèi)容,例如synchronized、Lock、CAS、原子類,累加器等。未來的幾篇文章將討論線程同步,例如condition、信號量、CountDownLatch、CyclicBarrier等。
線程同步與線程互斥的區(qū)別
互斥與同步是多線程需要解決的兩大核心問題。互斥通過互斥鎖來解決問題,同步則是通過同步工具來解決。
互斥是一種間接制約關(guān)系,是指系統(tǒng)中的某些共享資源,一次只允許一個線程訪問。當(dāng)一個線程正在訪問該臨界資源時,其它線程必須等待。
同步,又稱直接制約關(guān)系,是指多個線程(或進程)為了合作完成任務(wù),必須嚴(yán)格按照規(guī)定的某種先后次序來運行。
如何實現(xiàn)一個BlockingQueue
如果有線程需要在某些“條件”滿足后才接著后續(xù)操作,要如何實現(xiàn)?
例如:設(shè)計一個阻塞隊列,當(dāng)隊列元素為空的時候,不允許讀取線程讀取,當(dāng)隊列元素滿的時候,不允許寫入線程寫入。
要實現(xiàn)這樣的功能,需要一個完整的等待-通知機制:線程首先獲取互斥鎖,當(dāng)線程要求的條件不滿足時,釋放互斥鎖,進入等待狀態(tài);當(dāng)要求的條件滿足時,通知等待的線程,重新獲取互斥鎖。
在java中可以通過這么幾種方式來實現(xiàn)
synchronized + loopsynchronized + wait + notifyAllConditionSemaphore
其中1不常用,2和3都可以稱之為條件變量,4為信號量,本篇文章主要講解條件變量的實現(xiàn)方式。
Synchronized + Look實現(xiàn)BlockingQueue
public class BlockingQueueWithLoop implements Queue {
?
?private int[] elements;
?private int head;
?private int tail;
?private volatile int size; // 隊列元素個數(shù)
?
?public BlockingQueueWithLoop() {
? ?this(10);
}
?
?public BlockingQueueWithLoop(int capacity) {
? ?this.elements = new int[capacity];
? ?this.head = 0;
? ?this.tail = 0;
? ?this.size = 0;
}
?
?@Override
?public void put(int e) throws InterruptedException {
? ?while (size == elements.length) {} // 使用自旋鎖,等待隊列不滿
? ?synchronized (this) {
? ? ?if(size == elements.length){ // 雙重檢索
? ? ? ?return;
? ? }
? ? ?elements[tail] = e;
? ? ?tail++;
? ? ?if (tail == elements.length) {
? ? ? ?tail = 0;
? ? }
? ? ?size++;
? }
}
?
?@Override
?public int take() throws InterruptedException {
? ?while (true) {
? ? ?while (size <= 0) {} // 使用自旋鎖,等待隊列不為空
?
? ? ?synchronized (this) { // 隊列不為空,需要加鎖
? ? ? ?if (size > 0) { ? ? // 雙重檢索
? ? ? ? ?int e = elements[head];
? ? ? ? ?head++;
? ? ? ? ?if (head == elements.length) {
? ? ? ? ? ?head = 0;
? ? ? ? }
? ? ? ? ?size--;
? ? ? ? ?return e;
? ? ? }
? ? }
? }
}
?
?@Override
?public synchronized int size() {
? ?return size;
}
?
?@Override
?public synchronized boolean isEmpty() {
? ?return size == 0;
}
}我們通過使用自旋來等待隊列滿足(為空,為滿)的條件。
需要注意的是,當(dāng)自旋檢測到隊列不滿足條件之后,為了保證后續(xù)操作線程安全,我們需要對其進行加鎖。
在加鎖之后,我們需要再次檢查隊列是否滿足條件(為空?為滿)。這有點類似線程安全單例類中的雙重檢測。這樣做的原因是,多個線程有可能同時執(zhí)行put()/take()函數(shù),并且同時檢測到隊列不滿足條件,于是,它們依次獲取鎖然后從隊列中操作數(shù)據(jù),如果不在獲取鎖之后重新檢測,那么就有可能導(dǎo)致數(shù)組訪問越界或者其他未知問題。
當(dāng)然,我們也無法將自旋邏輯放如synchronized代碼塊中,如果這樣做的話,那么可能會導(dǎo)致死鎖。
條件變量實現(xiàn)BlockingQueue
自旋并不會讓線程進入阻塞狀態(tài)。如果線程將一直執(zhí)行while循環(huán),白白浪費CPU資源,甚至?xí)孋PU使用率達到100%。
為了減少對CPU資源的浪費,我們可以在while循環(huán)中調(diào)用sleep()函數(shù),讓線程睡眠一小段時間。但這樣會導(dǎo)致性能下降,如果sleep一段時間,不能立刻獲取到隊列的狀態(tài),導(dǎo)致響應(yīng)不及時。
所以我們需要另外一套方案來實現(xiàn)阻塞隊列,那就是通過條件變量來解決浪費CPU資源和響應(yīng)不及時這兩個問題。
java對于條件變量的實現(xiàn)有兩種:
- Object.wait()/Object.notify()/Object.notifyAll()
- ReentrantLock.Condition
Object.wait()/notifyAll()/notify()
首先java內(nèi)置的條件變量,是使用Object類上的wait/notify/notifyAll方法實現(xiàn)的。
public class Object {
?public final void wait() throws InterruptedException;
?public final native void wait(long timeoutMillis) throws InterruptedException;
?public final void wait(long timeoutMillis, int nanos) throws InterruptedException
?public final native void notify();
?public final native void notifyAll();
}說明:
- 線程調(diào)用
wait(),線程狀態(tài)會進入WAITING狀態(tài)。 - 線程調(diào)用
wait(long timeoutMillis),線程狀態(tài)會進入TIMED_WAITING狀態(tài),等待時間超過了預(yù)設(shè)的超時時間。 - 其余線程調(diào)用
notify()/notifyAll()喚醒此線程。 - 線程被中斷,調(diào)用
wait()/wait(long timeout)會拋出InterruptedException異常。
public class BlockingQueueWithSync implements Queue {
?
?private int[] elements;
?private int head;
?private int tail;
?private volatile int size; // 隊列元素個數(shù)
?
?public BlockingQueueWithSync() {
? ?this(10);
}
?
?public BlockingQueueWithSync(int capacity) {
? ?this.elements = new int[capacity];
? ?this.head = 0;
? ?this.tail = 0;
? ?this.size = 0;
}
?
?@Override
?public synchronized void put(int e) throws InterruptedException {
? ?// 當(dāng)隊列滿的時候阻塞
? ?while (size == elements.length) {
? ? ?this.wait();
? }
? ?elements[tail] = e;
? ?tail++;
? ?if (tail == elements.length) {
? ? ?tail = 0;
? }
? ?size++;
? ?// 通知其他線程有數(shù)據(jù)了
? ?this.notifyAll();
}
?
?@Override
?public synchronized int take() throws InterruptedException {
? ?// 當(dāng)隊列空的時候阻塞
? ?while (isEmpty()) {
? ? ?this.wait();
? }
? ?int e = elements[head];
? ?if (++head == elements.length) {
? ? ?head = 0;
? }
? ?--size;
? ?// 通知其他線程,暫無數(shù)據(jù)
? ?this.notifyAll();
? ?return e;
}
?
?@Override
?public synchronized int size() {
? ?return size;
}
?
?@Override
?public synchronized boolean isEmpty() {
? ?return size == 0;
}
}如圖所示,wait()和notify()的工作流程如下:

需要注意的是:
wait()和notify()都是Object類里的方法,原則上是可以單獨調(diào)用的,但是會配合狀態(tài)聯(lián)合調(diào)用- 在調(diào)用
wait()和notify()的時候,需要加鎖,因為狀態(tài)的檢查和業(yè)務(wù)邏輯執(zhí)行構(gòu)成一組復(fù)合操作,如果不加鎖就會出現(xiàn)線程不安全的問題。 - 當(dāng)狀態(tài)不滿足條件的時候,會調(diào)用wait方法,進入等待隊列等待被喚醒,此時需要釋放鎖,否則其他線程將無法獲取到鎖,也就無法更新狀態(tài)。
- 當(dāng)?shù)却械木€程被喚醒時,必須再次競爭獲取到鎖的機會,需要再次檢查狀態(tài)是否滿足條件。
- while循環(huán)是為了避免線程被假喚醒。
wait()和notify()實現(xiàn)原理如下:
// ObjectMonitor.cpp
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {}
void ObjectMonitor::notify(TRAPS) {}
void ObjectMonitor::notifyAll(TRAPS) {}當(dāng)某個線程調(diào)用wait()函數(shù)時,線程會將自己放入_WaitSet中,并釋放持有的鎖,調(diào)用park()阻塞自己。
當(dāng)其他線程調(diào)用notify()
- 如果
_EntryList或者_cxq不為空時,那么它會從_WaitSet取出一個線程放入_EntryList中,讓其排隊等待鎖。 - 如果
_EntryList或者_cxq均為空時,那么它會從_WaitSet取出一個線程直接調(diào)用這個線程的unpark()方法取消其阻塞狀態(tài),讓其去競爭鎖。
當(dāng)調(diào)用了wait()的線程再次獲取到鎖的時候,會從wait()中返回,繼續(xù)檢查狀態(tài)是否滿足條件,如果不滿足則繼續(xù)執(zhí)行上述兩步,如果滿足了,則執(zhí)行業(yè)務(wù)邏輯。
notify()和notifyAll()的區(qū)別在于notifyAll()會將_WaitSet中所有線程取出來放入_EntryList中,讓他們一起競爭鎖。
ReentrantLock+Condition
public class BlockingQueueWithCondition implements Queue {
?
?private int[] elements;
?private int head;
?private int tail;
?private volatile int size; // 隊列元素個數(shù)
?private final ReentrantLock lock = new ReentrantLock();
?private final Condition notEmpty = lock.newCondition();
?private final Condition notFull = lock.newCondition();
?
?public BlockingQueueWithCondition() {
? ?this(10);
}
?
?public BlockingQueueWithCondition(int capacity) {
? ?this.elements = new int[capacity];
? ?this.head = 0;
? ?this.tail = 0;
? ?this.size = 0;
}
?
?@Override
?public void put(int e) throws InterruptedException {
? ?lock.lockInterruptibly();
? ?try {
? ? ?while (size == elements.length) {
? ? ? ?notFull.await();
? ? }
? ? ?elements[tail] = e;
? ? ?tail++;
? ? ?if (tail == elements.length) {
? ? ? ?tail = 0;
? ? }
? ? ?size++;
? ? ?notEmpty.signalAll();
? } finally {
? ? ?lock.unlock();
? }
}
?
?@Override
?public int take() throws InterruptedException {
? ?lock.lockInterruptibly();
? ?try {
? ? ?while (isEmpty()) {
? ? ? ?notEmpty.await();
? ? }
? ? ?int e = elements[head];
? ? ?if (++head == elements.length) {
? ? ? ?head = 0;
? ? }
? ? ?--size;
? ? ?notFull.signalAll();
? ? ?return e;
? } finally {
? ? ?lock.unlock();
? }
}
?
?@Override
?public int size() {
? ?try {
? ? ?lock.lock();
? ? ?return size;
? } finally {
? ? ?lock.unlock();
? }
}
?
?@Override
?public boolean isEmpty() {
? ?try {
? ? ?lock.lock();
? ? ?return size == 0;
? } finally {
? ? ?lock.unlock();
? }
}
}Condition是java SDK里提供的一種條件變量實現(xiàn)方式,其原理與Object#wait()以及Object#notify()類似
// java.util.concurrent.locks.Condition
public interface Condition {
? ?void await() throws InterruptedException;
? ?void awaitUninterruptibly();
? ?long awaitNanos(long nanosTimeout) throws InterruptedException;
? ?boolean await(long time, TimeUnit unit) throws InterruptedException;
? ?boolean awaitUntil(Date deadline) throws InterruptedException;
? ?void signal();
? ?void signalAll();
}Condition里的awaitXXX(XXX)方法基本等同于Object#wait(),但是比Object#wait()提供了更多的等待形式。例如:
Condition#awaitUninterruptibly(),表示此方法執(zhí)行中,不可以被中斷。Condition#awaitNanos(long nanosTimeout),表示等待超過nanosTimeout納秒時,函數(shù)返回,返回值為等待時間。Condition#await(long time, TimeUnit unit),跟awaitNanos類似。Condition#awaitUntil(Date deadline),表示等待到某個時間點deadline,函數(shù)返回,返回值如果為false則表示已經(jīng)超時,返回值如果為true,則表示線程被中斷或者被喚醒。
Condition里的signalXXX()方法基本等同于Object#notify()/notifyAll()。
Condition實現(xiàn)原理如下:
Condition是一個接口,其實現(xiàn)類為java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject,是AQS的一個內(nèi)部類,側(cè)面說明條件變量是需要相關(guān)的鎖操作的。
public class ConditionObject implements Condition, java.io.Serializable {
? ?private static final long serialVersionUID = 1173984872572414699L;
? ?/** First node of condition queue. */
? ?private transient Node firstWaiter;
? ?/** Last node of condition queue. */
? ?private transient Node lastWaiter;
? ?// ...
}
?
static final class Node {
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter; // 用于Condition
}通過 firstWaiter 和 lastWaiter 構(gòu)建的隊列稱為等待隊列,用來存儲調(diào)用了await()函數(shù)的線程
AQS也包含一個隊列,通過head和tail構(gòu)建的同步隊列,用于存儲等待鎖的線程。
一個 Node 可以同時加入等待隊列和同步隊列。

如上圖所示,Lock中的同步隊列是雙向鏈表,由于雙向鏈表的操作復(fù)雜性,增加虛擬頭節(jié)點可以有效簡化操作。Condition中的等待隊列是單向鏈表,就沒有必要增加虛擬頭節(jié)點的必要了。
await()源代碼如下:
public final void await() throws InterruptedException {
? ?// 檢測到中斷,拋異常
? ?if (Thread.interrupted())
? ? ? ?throw new InterruptedException();
// 將線程包裹為Node添加到Condition等待隊列尾部
? ?Node node = addConditionWaiter();
? ?// 將state修改為0,返回釋放前鎖的狀態(tài)
? ?int savedState = fullyRelease(node);
? ?int interruptMode = 0;
? ?while (!isOnSyncQueue(node)) { // 被意外喚醒的話需要再次掛起
? ? ? ?LockSupport.park(this);
? ? ? ?if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
? ? ? ? ? ?break;
? }
? ?// 接收到 signal,返回前需要再排隊等待鎖
? ?if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
? ? ? ?interruptMode = REINTERRUPT;
? ?if (node.nextWaiter != null) // clean up if cancelled
? ? ? ?unlinkCancelledWaiters();
? ?if (interruptMode != 0)
? ? ? ?reportInterruptAfterWait(interruptMode);
}
?
// 加入條件等待隊列
private Node addConditionWaiter() {
? ?Node t = lastWaiter;
? ?// If lastWaiter is cancelled, clean out.
? ?if (t != null && t.waitStatus != Node.CONDITION) {
? ? ? ?unlinkCancelledWaiters();
? ? ? ?t = lastWaiter;
? }
?
? ?// 加入鏈表末尾
? ?Node node = new Node(Thread.currentThread(), Node.CONDITION);
? ?if (t == null)
? ? ? ?firstWaiter = node;
? ?else
? ? ? ?t.nextWaiter = node;
? ?lastWaiter = node;
? ?return node;
}
?
?
// 意外喚醒
final boolean isOnSyncQueue(Node node) {
? ?// 進入同步隊列時,waitStatus為0,且prev指向前驅(qū)節(jié)點
? ?// 之后節(jié)點可能被取消,狀態(tài)變?yōu)镃ANCELLED
? ?if (node.waitStatus == Node.CONDITION || node.prev == null)
? ? ? ?return false;
? ?// 存在后繼節(jié)點,肯定在同步隊列中
? ?if (node.next != null)
? ? ? ?return true;
? ?// 兜底,從tail查找,確保node已經(jīng)被加入同步隊列
? ?return findNodeFromTail(node);
}
?signal()源代碼如下:
public final void signal() {
?// 必須保證持有鎖
?if (!isHeldExclusively())
? ?throw new IllegalMonitorStateException();
?Node first = firstWaiter;
?if (first != null)
? ?// 喚醒隊首線程
? ?doSignal(first);
}
?
private void doSignal(Node first) {
?do {
? ?// 將first移出隊列
? ?if ( (firstWaiter = first.nextWaiter) == null)
? ? ?lastWaiter = null;
? ?first.nextWaiter = null;
} while (!transferForSignal(first) && ?// 喚醒線程
? ? ? ? ? (first = firstWaiter) != null);
}
?
final boolean transferForSignal(Node node) {
?// 節(jié)點狀態(tài)不為CONDITION,說明已經(jīng)被取消了,不進行喚醒
?if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
? ?return false;
?// 將節(jié)點加入到同步隊列,返回之前的隊尾節(jié)點
?Node p = enq(node);
?int ws = p.waitStatus;
?// 如果設(shè)置前驅(qū)節(jié)點的狀態(tài)失敗(如前驅(qū)已被取消)則直接喚醒線程
?// 喚醒后的線程會在 `await` 中執(zhí)行 `acquireQueued` 直到搶鎖成功
?if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
? ?LockSupport.unpark(node.thread);
?return true;
}以上就是詳解Java如何實現(xiàn)一個BlockingQueue的詳細(xì)內(nèi)容,更多關(guān)于Java BlockingQueue的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
基于SpringBoot集成測試遠(yuǎn)程連接Redis服務(wù)的教程詳解
這篇文章主要介紹了基于SpringBoot集成測試遠(yuǎn)程連接的Redis服務(wù)的相關(guān)知識,本文通過實例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-03-03
spring框架配置實體類復(fù)雜屬性注入xml文件過程詳解
這篇文章主要介紹了spring框架配置實體類復(fù)雜屬性注入xml文件過程詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-09-09
Redis使用RedisTemplate模板類的常用操作方式
這篇文章主要介紹了Redis使用RedisTemplate模板類的常用操作方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09
java之scan.next()與scan.nextline()函數(shù)的使用及區(qū)別
這篇文章主要介紹了java之scan.next()與scan.nextline()函數(shù)的使用及區(qū)別,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-04-04

