詳細分析Java并發(fā)集合ArrayBlockingQueue的用法
在上一章中,我們介紹了阻塞隊列BlockingQueue,下面我們介紹它的常用實現(xiàn)類ArrayBlockingQueue。
一. 用數(shù)組來實現(xiàn)隊列
因為隊列這種數(shù)據(jù)結(jié)構(gòu)的特殊要求,所以它天然適合用鏈表的方式來實現(xiàn),用兩個變量分別記錄鏈表頭和鏈表尾,當刪除或插入隊列時,只要改變鏈表頭或鏈表尾就可以了,而且鏈表使用引用的方式鏈接的,所以它的容量幾乎是無限的。
那么怎么使用數(shù)組來實現(xiàn)隊列,我們需要四個變量:Object[] array來存儲隊列中元素,headIndex和tailIndex分別記錄隊列頭和隊列尾,count記錄隊列的個數(shù)。
- 因為數(shù)組的長度是固定,所以當count==array.length時,表示隊列已經(jīng)滿了,當count==0的時候,表示隊列是空的。
- 當添加元素的時候,將array[tailIndex] = e將tailIndex位置設(shè)置成新元素,之后將tailIndex++自增,然后將count++自增。但是有兩點需要注意,在添加之前必須先判斷隊列是否已滿,不然會出現(xiàn)覆蓋已有元素。當tailIndex的值等于數(shù)組最后一個位置的時候,需要將tailIndex=0,循環(huán)利用數(shù)組
- 當刪除元素的時候,將先記錄下array[headIndex] 元素,之后將headIndex++自增,然后將count--自減。但是有兩點需要注意要注意,在刪除之前,必須先判斷隊列是否為空,不然可能會刪除已刪除的元素。
這里用了一個很巧妙的方式,我們知道當向隊列中插入一個元素,那么就占用了數(shù)組的一個位置,當刪除一個元素的時候,那么其實數(shù)組的這個位置就空閑出來了,表示這個位置又可以插入新元素了。
所以我們插入新元素前,必須檢查隊列是否已滿,刪除元素之前,必須檢查隊列是否為空。
二. ArrayBlockingQueue中重要成員變量
/** 儲存隊列的中元素 */ final Object[] items; /** 隊列頭的位置 */ int takeIndex; /** 隊列尾的位置 */ int putIndex; /** 當前隊列擁有的元素個數(shù) */ int count; /** 用來保證多線程操作共享變量的安全問題 */ final ReentrantLock lock; /** 當隊列為空時,就會調(diào)用notEmpty的wait方法,讓當前線程等待 */ private final Condition notEmpty; /** 當隊列為滿時,就會調(diào)用notFull的wait方法,讓當前線程等待 */ private final Condition notFull;
就是多了lock、notEmpty、notFull變量來實現(xiàn)多線程安全和線程等待條件的,它們?nèi)齻€是怎么操作的呢?
2.1 lock的作用
因為ArrayBlockingQueue是在多線程下操作的,所以修改items、takeIndex、putIndex和count這些成員變量時,必須要考慮多線程安全問題,所以這里使用lock獨占鎖,來保證并發(fā)操作的安全。
2.2 notEmpty與notFull的作用
因為阻塞隊列必須實現(xiàn),當隊列為空或隊列已滿的時候,隊列的讀取或插入操作要等待。所以我們想到了并發(fā)框架下的Condition對象,使用它來控制。
在AQS中,我們介紹了這個類的作用:
- await系列方法,會釋放當前鎖,并讓當前線程等待。
- signal與signalAll方法,會喚醒當前線程。其實它并不是喚醒當前線程,而是將在這個Condition條件上等待的線程,添加到lock鎖上的等待線程池中,所以當鎖被釋放時,會喚醒lock鎖上的等待線程池中一個線程。具體在AQS中有源碼分析。
三. 添加元素方法
3.1 add(E e)與offer(E e)方法:
// 調(diào)用AbstractQueue父類中的方法。
public boolean add(E e) {
// 通過調(diào)用offer來時實現(xiàn)
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
//向隊列末尾新添加元素。返回true表示添加成功,false表示添加失敗,不會拋出異常
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 使用lock來保證,多線程修改成員屬性的安全
lock.lock();
try {
// 隊列已滿,添加元素失敗,返回false。
if (count == items.length)
return false;
else {
// 調(diào)用enqueue方法將元素插入隊列中
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
add方法是調(diào)用offer方法實現(xiàn)的。在offer方法中,必須先判斷隊列是否已滿,如果已滿就直接返回false,而不會阻塞當前線程。如果不滿就調(diào)用enqueue方法將元素插入隊列中。
注意:這里使用lock.lock()是保證同一時間只有一個線程修改成員變量,防止出現(xiàn)并發(fā)操作問題。雖然它也會阻塞當前線程,但是它并不是條件等待,只是因為鎖被其他線程持有,而ArrayBlockingQueue中方法操作時間都不長,這里相當于不阻塞線程。
3.2 put方法
// 向隊列末尾新添加元素,如果隊列已滿,當前線程就等待。響應(yīng)中斷異常
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 使用lock來保證,多線程修改成員屬性的安全
lock.lockInterruptibly();
try {
// 隊列已滿,則調(diào)用notFull.await()方法,讓當前線程等待,直到隊列不是滿的
while (count == items.length)
notFull.await();
// 調(diào)用enqueue方法將元素插入隊列中
enqueue(e);
} finally {
lock.unlock();
}
}
與offer方法大體流程一樣,只是當隊列已滿的時候,會調(diào)用notFull.await()方法,讓當前線程阻塞等待,直到隊列被別的線程移除了元素,隊列不滿的時候,會喚醒這個等待線程。
3.3 offer(E e, long timeout, TimeUnit unit)方法
/**
* 向隊列末尾新添加元素,如果隊列中沒有可用空間,當前線程就等待,
* 如果等待時間超過timeout了,那么返回false,表示添加失敗
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
// 計算一共最多等待的時間值nanos
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
// 使用lock來保證,多線程修改成員屬性的安全
lock.lockInterruptibly();
try {
// 隊列已滿
while (count == items.length) {
// nanos <= 0表示最大等待時間已到,那么不用再等待了,返回false,表示添加失敗。
if (nanos <= 0)
return false;
// 調(diào)用notFull.awaitNanos(nanos)方法,超時nanos時間會被自動喚醒,
// 如果被提前喚醒,那么返回剩余的時間
nanos = notFull.awaitNanos(nanos);
}
// 調(diào)用enqueue方法將元素插入隊列中
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
與put的方法大體流程一樣,只不過是調(diào)用notFull.awaitNanos(nanos)方法,讓當前線程等待,并設(shè)置等待時間。
四. 刪除元素方法
4.1 remove()和poll()方法:
// 調(diào)用AbstractQueue父類中的方法。
public E remove() {
// 通過調(diào)用poll來時實現(xiàn)
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
// 刪除隊列第一個元素(即隊列頭),并返回它。如果隊列是空的,它不會拋出異常,而是會返回null。
public E poll() {
final ReentrantLock lock = this.lock;
// 使用lock來保證,多線程修改成員屬性的安全
lock.lock();
try {
// 如果count == 0,列表為空,就返回null,否則調(diào)用dequeue方法,返回列表頭元素
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
remove方法是調(diào)用poll()方法實現(xiàn)的。在 poll()方法中,如果列表為空,就返回null,否則調(diào)用dequeue方法,返回列表頭元素。
4.2 take()方法
/**
* 返回并移除隊列第一個元素,如果隊列是空的,就前線程就等待。響應(yīng)中斷異常
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 使用lock來保證,多線程修改成員屬性的安全
lock.lockInterruptibly();
try {
// 如果隊列為空,就調(diào)用notEmpty.await()方法,讓當前線程等待。
// 直到有別的線程向隊列中插入元素,那么這個線程會被喚醒。
while (count == 0)
notEmpty.await();
// 調(diào)用dequeue方法,返回列表頭元素
return dequeue();
} finally {
lock.unlock();
}
}
take()方法當隊列為空的時候,當前線程必須等待,直到有別的線程向隊列中插入新元素,那么這個線程會被喚醒。
4.3 poll(long timeout, TimeUnit unit)方法
/**
* 返回并移除隊列第一個元素,如果隊列是空的,就前線程就等待。
* 如果等待時間超過timeout了,那么返回false,表示獲取元素失敗
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 計算一共最多等待的時間值nanos
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
// 使用lock來保證,多線程修改成員屬性的安全
lock.lockInterruptibly();
try {
// 隊列為空
while (count == 0) {
// nanos <= 0表示最大等待時間已到,那么不用再等待了,返回null。
if (nanos <= 0)
return null;
// 調(diào)用notEmpty.awaitNanos(nanos)方法讓檔期線程等待,并設(shè)置超時時間。
nanos = notEmpty.awaitNanos(nanos);
}
// 調(diào)用dequeue方法,返回列表頭元素
return dequeue();
} finally {
lock.unlock();
}
}
與take()方法流程一樣,只不過調(diào)用awaitNanos(nanos)方法,讓當前線程等待,并設(shè)置等待時間。
五. 查看元素的方法
5.1 element()與peek() 方法
// 調(diào)用AbstractQueue父類中的方法。
public E element() {
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
// 查看隊列頭元素
public E peek() {
final ReentrantLock lock = this.lock;
// 使用lock來保證,多線程修改成員屬性的安全
lock.lock();
try {
// 返回當前隊列頭的元素
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
六. 其他重要方法
6.1 enqueue和dequeue方法
// 將元素x插入隊列尾
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null; //當前putIndex位置元素一定是null
final Object[] items = this.items;
items[putIndex] = x;
// 如果putIndex等于items.length,那么將putIndex重新設(shè)置為0
if (++putIndex == items.length)
putIndex = 0;
// 隊列數(shù)量加一
count++;
// 因為插入一個元素,那么當前隊列肯定不為空,那么喚醒在notEmpty條件下等待的一個線程
notEmpty.signal();
}
// 刪除隊列頭的元素,返回它
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
// 得到當前隊列頭的元素
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
// 將當前隊列頭位置設(shè)置為null
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
// 隊列數(shù)量減一
count--;
if (itrs != null)
itrs.elementDequeued();
// 因為刪除了一個元素,那么隊列肯定不滿了,那么喚醒在notFull條件下等待的一個線程
notFull.signal();
return x;
}
這兩個方法分別代表,向隊列中插入元素和從隊列中刪除元素。而且它們要喚醒等待的線程。當插入元素后,那么隊列一定不為空,那么就要喚醒在notEmpty條件下等待的線程。當刪除元素后,那么隊列一定不滿,那么就要喚醒在notFull條件下等待的線程。
6.2 remove(Object o)方法
// 刪除隊列中對象o元素,最多刪除一條
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
// 使用lock來保證,多線程修改成員屬性的安全
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 當隊列中有值的時候,才進行刪除。
if (count > 0) {
// 隊列尾下一個位置
final int putIndex = this.putIndex;
// 隊列頭的位置
int i = takeIndex;
do {
// 當前位置元素與被刪除元素相同
if (o.equals(items[i])) {
// 刪除i位置元素
removeAt(i);
// 返回true
return true;
}
if (++i == items.length)
i = 0;
// 當i==putIndex表示遍歷完所有元素
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
從隊列中刪除指定對象o,那么就要遍歷隊列,刪除第一個與對象o相同的元素,如果隊列中沒有對象o元素,那么返回false刪除失敗。
這里有兩點需要注意:
如何遍歷隊列,就是從隊列頭遍歷到隊列尾。就要靠takeIndex和putIndex兩個變量了。
為什么Object[] items = this.items;這句代碼沒有放到同步鎖lock代碼塊內(nèi)。items是成員變量,那么多線程操作的時候,不會有并發(fā)問題么?
這個是因為items是個引用變量,不是基本數(shù)據(jù)類型,而且我們對隊列的插入和刪除操作,都是針對這一個items數(shù)組,沒有改變數(shù)組的引用,所以在lock代碼中,items會得到其他線程對它最新的修改。但是如果這里將int putIndex = this.putIndex;方法lock代碼塊外面,就會產(chǎn)生問題。
removeAt(final int removeIndex)方法
// 刪除隊列removeIndex位置的元素
void removeAt(final int removeIndex) {
// assert lock.getHoldCount() == 1;
// assert items[removeIndex] != null;
// assert removeIndex >= 0 && removeIndex < items.length;
final Object[] items = this.items;
// 表示刪除元素是列表頭,就容易多了,與dequeue方法流程差不多
if (removeIndex == takeIndex) {
// 移除removeIndex位置元素
items[takeIndex] = null;
// 到了數(shù)組末尾,就要轉(zhuǎn)到數(shù)組頭位置
if (++takeIndex == items.length)
takeIndex = 0;
// 隊列數(shù)量減一
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
// an "interior" remove
final int putIndex = this.putIndex;
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
// 還沒有到隊列尾,那么就將后一個位置元素覆蓋前一個位置的元素
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
// 將隊列尾元素置位null
items[i] = null;
// 重新設(shè)置putIndex的值
this.putIndex = i;
break;
}
}
// 隊列數(shù)量減一
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
// 因為刪除了一個元素,那么隊列肯定不滿了,那么喚醒在notFull條件下等待的一個線程
notFull.signal();
}
在隊列中刪除指定位置的元素。需要注意的是刪除之后的數(shù)組還能保持隊列形式,分為兩種情況:
- 如果刪除位置是隊列頭,那么簡單,只需要將隊列頭的位置元素設(shè)置為null,將將隊列頭位置加一。
- 如果刪除位置不是隊列頭,那么麻煩了,這個時候,我們就要將從removeIndex位置后的元素全部左移一位,覆蓋前一個元素。最后將原來隊列尾的元素置位null。
以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
- java ArrayBlockingQueue阻塞隊列的實現(xiàn)示例
- Java中ArrayBlockingQueue和LinkedBlockingQueue
- Java 并發(fā)編程ArrayBlockingQueue的實現(xiàn)
- java ArrayBlockingQueue的方法及缺點分析
- Java源碼解析阻塞隊列ArrayBlockingQueue介紹
- Java源碼解析阻塞隊列ArrayBlockingQueue常用方法
- Java源碼解析阻塞隊列ArrayBlockingQueue功能簡介
- java并發(fā)之ArrayBlockingQueue詳細介紹
- Java并發(fā)編程ArrayBlockingQueue的使用
相關(guān)文章
詳解springboot?springsecuroty中的注銷和權(quán)限控制問題
這篇文章主要介紹了springboot-springsecuroty?注銷和權(quán)限控制,賬戶注銷需要在SecurityConfig中加入開啟注銷功能的代碼,權(quán)限控制要導(dǎo)入springsecurity和thymeleaf的整合依賴,本文通過實例代碼給大家介紹的非常詳細,需要的朋友參考下吧2022-03-03
springboot+redis+阿里云短信實現(xiàn)手機號登錄功能
這篇文章主要介紹了springboot+redis+阿里云短信實現(xiàn)手機號登錄功能,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2024-01-01
詳談springboot過濾器和攔截器的實現(xiàn)及區(qū)別
今天小編就為大家分享一篇詳談springboot過濾器和攔截器的實現(xiàn)及區(qū)別,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-08-08

