java并發(fā)之ArrayBlockingQueue詳細介紹
java并發(fā)之ArrayBlockingQueue詳細介紹
ArrayBlockingQueue是常用的線程集合,在線程池中也常常被當做任務隊列來使用。使用頻率特別高。他是維護的是一個循環(huán)隊列(基于數(shù)組實現(xiàn)),循環(huán)結構在數(shù)據(jù)結構中比較常見,但是在源碼實現(xiàn)中還是比較少見的。
線程安全的實現(xiàn)
線程安全隊列,基本是離不開鎖的。ArrayBlockingQueue使用的是ReentrantLock,配合兩種Condition,實現(xiàn)了集合的線程安全操作。這里稍微說一個好習慣,下面是成員變量的聲明。
private static final long serialVersionUID = -817911632652898426L; final Object[] items; int takeIndex; int putIndex; int count; final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull; transient Itrs itrs = null;
賦值的操作基本都是在構造函數(shù)里做的。這樣有個好處,代碼執(zhí)行可控。成員變量的初始化也是會合并在構造方法里執(zhí)行的,但是在執(zhí)行順序上需要好好斟酌,如果寫在構造方法里初始化,則沒有相關問題。
阻塞隊列的常用場所就是生產(chǎn)者消費者。一般都是生產(chǎn)者放入,消費者從頭取數(shù)據(jù)。下面重點說這兩個操作。
這兩個操作都是依靠鎖來保證線程安全的。
生產(chǎn)操作
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
put等放入操作,首先是獲取鎖,如果發(fā)現(xiàn)數(shù)據(jù)滿了,就通過notFull的condition,來阻塞線程。這里的條件判定一定是用while而不是if,多線程情況下,可以被喚醒后發(fā)現(xiàn)又滿了。
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
這個是入隊列的操作。首先獲取維護的數(shù)組。putindex就是放入操作的標志。這個操作會一直加。達到預定的長度后就變成0從頭開始計數(shù)。這樣插入的操作就是一個循環(huán)的操作了,count就是用來做計數(shù)的,作為能否插入數(shù)據(jù)的一個標準,插入數(shù)據(jù)后就通過notEmpty的condition發(fā)出一個信號喚醒消費線程。
消費操作
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
消費的方法也是這樣。先獲取鎖,然后進行條件判斷,如果沒有數(shù)據(jù),則阻塞線程。注意點和put一樣。
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
取數(shù)據(jù)的時候,也依靠takeIndex,這是一個標志,這個數(shù)值也會一直增加,表示取的第一個數(shù)據(jù)的位置。如果這個標志走到最后,然后變成0,從頭再來。這樣保證取出的數(shù)據(jù)都是fifo的順序。刪除的時候如果發(fā)現(xiàn)迭代中,則會修改迭代器的遍歷。然后通過notFull的condition來喚醒生產(chǎn)線程。
移除操作
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
對于remove操作就比較麻煩了,首先獲取鎖之后,把兩個標志位本地化,然后找到要刪除的元素的位置。調用removeAt,這里刪除需要對標志位做改變。
void removeAt(final int removeIndex) {
final Object[] items = this.items;
if (removeIndex == takeIndex) {
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
final int putIndex = this.putIndex;
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}
如果刪除的元素是位置和takeindex一樣。那就可以直接刪除,然后讓刪除標志位向后移動。如果不是,則從刪除的位置開始,進行后面向前面的數(shù)據(jù)覆蓋的操作。直到遇到putindex的前一個位置。然后把那個位置的數(shù)據(jù)設置為null。并且把putindex的位置往前移動一格,正在迭代的時候要刪除數(shù)據(jù)并且喚醒生產(chǎn)線程。
感謝閱讀,希望能幫助到大家,謝謝大家對本站的支持!
- java ArrayBlockingQueue阻塞隊列的實現(xiàn)示例
- Java中ArrayBlockingQueue和LinkedBlockingQueue
- Java 并發(fā)編程ArrayBlockingQueue的實現(xiàn)
- java ArrayBlockingQueue的方法及缺點分析
- Java源碼解析阻塞隊列ArrayBlockingQueue介紹
- Java源碼解析阻塞隊列ArrayBlockingQueue常用方法
- Java源碼解析阻塞隊列ArrayBlockingQueue功能簡介
- 詳細分析Java并發(fā)集合ArrayBlockingQueue的用法
- Java并發(fā)編程ArrayBlockingQueue的使用
相關文章
Java OpenCV4.0.0實現(xiàn)實時人臉識別
這篇文章主要為大家詳細介紹了Java OpenCV4.0.0實現(xiàn)實時人臉識別,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-07-07

