java集合框架 arrayblockingqueue應(yīng)用分析
更新時(shí)間:2012年11月28日 10:07:39 作者:
ArrayBlockingQueue是一個(gè)由數(shù)組支持的有界阻塞隊(duì)列。此隊(duì)列按 FIFO(先進(jìn)先出)原則對元素進(jìn)行排序。隊(duì)列的頭部 是在隊(duì)列中存在時(shí)間最長的元素
Queue
------------
1.ArrayDeque, (數(shù)組雙端隊(duì)列)
2.PriorityQueue, (優(yōu)先級隊(duì)列)
3.ConcurrentLinkedQueue, (基于鏈表的并發(fā)隊(duì)列)
4.DelayQueue, (延期阻塞隊(duì)列)(阻塞隊(duì)列實(shí)現(xiàn)了BlockingQueue接口)
5.ArrayBlockingQueue, (基于數(shù)組的并發(fā)阻塞隊(duì)列)
6.LinkedBlockingQueue, (基于鏈表的FIFO阻塞隊(duì)列)
7.LinkedBlockingDeque, (基于鏈表的FIFO雙端阻塞隊(duì)列)
8.PriorityBlockingQueue, (帶優(yōu)先級的無界阻塞隊(duì)列)
9.SynchronousQueue (并發(fā)同步阻塞隊(duì)列)
-----------------------------------------------------
ArrayBlockingQueue
是一個(gè)由數(shù)組支持的有界阻塞隊(duì)列。此隊(duì)列按 FIFO(先進(jìn)先出)原則對元素進(jìn)行排序。隊(duì)列的頭部 是在隊(duì)列中存在時(shí)間最長的元素。隊(duì)列的尾部 是在隊(duì)列中存在時(shí)間最短的元素。新元素插入到隊(duì)列的尾部,隊(duì)列獲取操作則是從隊(duì)列頭部開始獲得元素。
這是一個(gè)典型的“有界緩存區(qū)”,固定大小的數(shù)組在其中保持生產(chǎn)者插入的元素和使用者提取的元素。一旦創(chuàng)建了這樣的緩存區(qū),就不能再增加其容量。試圖向已滿隊(duì)列中放入元素會導(dǎo)致操作受阻塞;試圖從空隊(duì)列中提取元素將導(dǎo)致類似阻塞。
此類支持對等待的生產(chǎn)者線程和消費(fèi)者線程進(jìn)行排序的可選公平策略。默認(rèn)情況下,不保證是這種排序。然而,通過將公平性 (fairness) 設(shè)置為 true 而構(gòu)造的隊(duì)列允許按照 FIFO 順序訪問線程。公平性通常會降低吞吐量,但也減少了可變性和避免了“不平衡性”。
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
/** 隊(duì)列元素 數(shù)組 */
private final E[] items;
/** 獲取、刪除元素時(shí)的索引(take, poll 或 remove操作) */
private int takeIndex;
/** 添加元素時(shí)的索引(put, offer或 add操作) */
private int putIndex;
/** 隊(duì)列元素的數(shù)目*/
private int count;
/** 鎖 */
private final ReentrantLock lock;
/** 獲取操作時(shí)的條件 */
private final Condition notEmpty;
/** 插入操作時(shí)的條件 */
private final Condition notFull;
//超出數(shù)組長度時(shí),重設(shè)為0
final int inc(int i) {
return (++i == items.length)? 0 : i;
}
/**
* 插入元素(在獲得鎖的情況下才調(diào)用)
*/
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
/**
* 獲取并移除元素(在獲得鎖的情況下才調(diào)用)
*/
private E extract() {
final E[] items = this.items;
E x = items[takeIndex];
items[takeIndex] = null;
takeIndex = inc(takeIndex);//移到下一個(gè)位置
--count;
notFull.signal();
return x;
}
/**
* 刪除i位置的元素
*/
void removeAt(int i) {
final E[] items = this.items;
// if removing front item, just advance
if (i == takeIndex) {
items[takeIndex] = null;
takeIndex = inc(takeIndex);
} else {
// 把i后面的直到putIndex的元素都向前移動一個(gè)位置
for (;;) {
int nexti = inc(i);
if (nexti != putIndex) {
items[i] = items[nexti];
i = nexti;
} else {
items[i] = null;
putIndex = i;
break;
}
}
}
--count;
notFull.signal();
}
/**
*構(gòu)造方法,指定容量,默認(rèn)策略(不是按照FIFO的順序訪問)
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/**
*構(gòu)造方法,指定容量及策略
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = (E[]) new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
/**
* 通過集合構(gòu)造
*/
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
if (capacity < c.size())
throw new IllegalArgumentException();
for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
add(it.next());
}
/**
* 插入元素到隊(duì)尾(super調(diào)用offer方法)
* public boolean add(E e) {
* if (offer(e))
* return true;
* else
* throw new IllegalStateException("Queue full");
* }
* 將指定的元素插入到此隊(duì)列的尾部(如果立即可行且不會超過該隊(duì)列的容量),
* 在成功時(shí)返回 true,如果此隊(duì)列已滿,則拋出 IllegalStateException。
*/
public boolean add(E e) {
return super.add(e);
}
/**
* 將指定的元素插入到此隊(duì)列的尾部(如果立即可行且不會超過該隊(duì)列的容量),
* 在成功時(shí)返回 true,如果此隊(duì)列已滿,則返回 false。
*/
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
insert(e);
return true;
}
} finally {
lock.unlock();
}
}
/**
* 將指定的元素插入此隊(duì)列的尾部,如果該隊(duì)列已滿,則等待可用的空間。
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final E[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (count == items.length)
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); // propagate to non-interrupted thread
throw ie;
}
insert(e);
} finally {
lock.unlock();
}
}
/**
* 將指定的元素插入此隊(duì)列的尾部,如果該隊(duì)列已滿,則在到達(dá)指定的等待時(shí)間之前等待可用的空間。
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
if (count != items.length) {
insert(e);
return true;
}
if (nanos <= 0)//如果時(shí)間到了就返回
return false;
try {
nanos = notFull.awaitNanos(nanos);
} catch (InterruptedException ie) {
notFull.signal(); // propagate to non-interrupted thread
throw ie;
}
}
} finally {
lock.unlock();
}
}
//獲取并移除此隊(duì)列的頭,如果此隊(duì)列為空,則返回 null。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == 0)
return null;
E x = extract();
return x;
} finally {
lock.unlock();
}
}
//獲取并移除此隊(duì)列的頭部,在元素變得可用之前一直等待(如果有必要)。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (count == 0)
notEmpty.await();
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
E x = extract();
return x;
} finally {
lock.unlock();
}
}
//獲取并移除此隊(duì)列的頭部,在指定的等待時(shí)間前等待可用的元素(如果有必要)。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
if (count != 0) {
E x = extract();
return x;
}
if (nanos <= 0)
return null;
try {
nanos = notEmpty.awaitNanos(nanos);
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
}
} finally {
lock.unlock();
}
}
//獲取但不移除此隊(duì)列的頭;如果此隊(duì)列為空,則返回 null。
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : items[takeIndex];
} finally {
lock.unlock();
}
}
/**
* 返回此隊(duì)列中元素的數(shù)量。
*/
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
/**
*返回在無阻塞的理想情況下(不存在內(nèi)存或資源約束)此隊(duì)列能接受的其他元素?cái)?shù)量。
*/
public int remainingCapacity() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return items.length - count;
} finally {
lock.unlock();
}
}
/**
* 從此隊(duì)列中移除指定元素的單個(gè)實(shí)例(如果存在)。
*/
public boolean remove(Object o) {
if (o == null) return false;
final E[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = takeIndex;
int k = 0;
for (;;) {
if (k++ >= count)
return false;
if (o.equals(items[i])) {
removeAt(i);
return true;
}
i = inc(i);
}
} finally {
lock.unlock();
}
}
/**
* 如果此隊(duì)列包含指定的元素,則返回 true。
*/
public boolean contains(Object o) {
if (o == null) return false;
final E[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = takeIndex;
int k = 0;
while (k++ < count) {
if (o.equals(items[i]))
return true;
i = inc(i);
}
return false;
} finally {
lock.unlock();
}
}
……
}
------------
1.ArrayDeque, (數(shù)組雙端隊(duì)列)
2.PriorityQueue, (優(yōu)先級隊(duì)列)
3.ConcurrentLinkedQueue, (基于鏈表的并發(fā)隊(duì)列)
4.DelayQueue, (延期阻塞隊(duì)列)(阻塞隊(duì)列實(shí)現(xiàn)了BlockingQueue接口)
5.ArrayBlockingQueue, (基于數(shù)組的并發(fā)阻塞隊(duì)列)
6.LinkedBlockingQueue, (基于鏈表的FIFO阻塞隊(duì)列)
7.LinkedBlockingDeque, (基于鏈表的FIFO雙端阻塞隊(duì)列)
8.PriorityBlockingQueue, (帶優(yōu)先級的無界阻塞隊(duì)列)
9.SynchronousQueue (并發(fā)同步阻塞隊(duì)列)
-----------------------------------------------------
ArrayBlockingQueue
是一個(gè)由數(shù)組支持的有界阻塞隊(duì)列。此隊(duì)列按 FIFO(先進(jìn)先出)原則對元素進(jìn)行排序。隊(duì)列的頭部 是在隊(duì)列中存在時(shí)間最長的元素。隊(duì)列的尾部 是在隊(duì)列中存在時(shí)間最短的元素。新元素插入到隊(duì)列的尾部,隊(duì)列獲取操作則是從隊(duì)列頭部開始獲得元素。
這是一個(gè)典型的“有界緩存區(qū)”,固定大小的數(shù)組在其中保持生產(chǎn)者插入的元素和使用者提取的元素。一旦創(chuàng)建了這樣的緩存區(qū),就不能再增加其容量。試圖向已滿隊(duì)列中放入元素會導(dǎo)致操作受阻塞;試圖從空隊(duì)列中提取元素將導(dǎo)致類似阻塞。
此類支持對等待的生產(chǎn)者線程和消費(fèi)者線程進(jìn)行排序的可選公平策略。默認(rèn)情況下,不保證是這種排序。然而,通過將公平性 (fairness) 設(shè)置為 true 而構(gòu)造的隊(duì)列允許按照 FIFO 順序訪問線程。公平性通常會降低吞吐量,但也減少了可變性和避免了“不平衡性”。
復(fù)制代碼 代碼如下:
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
/** 隊(duì)列元素 數(shù)組 */
private final E[] items;
/** 獲取、刪除元素時(shí)的索引(take, poll 或 remove操作) */
private int takeIndex;
/** 添加元素時(shí)的索引(put, offer或 add操作) */
private int putIndex;
/** 隊(duì)列元素的數(shù)目*/
private int count;
/** 鎖 */
private final ReentrantLock lock;
/** 獲取操作時(shí)的條件 */
private final Condition notEmpty;
/** 插入操作時(shí)的條件 */
private final Condition notFull;
//超出數(shù)組長度時(shí),重設(shè)為0
final int inc(int i) {
return (++i == items.length)? 0 : i;
}
/**
* 插入元素(在獲得鎖的情況下才調(diào)用)
*/
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
/**
* 獲取并移除元素(在獲得鎖的情況下才調(diào)用)
*/
private E extract() {
final E[] items = this.items;
E x = items[takeIndex];
items[takeIndex] = null;
takeIndex = inc(takeIndex);//移到下一個(gè)位置
--count;
notFull.signal();
return x;
}
/**
* 刪除i位置的元素
*/
void removeAt(int i) {
final E[] items = this.items;
// if removing front item, just advance
if (i == takeIndex) {
items[takeIndex] = null;
takeIndex = inc(takeIndex);
} else {
// 把i后面的直到putIndex的元素都向前移動一個(gè)位置
for (;;) {
int nexti = inc(i);
if (nexti != putIndex) {
items[i] = items[nexti];
i = nexti;
} else {
items[i] = null;
putIndex = i;
break;
}
}
}
--count;
notFull.signal();
}
/**
*構(gòu)造方法,指定容量,默認(rèn)策略(不是按照FIFO的順序訪問)
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/**
*構(gòu)造方法,指定容量及策略
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = (E[]) new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
/**
* 通過集合構(gòu)造
*/
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
if (capacity < c.size())
throw new IllegalArgumentException();
for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
add(it.next());
}
/**
* 插入元素到隊(duì)尾(super調(diào)用offer方法)
* public boolean add(E e) {
* if (offer(e))
* return true;
* else
* throw new IllegalStateException("Queue full");
* }
* 將指定的元素插入到此隊(duì)列的尾部(如果立即可行且不會超過該隊(duì)列的容量),
* 在成功時(shí)返回 true,如果此隊(duì)列已滿,則拋出 IllegalStateException。
*/
public boolean add(E e) {
return super.add(e);
}
/**
* 將指定的元素插入到此隊(duì)列的尾部(如果立即可行且不會超過該隊(duì)列的容量),
* 在成功時(shí)返回 true,如果此隊(duì)列已滿,則返回 false。
*/
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
insert(e);
return true;
}
} finally {
lock.unlock();
}
}
/**
* 將指定的元素插入此隊(duì)列的尾部,如果該隊(duì)列已滿,則等待可用的空間。
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final E[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (count == items.length)
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); // propagate to non-interrupted thread
throw ie;
}
insert(e);
} finally {
lock.unlock();
}
}
/**
* 將指定的元素插入此隊(duì)列的尾部,如果該隊(duì)列已滿,則在到達(dá)指定的等待時(shí)間之前等待可用的空間。
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
if (count != items.length) {
insert(e);
return true;
}
if (nanos <= 0)//如果時(shí)間到了就返回
return false;
try {
nanos = notFull.awaitNanos(nanos);
} catch (InterruptedException ie) {
notFull.signal(); // propagate to non-interrupted thread
throw ie;
}
}
} finally {
lock.unlock();
}
}
//獲取并移除此隊(duì)列的頭,如果此隊(duì)列為空,則返回 null。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == 0)
return null;
E x = extract();
return x;
} finally {
lock.unlock();
}
}
//獲取并移除此隊(duì)列的頭部,在元素變得可用之前一直等待(如果有必要)。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (count == 0)
notEmpty.await();
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
E x = extract();
return x;
} finally {
lock.unlock();
}
}
//獲取并移除此隊(duì)列的頭部,在指定的等待時(shí)間前等待可用的元素(如果有必要)。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
if (count != 0) {
E x = extract();
return x;
}
if (nanos <= 0)
return null;
try {
nanos = notEmpty.awaitNanos(nanos);
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
}
} finally {
lock.unlock();
}
}
//獲取但不移除此隊(duì)列的頭;如果此隊(duì)列為空,則返回 null。
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : items[takeIndex];
} finally {
lock.unlock();
}
}
/**
* 返回此隊(duì)列中元素的數(shù)量。
*/
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
/**
*返回在無阻塞的理想情況下(不存在內(nèi)存或資源約束)此隊(duì)列能接受的其他元素?cái)?shù)量。
*/
public int remainingCapacity() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return items.length - count;
} finally {
lock.unlock();
}
}
/**
* 從此隊(duì)列中移除指定元素的單個(gè)實(shí)例(如果存在)。
*/
public boolean remove(Object o) {
if (o == null) return false;
final E[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = takeIndex;
int k = 0;
for (;;) {
if (k++ >= count)
return false;
if (o.equals(items[i])) {
removeAt(i);
return true;
}
i = inc(i);
}
} finally {
lock.unlock();
}
}
/**
* 如果此隊(duì)列包含指定的元素,則返回 true。
*/
public boolean contains(Object o) {
if (o == null) return false;
final E[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = takeIndex;
int k = 0;
while (k++ < count) {
if (o.equals(items[i]))
return true;
i = inc(i);
}
return false;
} finally {
lock.unlock();
}
}
……
}
您可能感興趣的文章:
- java中LinkedBlockingQueue與ArrayBlockingQueue的異同
- Java 并發(fā)編程ArrayBlockingQueue的實(shí)現(xiàn)
- 詳細(xì)分析Java并發(fā)集合ArrayBlockingQueue的用法
- 深入理解Java并發(fā)編程之LinkedBlockingQueue隊(duì)列
- java并發(fā)編程工具類JUC之LinkedBlockingQueue鏈表隊(duì)列
- 詳細(xì)分析Java并發(fā)集合LinkedBlockingQueue的用法
- Java中ArrayBlockingQueue和LinkedBlockingQueue
相關(guān)文章
java線性表的存儲結(jié)構(gòu)及其代碼實(shí)現(xiàn)
這篇文章主要為大家詳細(xì)介紹了Java數(shù)據(jù)結(jié)構(gòu)學(xué)習(xí)筆記第一篇,線性表的存儲結(jié)構(gòu)及其代碼實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-09-09淺談關(guān)于Java正則和轉(zhuǎn)義中\(zhòng)\和\\\\的理解
這篇文章主要介紹了淺談關(guān)于Java正則和轉(zhuǎn)義中\(zhòng)\和\\\\的理解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-08-08java arrayList遍歷的四種方法及Java中ArrayList類的用法
arraylist是動態(tài)數(shù)組,它具有三個(gè)好處分別是:動態(tài)的增加和減少元素 、實(shí)現(xiàn)了ICollection和IList接口、靈活的設(shè)置數(shù)組的大小,本文給大家介紹java arraylist遍歷及Java arraylist 用法,感興趣的朋友一起學(xué)習(xí)吧2015-11-11SpringCloud服務(wù)注冊和發(fā)現(xiàn)組件Eureka
對于微服務(wù)的治理而言,其核心就是服務(wù)的注冊和發(fā)現(xiàn)。在SpringCloud 中提供了多種服務(wù)注冊與發(fā)現(xiàn)組件,官方推薦使用Eureka。本篇文章,我們來講解springcloud的服務(wù)注冊和發(fā)現(xiàn)組件,感興趣的可以了解一下2021-05-05淺談java多態(tài)的實(shí)現(xiàn)主要體現(xiàn)在哪些方面
下面小編就為大家?guī)硪黄獪\談java多態(tài)的實(shí)現(xiàn)主要體現(xiàn)在哪些方面。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2016-09-09SpringBoot實(shí)現(xiàn)簡單的日志鏈路追蹤
隨著分布式應(yīng)用的普及,現(xiàn)在的一些應(yīng)用系統(tǒng)不再像以前,所有的文件(前后端程序)都打包在一個(gè)包中,本文通過一個(gè)簡單的SpringBoot應(yīng)用來總結(jié),我們?nèi)绾螌⑷罩敬?lián)起來,文中有詳細(xì)的代碼示例,需要的朋友可以參考下2023-10-10