詳解Java中的阻塞隊(duì)列
什么是阻塞隊(duì)列
在數(shù)據(jù)結(jié)構(gòu)中,隊(duì)列遵循FIFO(先進(jìn)先出)原則。在java中,Queue接口定義了定義了基本行為,由子類完成實(shí)現(xiàn),常見的隊(duì)列有ArrayDeque、LinkedList等,這些都是非線程安全的,在java 1.5中新增了阻塞隊(duì)列,當(dāng)隊(duì)列滿時(shí),添加元素的線程呈阻塞狀態(tài);當(dāng)隊(duì)列為空時(shí),獲取元素的線程呈阻塞狀態(tài)。
生產(chǎn)者、消費(fèi)者模型

生產(chǎn)者將元素添加到隊(duì)列中,消費(fèi)中獲取數(shù)據(jù)后完成數(shù)據(jù)處理。兩者通過隊(duì)列解決了生產(chǎn)者和消費(fèi)者的耦合關(guān)系;當(dāng)生產(chǎn)者的生產(chǎn)速度與消費(fèi)者的消費(fèi)速度不一致時(shí),可以通過大道緩沖的目的。
阻塞隊(duì)列的使用場(chǎng)景
線程池
在線程池中,當(dāng)工作線程數(shù)大于等于corePoolSize時(shí),后續(xù)的任務(wù)后添加到阻塞隊(duì)列中;
目前有那些阻塞隊(duì)列
在java中,BlockingQueue接口定義了阻塞隊(duì)列的行為,常用子類是ArrayBlockingQueue和LinkedBlockingQueue。

BlockingQueue繼承了Queue接口,擁有其全部特性。在BlockingQueue的java doc中對(duì)其中的操作方法做了匯總

