java中并發(fā)Queue種類與各自API特點以及使用場景說明
一 先說下隊列
隊列是一種數(shù)據(jù)結(jié)構(gòu).它有兩個基本操作:在隊列尾部加入一個元素,和從隊列頭部移除一個元素(注意不要弄混隊列的頭部和尾部)
就是說,隊列以一種先進(jìn)先出的方式管理數(shù)據(jù),如果你試圖向一個 已經(jīng)滿了的阻塞隊列中添加一個元素或者是從一個空的阻塞隊列中移除一個元索,將導(dǎo)致線程阻塞.
在多線程進(jìn)行合作時,阻塞隊列是很有用的工具。工作者線程可以定期地把中間結(jié)果存到阻塞隊列中而其他工作者線程把中間結(jié)果取出并在將來修改它們。隊列會自動平衡負(fù)載。
如果第一個線程集運行得比第二個慢,則第二個 線程集在等待結(jié)果時就會阻塞。如果第一個線程集運行得快,那么它將等待第二個線程集趕上來.
說白了,就是先進(jìn)先出,線程安全!
java中并發(fā)隊列都是在java.util.concurrent并發(fā)包下的,Queue接口與List、Set同一級別,都是繼承了Collection接口,最近學(xué)習(xí)了java中的并發(fā)Queue的所有子類應(yīng)用場景,這里記錄分享一下:
1.1 這里可以先用wait與notify(腦忒fai) 模擬一下隊列的增刪數(shù)據(jù),簡單了解一下隊列:
import java.util.LinkedList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * 模擬隊列增刪數(shù)據(jù) * @author houzheng */ public class MyQueue { //元素集合 private LinkedList<Object> list=new LinkedList<Object>(); //計數(shù)器(同步),判斷集合元素數(shù)量 private AtomicInteger count=new AtomicInteger(); //集合上限與下限,final必須指定初值 private final int minSize=0; private final int maxSize; //構(gòu)造器指定最大值 public MyQueue(int maxSize) { this.maxSize = maxSize; } //初始化對象,用于加鎖,也可直接用this private Object lock=new Object(); //put方法:往集合中添加元素,如果集合元素已滿,則此線程阻塞,直到有空間再繼續(xù) public void put(Object obj){ synchronized (lock) { while(count.get()==this.maxSize){ try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace();} } list.add(obj); //計數(shù)器加一 count.incrementAndGet(); System.out.println("放入元素:"+obj); //喚醒另一個線程,(處理極端情況:集合一開始就是空,此時take線程會一直等待) lock.notify(); } } //take方法:從元素中取數(shù)據(jù),如果集合為空,則線程阻塞,直到集合不為空再繼續(xù) public Object take(){ Object result=null; synchronized(lock){ while(count.get()==this.minSize){ try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace();} } //移除第一個 result=list.removeFirst(); //計數(shù)器減一 count.decrementAndGet(); System.out.println("拿走元素:"+result); //喚醒另一個線程,(處理極端情況:集合一開始就是滿的,此時put線程會一直等待) lock.notify(); } return result; } public int getSize(){ return this.count.get(); } public static void main(String[] args) { //創(chuàng)建集合容器 MyQueue queue=new MyQueue(5); queue.put("1"); queue.put("2"); queue.put("3"); queue.put("4"); queue.put("5"); System.out.println("當(dāng)前容器長度為:"+queue.getSize()); Thread t1=new Thread(()->{ queue.put("6"); queue.put("7"); },"t1"); Thread t2=new Thread(()->{ Object take1 = queue.take(); Object take2 = queue.take(); },"t2"); //測試極端情況,兩秒鐘后再執(zhí)行另一個線程 t1.start(); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } t2.start(); } }
這里用線程通信的方式簡單模擬了隊列的進(jìn)出,那么接下來就正式進(jìn)入java的并發(fā)隊列:
二 并發(fā)Queue
JDK中并發(fā)隊列提供了兩種實現(xiàn),一種是高性能隊列ConcurrentLinkedQueue,一種是阻塞隊列BlockingQueue,兩種都繼承自Queue:
1ConcurrentLinkedQueue
這是一個使用于高并發(fā)場景的隊列(額,各位看這塊博客的小朋友,最好對線程基礎(chǔ)比較熟悉再來看,當(dāng)然我也在拼命學(xué)習(xí)啦,哈哈哈),主要是無鎖的方式,他的想能要比BlockingQueue好
是基于鏈接節(jié)點的無界線程安全隊列,先進(jìn)先出,不允許有null元素,廢話不多說,上demo:
這種queue比較簡單,沒什么好說的,和ArrayList一樣用就可以,關(guān)鍵是BlockingQUeue
2BlockingQueue
blockingQueue主要有5中實現(xiàn),我感覺都挺有意思的,其中幾種還比較常用就都學(xué)習(xí)了下,這里都介紹下:
2.1ArrayBlockingQueue
@Test public void test02() throws Exception{ //必須指定隊列長度 ArrayBlockingQueue<String> abq=new ArrayBlockingQueue<String>(2); abq.add("a"); //add :添加元素,如果BlockingQueue可以容納,則返回true,否則拋異常,支持添加集合 System.out.println(abq.offer("b"));//容量如果不夠,返回false //offer: 如果可能的話,添加元素,即如果BlockingQueue可以容納,則返回true,否則返回false,支持設(shè)置超時時間 //設(shè)置超時,如果超過時間就不添加,返回false, abq.offer("d", 2, TimeUnit.SECONDS);// 添加的元素,時長,單位 //put 添加元素,如果BlockQueue沒有空間,則調(diào)用此方法的線程被阻斷直到BlockingQueue里面有空間再繼續(xù). abq.put("d");//會一直等待 //poll 取走頭部元素,若不能立即取出,則可以等time參數(shù)規(guī)定的時間,取不到時返回null,支持設(shè)置超時時間 abq.poll(); abq.poll(2,TimeUnit.SECONDS);//兩秒取不到返回null //take() 取走頭部元素,若BlockingQueue為空,阻斷進(jìn)入等待狀態(tài)直到Blocking有新的對象被加入為止 abq.take(); //取出頭部元素,但不刪除 abq.element(); //drainTo() //一次性從BlockingQueue獲取所有可用的數(shù)據(jù)對象(還可以指定獲取數(shù)據(jù)的個數(shù)),通過該方法,可以提升獲取數(shù)據(jù)效率;不需要多次分批加鎖或釋放鎖。 List list=new ArrayList(); abq.drainTo(list,2);//將隊列中兩個元素取到list中,取走后隊列中就沒有取走的元素 System.out.println(list); //[a,b] System.out.println(abq); //[] }
2.2 LinkedBlockingQueue
@Test public void test03(){ LinkedBlockingQueue lbq=new LinkedBlockingQueue();//可指定容量,也可不指定 lbq.add("a"); lbq.add("b"); lbq.add("c"); //API與ArrayBlockingQueue相同 //是否包含 System.out.println(lbq.contains("a")); //移除頭部元素或者指定元素 remove("a") System.out.println(lbq.remove()); //轉(zhuǎn)數(shù)組 Object[] array = lbq.toArray(); //element 取出頭部元素,但不刪除 System.out.println(lbq.element()); System.out.println(lbq.element()); System.out.println(lbq.element()); }
2.3 SynchronousQueue
public static void main(String[] args) { SynchronousQueue<String> sq=new SynchronousQueue<String>(); // iterator() 永遠(yuǎn)返回空,因為里面沒東西。 // peek() 永遠(yuǎn)返回null /** * isEmpty()永遠(yuǎn)是true。 * remainingCapacity() 永遠(yuǎn)是0。 * remove()和removeAll() 永遠(yuǎn)是false。 */ new Thread(()->{ try { //取出并且remove掉queue里的element(認(rèn)為是在queue里的。。。),取不到東西他會一直等。 System.out.println(sq.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(()->{ try { //offer() 往queue里放一個element后立即返回, //如果碰巧這個element被另一個thread取走了,offer方法返回true,認(rèn)為offer成功;否則返回false //true ,上面take線程一直在等, ////下面剛offer進(jìn)去就被拿走了,返回true,如果offer線程先執(zhí)行,則返回false System.out.println(sq.offer("b")); } catch (Exception e) { e.printStackTrace(); } }).start(); new Thread(()->{ try { //往queue放進(jìn)去一個element以后就一直wait直到有其他thread進(jìn)來把這個element取走 sq.put("a"); } catch (Exception e) { e.printStackTrace(); } }).start(); }
2.4 PriorityBlockingQueue
@Test public void test04() throws Exception{ //隊列里元素必須實現(xiàn)Comparable接口,用來決定優(yōu)先級 PriorityBlockingQueue<String> pbq=new PriorityBlockingQueue<String>(); pbq.add("b"); pbq.add("g"); pbq.add("a"); pbq.add("c"); //獲取的時候會根據(jù)優(yōu)先級取元素,插入的時候不會排序,節(jié)省性能 //System.out.println(pbq.take());//a,獲取時會排序,按優(yōu)先級獲取 System.out.println(pbq.toString());//如果前面沒有取值,直接syso也不會排序 Iterator<String> iterator = pbq.iterator(); while(iterator.hasNext()){ System.out.println(iterator.next()); } } @Test public void test05(){ PriorityBlockingQueue<Person> pbq=new PriorityBlockingQueue<Person>(); Person p2=new Person("姚振",20); Person p1=new Person("侯征",24); Person p3=new Person("何毅",18); Person p4=new Person("李世彪",22); pbq.add(p1); pbq.add(p2); pbq.add(p3); pbq.add(p4); System.out.println(pbq);//沒有按優(yōu)先級排序 try { //只要take獲取元素就會按照優(yōu)先級排序,獲取一次就全部排好序了,后面就會按優(yōu)先級迭代 pbq.take(); } catch (InterruptedException e) { e.printStackTrace(); } //按年齡排好了序 for (Iterator iterator = pbq.iterator(); iterator.hasNext();) { Person person = (Person) iterator.next(); System.out.println(person); } }
2.5 最后說一下DelayQueue ,這里用個網(wǎng)上很經(jīng)典的例子,網(wǎng)吧上網(wǎng)計時
網(wǎng)民實體queue中元素
//網(wǎng)民 public class Netizen implements Delayed { //身份證 private String ID; //名字 private String name; //上網(wǎng)截止時間 private long playTime; //比較優(yōu)先級,時間最短的優(yōu)先 @Override public int compareTo(Delayed o) { Netizen netizen=(Netizen) o; return this.getDelay(TimeUnit.SECONDS)-o.getDelay(TimeUnit.SECONDS)>0?1:0; } public Netizen(String iD, String name, long playTime) { ID = iD; this.name = name; this.playTime = playTime; } //獲取上網(wǎng)時長,即延時時長 @Override public long getDelay(TimeUnit unit) { //上網(wǎng)截止時間減去現(xiàn)在當(dāng)前時間=時長 return this.playTime-System.currentTimeMillis(); }
網(wǎng)吧類:
//網(wǎng)吧 public class InternetBar implements Runnable { //網(wǎng)民隊列,使用延時隊列 private DelayQueue<Netizen> dq=new DelayQueue<Netizen>(); //上網(wǎng) public void startPlay(String id,String name,Integer money){ //截止時間= 錢數(shù)*時間+當(dāng)前時間(1塊錢1秒) Netizen netizen=new Netizen(id,name,1000*money+System.currentTimeMillis()); System.out.println(name+"開始上網(wǎng)計費......"); dq.add(netizen); } //時間到下機(jī) public void endTime(Netizen netizen){ System.out.println(netizen.getName()+"余額用完,下機(jī)"); } @Override public void run() { //線程,監(jiān)控每個網(wǎng)民上網(wǎng)時長 while(true){ try { //除非時間到.否則會一直等待,直到取出這個元素為止 Netizen netizen=dq.take(); endTime(netizen); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { //新建一個網(wǎng)吧 InternetBar internetBar=new InternetBar(); //來了三個網(wǎng)民上網(wǎng) internetBar.startPlay("001","侯征",3); internetBar.startPlay("002","姚振",7); internetBar.startPlay("003","何毅",5); Thread t1=new Thread(internetBar); t1.start(); } }
這樣就可以完美實現(xiàn)業(yè)務(wù)需求了
結(jié)果
,
這塊東西比較深,還需要不斷加強(qiáng)學(xué)習(xí)實踐才行!!
Java中的Queue和自定義堆棧
Queue:單向
- 隊列通常 FIFO (先進(jìn)先出)
- 優(yōu)先級隊列和堆棧 LIFO (后進(jìn)先出)
package com.bjsxt.others.que; import java.util.ArrayDeque; import java.util.Queue; /** * 使用隊列模擬銀行存款業(yè)務(wù) * @author Administrator * */ public class Demo01 { /** * @param args */ public static void main(String[] args) { Queue<Request> que =new ArrayDeque<Request>(); //模擬排隊情況 for(int i=0;i<10;i++){ final int num =i; que.offer(new Request(){ //應(yīng)用匿名內(nèi)部類對象只能訪問 final 修飾的變量 @Override public void deposit() { System.out.println("第"+num+"個人,辦理存款業(yè)務(wù),存款額度為:"+(Math.random()*10000)); } }); } dealWith(que); } //處理業(yè)務(wù) public static void dealWith(Queue<Request> que){ Request req =null; while(null!=(req=que.poll())){ req.deposit(); } } } interface Request{ //存款 void deposit(); }
自定義堆棧
package com.bjsxt.others.que; import java.util.ArrayDeque; import java.util.Deque; /** * 使用隊列實現(xiàn)自定義堆棧 * 1、彈 * 2、壓 * 3、獲取頭 * @author Administrator * * @param <E> */ public class MyStack<E> { //容器 private Deque<E> container =new ArrayDeque<E>(); //容量 private int cap; public MyStack(int cap) { super(); this.cap = cap; } //壓棧 public boolean push(E e){ if(container.size()+1>cap){ return false; } return container.offerLast(e); } //彈棧 public E pop(){ return container.pollLast(); } //獲取 public E peek(){ return container.peekLast(); } public int size(){ return this.container.size(); } } package com.bjsxt.others.que; //測試自定義堆棧 public class Demo02 { /** * @param args */ public static void main(String[] args) { MyStack<String> backHistory =new MyStack<String>(3); backHistory.push("www.baidu.com"); backHistory.push("www.google.com"); backHistory.push("www.sina.com"); backHistory.push("www.bjsxt.cn"); System.out.println("大?。?+backHistory.size()); //遍歷 String item=null; while(null!=(item=backHistory.pop())){ System.out.println(item); } } }
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
淺談導(dǎo)入JavaWeb 項目出現(xiàn)的問題
這篇文章主要介紹了導(dǎo)入JavaWeb 項目出現(xiàn)的問題,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-03-03Java postgresql數(shù)組字段類型處理方法詳解
這篇文章主要介紹了Java postgresql數(shù)組字段類型處理方法,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-10-10解決ResourceBundle.getBundle文件路徑問題
這篇文章主要介紹了解決ResourceBundle.getBundle文件路徑問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-01-01JavaScript實現(xiàn)鼠標(biāo)移動粒子跟隨效果
這篇文章主要為大家詳細(xì)介紹了JavaScript實現(xiàn)鼠標(biāo)移動粒子跟隨效果,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-08-08Java如何獲取數(shù)組和字符串的長度(length還是length())
這篇文章主要介紹了Java如何獲取數(shù)組和字符串的長度(length還是length()),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-12-12HttpServletResponse亂碼問題_動力節(jié)點Java學(xué)院整理
這篇文章主要介紹了HttpServletResponse亂碼問題,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-07-07徹底搞懂java并發(fā)ThreadPoolExecutor使用
這篇文章主要為大家介紹了徹底搞懂java并發(fā)ThreadPoolExecutor使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-02-02Java實現(xiàn)數(shù)據(jù)庫連接池的方法
這篇文章主要介紹了Java實現(xiàn)數(shù)據(jù)庫連接池的方法,涉及java數(shù)據(jù)庫連接池的創(chuàng)建、連接、刷新、關(guān)閉及狀態(tài)獲取的常用技巧,具有一定參考借鑒價值,需要的朋友可以參考下2015-07-07