利用Java手寫阻塞隊列的示例代碼
前言
在我們平時編程的時候一個很重要的工具就是容器,在本篇文章當(dāng)中主要給大家介紹阻塞隊列的原理,并且在了解原理之后自己動手實現(xiàn)一個低配版的阻塞隊列。
需求分析
在前面的兩篇文章ArrayDeque(JDK雙端隊列)源碼深度剖析和深入剖析(JDK)ArrayQueue源碼當(dāng)中我們仔細介紹了隊列的原理,如果大家感興趣可以查看一下!
而在本篇文章所談到的阻塞隊列當(dāng)中,是在并發(fā)的情況下使用的,上面所談到的是隊列是并發(fā)不安全的,但是阻塞隊列在并發(fā)下情況是安全的。阻塞隊列的主要的需求如下:
- 隊列基礎(chǔ)的功能需要有,往隊列當(dāng)中放數(shù)據(jù),從隊列當(dāng)中取數(shù)據(jù)。
- 所有的隊列操作都要是并發(fā)安全的。
- 當(dāng)隊列滿了之后再往隊列當(dāng)中放數(shù)據(jù)的時候,線程需要被掛起,當(dāng)隊列當(dāng)中的數(shù)據(jù)被取出,讓隊列當(dāng)中有空間的時候線程需要被喚醒。
- 當(dāng)隊列空了之后再往隊列當(dāng)中取數(shù)據(jù)的時候,線程需要被掛起,當(dāng)有線程往隊列當(dāng)中加入數(shù)據(jù)的時候被掛起的線程需要被喚醒。
- 在我們實現(xiàn)的隊列當(dāng)中我們使用數(shù)組去存儲數(shù)據(jù),因此在構(gòu)造函數(shù)當(dāng)中需要提供數(shù)組的初始大小,設(shè)置用多大的數(shù)組。
阻塞隊列實現(xiàn)原理
線程阻塞和喚醒
在上面我們已經(jīng)談到了阻塞隊列是并發(fā)安全的,而且我們還有將線程喚醒和阻塞的需求,因此我們可以選擇可重入鎖ReentrantLock保證并發(fā)安全,但是我們還需要將線程喚醒和阻塞,因此我們可以選擇條件變量Condition進行線程的喚醒和阻塞操作,在Condition當(dāng)中我們將會使用到的,主要有以下兩個函數(shù):
signal用于喚醒線程,當(dāng)一個線程調(diào)用Condition的signal函數(shù)的時候就可以喚醒一個被await函數(shù)阻塞的線程。await用于阻塞線程,當(dāng)一個線程調(diào)用Condition的await函數(shù)的時候這個線程就會阻塞。
數(shù)組循環(huán)使用
因為隊列是一端進一端出,因此隊列肯定有頭有尾。

當(dāng)我們往隊列當(dāng)中加入一些數(shù)據(jù)之后,隊列的情況可能如下:

在上圖的基礎(chǔ)之上我們在進行四次出隊操作,結(jié)果如下:

在上面的狀態(tài)下,我們繼續(xù)加入8個數(shù)據(jù),那么布局情況如下:

