淺談Java中的Queue家族
Queue接口
先看下Queue的繼承關(guān)系和其中定義的方法:
Queue繼承自Collection,Collection繼承自Iterable。
Queue有三類主要的方法,我們用個表格來看一下他們的區(qū)別:
方法類型 | 方法名稱 | 方法名稱 | 區(qū)別 |
---|---|---|---|
Insert | add | offer | 兩個方法都表示向Queue中添加某個元素,不同之處在于添加失敗的情況,add只會返回true,如果添加失敗,會拋出異常。offer在添加失敗的時候會返回false。所以對那些有固定長度的Queue,優(yōu)先使用offer方法。 |
Remove | remove | poll | 如果Queue是空的情況下,remove會拋出異常,而poll會返回null。 |
Examine | element | peek | 獲取Queue頭部的元素,但不從Queue中刪除。兩者的區(qū)別還是在于Queue為空的情況下,element會拋出異常,而peek返回null。 |
注意,因為對poll和peek來說null是有特殊含義的,所以一般來說Queue中禁止插入null,但是在實現(xiàn)中還是有一些類允許插入null比如LinkedList。
盡管如此,我們在使用中還是要避免插入null元素。
Queue的分類
一般來說Queue可以分為BlockingQueue,Deque和TransferQueue三種。
BlockingQueue
BlockingQueue是Queue的一種實現(xiàn),它提供了兩種額外的功能:
當當前Queue是空的時候,從BlockingQueue中獲取元素的操作會被阻塞。當當前Queue達到最大容量的時候,插入BlockingQueue的操作會被阻塞。
BlockingQueue的操作可以分為下面四類:
操作類型Throws exceptionSpecial valueBlocksTimes outInsertadd(e)offer(e)put(e)offer(e, time, unit)Removeremove()poll()take()poll(time, unit)Examineelement()peek()not applicablenot applicable
第一類是會拋出異常的操作,當遇到插入失敗,隊列為空的時候拋出異常。
第二類是不會拋出異常的操作。
第三類是會Block的操作。當Queue為空或者達到最大容量的時候。
第四類是time out的操作,在給定的時間里會Block,超時會直接返回。
BlockingQueue是線程安全的Queue,可以在生產(chǎn)者消費者模式的多線程中使用,如下所示:
class Producer implements Runnable { private final BlockingQueue queue; Producer(BlockingQueue q) { queue = q; } public void run() { try { while (true) { queue.put(produce()); } } catch (InterruptedException ex) { ... handle ...} } Object produce() { ... } } class Consumer implements Runnable { private final BlockingQueue queue; Consumer(BlockingQueue q) { queue = q; } public void run() { try { while (true) { consume(queue.take()); } } catch (InterruptedException ex) { ... handle ...} } void consume(Object x) { ... } } class Setup { void main() { BlockingQueue q = new SomeQueueImplementation(); Producer p = new Producer(q); Consumer c1 = new Consumer(q); Consumer c2 = new Consumer(q); new Thread(p).start(); new Thread(c1).start(); new Thread(c2).start(); } }
最后,在一個線程中向BlockQueue中插入元素之前的操作happens-before另外一個線程中從BlockQueue中刪除或者獲取的操作。
Deque
Deque是Queue的子類,它代表double ended queue,也就是說可以從Queue的頭部或者尾部插入和刪除元素。
同樣的,我們也可以將Deque的方法用下面的表格來表示,Deque的方法可以分為對頭部的操作和對尾部的操作:
方法類型 | Throws exception | Special value | Throws exception | Special value |
---|---|---|---|---|
Insert | addFirst(e) | offerFirst(e) | addLast(e) | offerLast(e) |
Remove | removeFirst() | pollFirst() | removeLast() | pollLast() |
Examine | getFirst() | peekFirst() | getLast() | peekLast() |
和Queue的方法描述基本一致,這里就不多講了。
當Deque以 FIFO (First-In-First-Out)的方法處理元素的時候,Deque就相當于一個Queue。
當Deque以LIFO (Last-In-First-Out)的方式處理元素的時候,Deque就相當于一個Stack。
TransferQueue
TransferQueue繼承自BlockingQueue,為什么叫Transfer呢?因為TransferQueue提供了一個transfer的方法,生產(chǎn)者可以調(diào)用這個transfer方法,從而等待消費者調(diào)用take或者poll方法從Queue中拿取數(shù)據(jù)。
還提供了非阻塞和timeout版本的tryTransfer方法以供使用。
我們舉個TransferQueue實現(xiàn)的生產(chǎn)者消費者的問題。
先定義一個生產(chǎn)者:
@Slf4j @Data @AllArgsConstructor class Producer implements Runnable { private TransferQueue<String> transferQueue; private String name; private Integer messageCount; public static final AtomicInteger messageProduced = new AtomicInteger(); @Override public void run() { for (int i = 0; i < messageCount; i++) { try { boolean added = transferQueue.tryTransfer( "第"+i+"個", 2000, TimeUnit.MILLISECONDS); log.info("transfered {} 是否成功: {}","第"+i+"個",added); if(added){ messageProduced.incrementAndGet(); } } catch (InterruptedException e) { log.error(e.getMessage(),e); } } log.info("total transfered {}",messageProduced.get()); } }
在生產(chǎn)者的run方法中,我們調(diào)用了tryTransfer方法,等待2秒鐘,如果沒成功則直接返回。
再定義一個消費者:
@Slf4j @Data @AllArgsConstructor public class Consumer implements Runnable { private TransferQueue<String> transferQueue; private String name; private int messageCount; public static final AtomicInteger messageConsumed = new AtomicInteger(); @Override public void run() { for (int i = 0; i < messageCount; i++) { try { String element = transferQueue.take(); log.info("take {}",element ); messageConsumed.incrementAndGet(); Thread.sleep(500); } catch (InterruptedException e) { log.error(e.getMessage(),e); } } log.info("total consumed {}",messageConsumed.get()); } }
在run方法中,調(diào)用了transferQueue.take方法來取消息。
下面先看一下一個生產(chǎn)者,零個消費者的情況:
@Test public void testOneProduceZeroConsumer() throws InterruptedException { TransferQueue<String> transferQueue = new LinkedTransferQueue<>(); ExecutorService exService = Executors.newFixedThreadPool(10); Producer producer = new Producer(transferQueue, "ProducerOne", 5); exService.execute(producer); exService.awaitTermination(50000, TimeUnit.MILLISECONDS); exService.shutdown(); }
輸出結(jié)果:
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第0個 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第1個 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第2個 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第3個 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第4個 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - total transfered 0
可以看到,因為沒有消費者,所以消息并沒有發(fā)送成功。
再看下一個有消費者的情況:
@Test public void testOneProduceOneConsumer() throws InterruptedException { TransferQueue<String> transferQueue = new LinkedTransferQueue<>(); ExecutorService exService = Executors.newFixedThreadPool(10); Producer producer = new Producer(transferQueue, "ProducerOne", 2); Consumer consumer = new Consumer(transferQueue, "ConsumerOne", 2); exService.execute(producer); exService.execute(consumer); exService.awaitTermination(50000, TimeUnit.MILLISECONDS); exService.shutdown(); }
輸出結(jié)果:
[pool-1-thread-2] INFO com.flydean.Consumer - take 第0個
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第0個 是否成功: true
[pool-1-thread-2] INFO com.flydean.Consumer - take 第1個
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第1個 是否成功: true
[pool-1-thread-1] INFO com.flydean.Producer - total transfered 2
[pool-1-thread-2] INFO com.flydean.Consumer - total consumed 2
可以看到Producer和Consumer是一個一個來生產(chǎn)和消費的。
以上就是淺談Java中的Queue家族的詳細內(nèi)容,更多關(guān)于Java中的Queue家族的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
MyBatisPlus標準數(shù)據(jù)層CRUD的使用詳解
這篇文章主要介紹了MyBatisPlus標準數(shù)據(jù)層CRUD的使用,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07Netty分布式編碼器及寫數(shù)據(jù)事件處理使用場景
這篇文章主要為大家介紹了Netty分布式編碼器及寫數(shù)據(jù)事件處理使用場景剖析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,2022-03-03SpringBoot 整合 dubbo xml實現(xiàn)代碼示例
這篇文章主要介紹了SpringBoot 整合 dubbo xml實現(xiàn)代碼示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-03-03java中靜態(tài)代碼塊與構(gòu)造方法的執(zhí)行順序判斷
對靜態(tài)代碼塊以及構(gòu)造函數(shù)的執(zhí)行先后順序,一直很迷惑,直到最近看到一段代碼,發(fā)現(xiàn)終于弄懂了,所以這篇文章主要給大家介紹了關(guān)于如何判斷java中靜態(tài)代碼塊與構(gòu)造方法的執(zhí)行順序的相關(guān)資料,需要的朋友可以參考下。2017-12-12