插入元素
- add(e):當(dāng)隊(duì)列已滿時(shí),再添加元素會(huì)拋出異常
IllegalStateException - offer(e):添加成功,返回true,否則返回false
- put:(e):當(dāng)隊(duì)列已滿時(shí),再添加元素會(huì)使線程變?yōu)樽枞麪顟B(tài)
- offer(e, time,unit):當(dāng)隊(duì)列已滿時(shí),在末尾添加數(shù)據(jù),如果在指定時(shí)間內(nèi)沒有添加成功,返回false,反之是true
刪除元素
- remove(e):返回true表示已成功刪除,否則返回false
- poll():如果隊(duì)列為空返回null,否則返回隊(duì)列中的第一個(gè)元素
- take():獲取隊(duì)列中的第一個(gè)元素,如果隊(duì)列為空,獲取元素的線程變?yōu)樽枞麪顟B(tài)
- poll(time, unit):當(dāng)隊(duì)列為空時(shí),線程被阻塞,如果超過指定時(shí)間,線程退出
檢查元素
- element():獲取隊(duì)頭元素,如果元素為null,拋出
NoSuchElementException - peek():獲取隊(duì)頭元素,如果隊(duì)列為空返回null,否則返回目標(biāo)元素
ArrayBlockingQueue
底層基于數(shù)組的有界阻塞隊(duì)列,在構(gòu)造此隊(duì)列時(shí)必須指定容量;
構(gòu)造函數(shù)
// 第一個(gè)
public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
// 第二個(gè)
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
// 第三個(gè)
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
- capacity:隊(duì)列的初始容量
- fair:線程訪問隊(duì)列的公平性。如果為true按照FIFO的原則處理,反之;默認(rèn)為falsec:
- 已有元素的集合,類型于合并兩個(gè)數(shù)組
put()方法
public void put(E e) throws InterruptedException {
// 檢查元素是否為null
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 獲取鎖
lock.lockInterruptibly();
try {
// 如果當(dāng)前隊(duì)列為空,變?yōu)樽枞麪顟B(tài)
while (count == items.length)
notFull.await();
// 反之,就添加元素
enqueue(e);
} finally {
// 解鎖
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// 此時(shí)隊(duì)列不為空,喚醒消費(fèi)者
notEmpty.signal();
}
take()方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 獲取鎖
lock.lockInterruptibly();
try {
// 如果隊(duì)列為空,消費(fèi)者變?yōu)樽枞麪顟B(tài)
while (count == 0)
notEmpty.await();
// 不為空,就獲取數(shù)據(jù)
return dequeue();
} finally {
// 解鎖
lock.unlock();
}
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 獲取隊(duì)頭元素x
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 此時(shí)隊(duì)列沒有滿,同時(shí)生產(chǎn)者繼續(xù)添加數(shù)據(jù)
notFull.signal();
return x;
}
LinkedBlockingQueue
底層基于單向鏈表的無界阻塞隊(duì)列,如果不指定初始容量,默認(rèn)為Integer.MAX_VALUE,否則為指定容量
構(gòu)造函數(shù)
// 不指定容量
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
// 指定容量
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
// 等同于合并數(shù)組
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
put()方法
public void put(E e) throws InterruptedException {
// 元素為空,拋出異常
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
// 獲取隊(duì)列中的數(shù)據(jù)量
final AtomicInteger count = this.count;
// 獲取鎖
putLock.lockInterruptibly();
try {
// 隊(duì)列滿了,變?yōu)樽枞麪顟B(tài)
while (count.get() == capacity) {
notFull.await();
}
// 將目標(biāo)元素添加到鏈表的尾端
enqueue(node);
// 總數(shù)增加
c = count.getAndIncrement();
// 隊(duì)列還沒有滿,繼續(xù)添加元素
if (c + 1 < capacity)
notFull.signal();
} finally {
// 解鎖
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
take()方法
public E take() throws InterruptedException {
E x;
int c = -1;
// 獲取隊(duì)列中的工作數(shù)
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 獲取鎖
takeLock.lockInterruptibly();
try {
// 如果隊(duì)列為空,變?yōu)樽枞麪顟B(tài)
while (count.get() == 0) {
notEmpty.await();
}
// 獲取隊(duì)頭元素
x = dequeue();
// 遞減
c = count.getAndDecrement();
// 通知消費(fèi)者
if (c > 1)
notEmpty.signal();
} finally {
// 解鎖
takeLock.unlock();
}
if (c == capacity)
//
signalNotFull();
return x;
}
對(duì)比
相同點(diǎn)
- 兩者都是通過Condition通知生產(chǎn)者和消費(fèi)者完成元素的添加和獲取
- 都可以指定容量
不同點(diǎn)
-
ArrayBlockingQueue基于數(shù)據(jù),LinkedBlockingQueue基于鏈表 ArrayBlockingQueue內(nèi)有一把鎖,LinkedBlockingQueue內(nèi)有兩把鎖

自己動(dòng)手實(shí)現(xiàn)一個(gè)阻塞隊(duì)列
通過分析源碼可以知道,阻塞隊(duì)列其實(shí)是通過通知機(jī)制Condition完成生產(chǎn)者和消費(fèi)的互通。也可以通過Object類中的wait()和notify、notifyAll實(shí)現(xiàn)。下面是自己寫的一個(gè)阻塞隊(duì)列
public class BlockQueue {
// 對(duì)象鎖
public static final Object LOCK = new Object();
// 控制變量的值 來通知雙方
public boolean condition;
public void put() {
synchronized (LOCK) {
while (condition) {
try {
// 滿了
System.out.println("put 隊(duì)列滿了,開始阻塞");
LOCK.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
condition = true;
System.out.println("put 改為true,喚醒消費(fèi)者");
LOCK.notifyAll();
}
}
public void take() {
synchronized (LOCK) {
while (!condition) {
// 沒滿
System.out.println("take 隊(duì)列沒滿,開始阻塞");
try {
LOCK.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
condition = false;
System.out.println("take 改為false,喚醒生產(chǎn)者");
LOCK.notifyAll();
}
}
}
參考文章:
并發(fā)容器之BlockingQueue (juejin.cn)
BlockingQueue (Java Platform SE 8 ) (oracle.com)
到此這篇關(guān)于詳解Java中的阻塞隊(duì)列的文章就介紹到這了,更多相關(guān)Java阻塞隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java?Thread.currentThread().getName()?和?this.getName()區(qū)別詳
本文主要介紹了Thread.currentThread().getName()?和?this.getName()區(qū)別詳解,TestThread?testThread?=?new?TestThread();2022-02-02
SpringBoot @ModelAttribute使用場(chǎng)景分析
這篇文章主要介紹了SpringBoot @ModelAttribute使用場(chǎng)景分析,文中通過實(shí)例代碼圖文相結(jié)合給大家介紹的非常詳細(xì),需要的朋友可以參考下2021-08-08
Mybatis-Plus中update()和updateById()將字段更新為null
本文主要介紹了Mybatis-Plus中update()和updateById()將字段更新為null,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-08-08
java中同類對(duì)象之間的compareTo()和compare()方法對(duì)比分析
這篇文章主要介紹了java中同類對(duì)象之間的compareTo()和compare()方法對(duì)比分析,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-09-09
SpringMVC使用hibernate-validator進(jìn)行參數(shù)校驗(yàn)最佳實(shí)踐記錄
這篇文章主要介紹了SpringMVC使用hibernate-validator進(jìn)行參數(shù)校驗(yàn)最佳實(shí)踐,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-05-05
java8實(shí)現(xiàn)List中對(duì)象屬性的去重方法
這篇文章主要介紹了java8實(shí)現(xiàn)List中對(duì)象屬性的去重方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03
Spring中的注解@Autowired實(shí)現(xiàn)過程全解(@Autowired 背后的故事)
這篇文章主要介紹了Spring中的注解@Autowired實(shí)現(xiàn)過程全解,給大家聊聊@Autowired 背后的故事及實(shí)現(xiàn)原理,需要的朋友可以參考下2021-07-07
Java基于Socket實(shí)現(xiàn)多人聊天室
這篇文章主要為大家詳細(xì)介紹了Java基于Socket實(shí)現(xiàn)多人聊天室,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-09-09