我們知道上圖在加入數(shù)據(jù)的時候不僅將數(shù)組后半部分的空間使用完了,而且可以繼續(xù)使用前半部分沒有使用過的空間,也就是說在隊列內(nèi)部實現(xiàn)了一個循環(huán)使用的過程。
為了保證數(shù)組的循環(huán)使用,我們需要用一個變量記錄隊列頭在數(shù)組當(dāng)中的位置,用一個變量記錄隊列尾部在數(shù)組當(dāng)中的位置,還需要有一個變量記錄隊列當(dāng)中有多少個數(shù)據(jù)。
代碼實現(xiàn)
成員變量定義
根據(jù)上面的分析我們可以知道,在我們自己實現(xiàn)的類當(dāng)中我們需要有如下的類成員變量:
// 用于保護臨界區(qū)的鎖 private final ReentrantLock lock; // 用于喚醒取數(shù)據(jù)的時候被阻塞的線程 private final Condition notEmpty; // 用于喚醒放數(shù)據(jù)的時候被阻塞的線程 private final Condition notFull; // 用于記錄從數(shù)組當(dāng)中取數(shù)據(jù)的位置 也就是隊列頭部的位置 private int takeIndex; // 用于記錄從數(shù)組當(dāng)中放數(shù)據(jù)的位置 也就是隊列尾部的位置 private int putIndex; // 記錄隊列當(dāng)中有多少個數(shù)據(jù) private int count; // 用于存放具體數(shù)據(jù)的數(shù)組 private Object[] items;
構(gòu)造函數(shù)
我們的構(gòu)造函數(shù)也很簡單,最核心的就是傳入一個數(shù)組大小的參數(shù),并且給上面的變量進行初始化賦值。
@SuppressWarnings("unchecked")
public MyArrayBlockingQueue(int size) {
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.notFull = lock.newCondition();
// 其實可以不用初始化 類會有默認初始化 默認初始化為0
takeIndex = 0;
putIndex = 0;
count = 0;
// 數(shù)組的長度肯定不能夠小于0
if (size <= 0)
throw new RuntimeException("size can not be less than 1");
items = (E[])new Object[size];
}put函數(shù)
這是一個比較重要的函數(shù)了,在這個函數(shù)當(dāng)中如果隊列沒有滿,則直接將數(shù)據(jù)放入到數(shù)組當(dāng)中即可,如果數(shù)組滿了,則需要將線程掛起。
public void put(E x){
// put 函數(shù)可能多個線程調(diào)用 但是我們需要保證在給變量賦值的時候只能夠有一個線程
// 因為如果多個線程同時進行賦值的話 那么可能后一個線程的賦值操作覆蓋了前一個線程的賦值操作
// 因此這里需要上鎖
lock.lock();
try {
// 如果隊列當(dāng)中的數(shù)據(jù)個數(shù)等于數(shù)組的長度的話 說明數(shù)組已經(jīng)滿了
// 這個時候需要將線程掛起
while (count == items.length)
notFull.await(); // 將調(diào)用 await的線程掛起
// 當(dāng)數(shù)組沒有滿 或者在掛起之后再次喚醒的話說明數(shù)組當(dāng)中有空間了
// 這個時候需要將數(shù)組入隊
// 調(diào)用入隊函數(shù)將數(shù)據(jù)入隊
enqueue(x);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 解鎖
lock.unlock();
}
}
// 將數(shù)據(jù)入隊
private void enqueue(E x) {
this.items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal(); // 喚醒一個被 take 函數(shù)阻塞的線程喚醒
}offer函數(shù)
offer函數(shù)和put函數(shù)一樣,但是與put函數(shù)不同的是,當(dāng)數(shù)組當(dāng)中數(shù)據(jù)填滿之后offer函數(shù)返回false,而不是被阻塞。
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果數(shù)組滿了 則直接返回false 而不是被阻塞
if (count == items.length)
return false;
else {
// 如果數(shù)組沒有滿則直接入隊 并且返回 true
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}add函數(shù)
這個函數(shù)和上面兩個函數(shù)作用一樣,也是往隊列當(dāng)中加入數(shù)據(jù),但當(dāng)單隊列滿了之后這個函數(shù)會拋出異常。
public boolean add(E e) {
if (offer(e))
return true;
else
throw new RuntimeException("Queue full");
}take函數(shù)
這個函數(shù)主要是從隊列當(dāng)中取出一個數(shù)據(jù),但是當(dāng)隊列為空的時候,這個函數(shù)會阻塞調(diào)用該函數(shù)的線程:
public E take() throws InterruptedException {
// 這個函數(shù)也是不能夠并發(fā)的 否則可能不同的線程取出的是同一個位置的數(shù)據(jù)
// 進行加鎖操作
lock.lock();
try {
// 當(dāng) count 等于0 說明隊列為空
// 需要將線程掛起等待
while (count == 0)
notEmpty.await();
// 當(dāng)被喚醒之后進行出隊操作
return dequeue();
}finally {
lock.unlock();
}
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null; // 將對應(yīng)的位置設(shè)置為 null GC就可以回收了
if (++takeIndex == items.length)
takeIndex = 0;
count--; // 隊列當(dāng)中數(shù)據(jù)少一個了
// 因為出隊了一個數(shù)據(jù) 可以喚醒一個被 put 函數(shù)阻塞的線程 如果這個時候沒有被阻塞的線程
// 這個函數(shù)就不會起作用 也就說在這個函數(shù)調(diào)用之后被 put 函數(shù)掛起的線程也不會被喚醒
notFull.signal(); // 喚醒一個被 put 函數(shù)阻塞的線程
return x;
}重寫toString函數(shù)
因為我們在后面的測試函數(shù)當(dāng)中會打印我們這個類,而打印這個類的時候會調(diào)用對象的toString方法得到一個字符串,最后打印這個字符串。
@Override
public String toString() {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("[");
// 這里需要上鎖 因為我們在打印的時候需要打印所有的數(shù)據(jù)
// 打印所有的數(shù)據(jù)就需要對數(shù)組進行遍歷操作 而在進行遍歷
// 操作的時候是不能進行插入和刪除操作的 因為打印的是某
// 個時刻的數(shù)據(jù)
lock.lock();
try {
if (count == 0)
stringBuilder.append("]");
else {
int cur = 0;
// 對數(shù)據(jù)進行遍歷 一共遍歷 count 次 因為數(shù)組當(dāng)中一共有 count
// 個數(shù)據(jù)
while (cur != count) {
// 從 takeIndex 位置開始進行遍歷 因為數(shù)據(jù)是從這個位置開始的
stringBuilder.append(items[(cur + takeIndex) % items.length].toString() + ", ");
cur += 1;
}
// 刪除掉最后一次沒用的 ", "
stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length());
stringBuilder.append(']');
}
}finally {
lock.unlock();
}
return stringBuilder.toString();
}完整代碼
整個我們自己完成的阻塞隊列的代碼如下:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class MyArrayBlockingQueue<E> {
// 用于保護臨界區(qū)的鎖
private final ReentrantLock lock;
// 用于喚醒取數(shù)據(jù)的時候被阻塞的線程
private final Condition notEmpty;
// 用于喚醒放數(shù)據(jù)的時候被阻塞的線程
private final Condition notFull;
// 用于記錄從數(shù)組當(dāng)中取數(shù)據(jù)的位置 也就是隊列頭部的位置
private int takeIndex;
// 用于記錄從數(shù)組當(dāng)中放數(shù)據(jù)的位置 也就是隊列尾部的位置
private int putIndex;
// 記錄隊列當(dāng)中有多少個數(shù)據(jù)
private int count;
// 用于存放具體數(shù)據(jù)的數(shù)組
private Object[] items;
@SuppressWarnings("unchecked")
public MyArrayBlockingQueue(int size) {
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.notFull = lock.newCondition();
// 其實可以不用初始化 類會有默認初始化 默認初始化為0
takeIndex = 0;
putIndex = 0;
count = 0;
if (size <= 0)
throw new RuntimeException("size can not be less than 1");
items = (E[])new Object[size];
}
public void put(E x){
lock.lock();
try {
while (count == items.length)
notFull.await();
enqueue(x);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
this.items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
notFull.signal();
return x;
}
public boolean add(E e) {
if (offer(e))
return true;
else
throw new RuntimeException("Queue full");
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
return dequeue();
}finally {
lock.unlock();
}
}
@Override
public String toString() {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("[");
lock.lock();
try {
if (count == 0)
stringBuilder.append("]");
else {
int cur = 0;
while (cur != count) {
stringBuilder.append(items[(cur + takeIndex) % items.length].toString()).append(", ");
cur += 1;
}
stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length());
stringBuilder.append(']');
}
}finally {
lock.unlock();
}
return stringBuilder.toString();
}
}現(xiàn)在對上面的代碼進行測試:
我們現(xiàn)在使用阻塞隊列模擬一個生產(chǎn)者消費者模型,設(shè)置阻塞隊列的大小為5,生產(chǎn)者線程會往隊列當(dāng)中加入數(shù)據(jù),數(shù)據(jù)為0-9的10個數(shù)字,消費者線程一共會消費10次。
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) throws InterruptedException {
MyArrayBlockingQueue<Integer> queue = new MyArrayBlockingQueue<>(5);
Thread thread = new Thread(() -> {
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + " 往隊列當(dāng)中加入數(shù)據(jù):" + i);
queue.put(i);
}
}, "生產(chǎn)者");
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
System.out.println(Thread.currentThread().getName() + " 從隊列當(dāng)中取出數(shù)據(jù):" + queue.take());
System.out.println(Thread.currentThread().getName() + " 當(dāng)前隊列當(dāng)中的數(shù)據(jù):" + queue);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "消費者");
thread.start();
TimeUnit.SECONDS.sleep(3);
thread1.start();
}
}上面代碼的輸出如下所示:
生產(chǎn)者 往隊列當(dāng)中加入數(shù)據(jù):0
生產(chǎn)者 往隊列當(dāng)中加入數(shù)據(jù):1
生產(chǎn)者 往隊列當(dāng)中加入數(shù)據(jù):2
生產(chǎn)者 往隊列當(dāng)中加入數(shù)據(jù):3
生產(chǎn)者 往隊列當(dāng)中加入數(shù)據(jù):4
生產(chǎn)者 往隊列當(dāng)中加入數(shù)據(jù):5
消費者 從隊列當(dāng)中取出數(shù)據(jù):0
生產(chǎn)者 往隊列當(dāng)中加入數(shù)據(jù):6
消費者 當(dāng)前隊列當(dāng)中的數(shù)據(jù):[1, 2, 3, 4, 5]
消費者 從隊列當(dāng)中取出數(shù)據(jù):1
消費者 當(dāng)前隊列當(dāng)中的數(shù)據(jù):[2, 3, 4, 5]
消費者 從隊列當(dāng)中取出數(shù)據(jù):2
消費者 當(dāng)前隊列當(dāng)中的數(shù)據(jù):[3, 4, 5, 6]
生產(chǎn)者 往隊列當(dāng)中加入數(shù)據(jù):7
消費者 從隊列當(dāng)中取出數(shù)據(jù):3
消費者 當(dāng)前隊列當(dāng)中的數(shù)據(jù):[4, 5, 6, 7]
消費者 從隊列當(dāng)中取出數(shù)據(jù):4
消費者 當(dāng)前隊列當(dāng)中的數(shù)據(jù):[5, 6, 7]
消費者 從隊列當(dāng)中取出數(shù)據(jù):5
消費者 當(dāng)前隊列當(dāng)中的數(shù)據(jù):[6, 7]
生產(chǎn)者 往隊列當(dāng)中加入數(shù)據(jù):8
消費者 從隊列當(dāng)中取出數(shù)據(jù):6
消費者 當(dāng)前隊列當(dāng)中的數(shù)據(jù):[7, 8]
消費者 從隊列當(dāng)中取出數(shù)據(jù):7
消費者 當(dāng)前隊列當(dāng)中的數(shù)據(jù):[8]
消費者 從隊列當(dāng)中取出數(shù)據(jù):8
消費者 當(dāng)前隊列當(dāng)中的數(shù)據(jù):[]
生產(chǎn)者 往隊列當(dāng)中加入數(shù)據(jù):9
消費者 從隊列當(dāng)中取出數(shù)據(jù):9
消費者 當(dāng)前隊列當(dāng)中的數(shù)據(jù):[]
從上面的輸出結(jié)果我們知道,生產(chǎn)者線程打印5之后被掛起了,因為如果沒有被掛起,生產(chǎn)者線程肯定可以一次性輸出完成,因為消費者線程阻塞了3秒。但是他沒有輸出完成說明在打印5之后,因為阻塞隊列滿了,因而生產(chǎn)者線程被掛起了。然后消費者開始消費,這樣阻塞隊列當(dāng)中就有空間了,生產(chǎn)者線程就可以繼續(xù)生產(chǎn)了。
總結(jié)
在本篇文章當(dāng)中,主要向大家介紹了阻塞隊列的原理并且實現(xiàn)了一個低配版的數(shù)組阻塞隊列,其實如果你了解數(shù)組隊列和鎖的話,這個代碼實現(xiàn)起來還是相對比較簡單的,我們只需要使用鎖去保證我們的程序并發(fā)安全即可。
我們在實現(xiàn)put函數(shù)的時候,如果當(dāng)前隊列已經(jīng)滿了,則當(dāng)前線程需要調(diào)用await函數(shù)進行阻塞,當(dāng)線程被喚醒或者隊列沒有滿可以繼續(xù)執(zhí)行的時候,我們在往隊列當(dāng)中加入數(shù)據(jù)之后需要調(diào)用一次signal函數(shù),因為這樣可以喚醒在調(diào)用take函數(shù)的時候因為隊列空而阻塞的線程。
我們實現(xiàn)take函數(shù)的時候,如果當(dāng)前隊列已經(jīng)空了,則當(dāng)前線程也需要調(diào)用await函數(shù)進行阻塞,當(dāng)線程被喚醒或者隊列不為空線程可以繼續(xù)執(zhí)行,在出隊之后需要調(diào)用一次signal函數(shù),因為這樣可以喚醒在調(diào)用put函數(shù)的時候因為隊列滿而阻塞的線程。
以上就是利用Java手寫阻塞隊列的示例代碼的詳細內(nèi)容,更多關(guān)于Java阻塞隊列的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java數(shù)據(jù)結(jié)構(gòu)之圖的兩種搜索算法詳解
在很多情況下,我們需要遍歷圖,得到圖的一些性質(zhì)。有關(guān)圖的搜索,最經(jīng)典的算法有深度優(yōu)先搜索和廣度優(yōu)先搜索,接下來我們分別講解這兩種搜索算法,需要的可以參考一下2022-11-11
Java編譯錯誤信息提示java.lang.ExceptionInInitializer解決
這篇文章主要介紹了Java編譯錯誤信息提示java.lang.ExceptionInInitializer的分析講解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-07-07
java 進程是如何在Linux服務(wù)器上進行內(nèi)存分配的
這篇文章主要介紹了java 進程是如何在Linux服務(wù)器上進行內(nèi)存分配的,幫助大家更好的理解和使用Java,感興趣的朋友可以了解下2020-11-11
git stash 和unstash的使用操作,git unstash failed
這篇文章主要介紹了git stash 和unstash的使用操作,git unstash failed,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-02-02

