手把手帶你理解java線程池之工作隊(duì)列workQueue
線程池之工作隊(duì)列
ArrayBlockingQueue
采用數(shù)組來實(shí)現(xiàn),并采用可重入鎖ReentrantLock來做并發(fā)控制,無論是添加還是讀取,都先要獲得鎖才能進(jìn)行操作 可看出進(jìn)行讀寫操作都使用了ReentrantLock,ArrayBlockingQueue需要為其指定容量
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } } public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
SynchronousQueue
由于SynchronousQueue源碼比較復(fù)雜,里面大量的Cas操作,SynchronousQueue沒有容器,所以里面是裝不了任務(wù)的,當(dāng)一個生產(chǎn)者線程生產(chǎn)一個任務(wù)的 時候,如果沒有對應(yīng)的消費(fèi)者消費(fèi),那么該生產(chǎn)者會一直阻塞,知道有消費(fèi)者消費(fèi)為止。
圖示:
如下代碼,如果我們將消費(fèi)者線程注釋掉執(zhí)行,那么生產(chǎn)者哪里將會一直阻塞
package thread.customthreadpool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; /** * 測試SynchronousQueue */ public class SynchronousQueueTest { private static final SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>(); private static final ExecutorService service = Executors.newCachedThreadPool(); public static void main(String[] args) { /** * Provider */ service.submit(() -> { try { synchronousQueue.put("liu"); }catch (Exception e){ e.printStackTrace(); } System.out.println("Consumer finished spending"); }); /** * Consumer */ service.submit(() ->{ try { synchronousQueue.take(); }catch (Exception e){ e.printStackTrace(); } System.out.println("take over"); }); } }
LinkedBlockingDeque
LinkedBlockingDeque是一個雙向隊(duì)列,底層使用單鏈表實(shí)現(xiàn),任何一段都可進(jìn)行元素的讀寫操作,在初始化LinkedBlockingDeque的時候, 我們可以指定容量,也可不指定,如果不指定,則容量為Integer.MAX_VALUE,
注:Deque是雙端隊(duì)列,而Queue是單端隊(duì)列,雙端意思是兩端都可以進(jìn)行讀寫操作,而單端則只能從一端進(jìn),一端出(FIFO)
public LinkedBlockingDeque() { this(Integer.MAX_VALUE); }
package thread.customthreadpool; import java.util.concurrent.LinkedBlockingDeque; public class LinkedBlockingDequeTest { private static final LinkedBlockingDeque<Integer> deque = new LinkedBlockingDeque<>(); public static void main(String[] args) throws InterruptedException { deque.put(1); deque.put(2); deque.put(3); deque.put(4); deque.put(5); System.out.println(deque); System.out.println("deque size "+deque.size()); deque.take(); deque.take(); deque.take(); deque.take(); deque.take(); System.out.println(deque); System.out.println("deque size "+deque.size()); } }
LinkedBlockingQueue
底層基于單向連表實(shí)現(xiàn),是一個單向隊(duì)列,具有先進(jìn)先出(FIFO)特點(diǎn),使用了ReentrantLock來做并發(fā)控制,讀寫操作都上鎖
private final ReentrantLock putLock = new ReentrantLock(); public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); } public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
DelayDeque
DelayDeque是一個無界隊(duì)列,添加進(jìn)DelayDeque的元素會經(jīng)過compareTo方法計算,然后按照時間 進(jìn)行排序,排在隊(duì)頭的元素是最早到期的,越往后到期時間越長,DelayDeque只能接受Delayed接口類型 如圖所示,隊(duì)列里的元素并不是按照先進(jìn)先出的規(guī)則,而是按照過期時間
示例
package thread.customthreadpool.delayDeque; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class MyDelayed implements Delayed { private final String taskName ; private final long nowTime = System.currentTimeMillis(); private final long expireTime ; public MyDelayed(String taskName,long expireTime) { this.taskName = taskName; this.expireTime = expireTime; } @Override public long getDelay(TimeUnit unit) { return unit.convert((nowTime+expireTime) - System.currentTimeMillis(),TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { MyDelayed myDelayed = (MyDelayed) o; return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); } @Override public String toString() { return "MyDelayed{" + "taskName='" + taskName + '\'' + ", nowTime=" + nowTime + ", expireTime=" + expireTime + '}'; } }
package thread.customthreadpool.delayDeque; import java.util.concurrent.*; public class MyDelayQueue { private static final DelayQueue<MyDelayed> delayQueue = new DelayQueue<>(); private static final ExecutorService service = Executors.newCachedThreadPool(); public static void main(String[] args) throws InterruptedException { service.submit(() -> { delayQueue.put(new MyDelayed("A-Task",5000)); delayQueue.put(new MyDelayed("B-Task",4000)); delayQueue.put(new MyDelayed("C-Task",3000)); delayQueue.put(new MyDelayed("D-Task",2000)); delayQueue.put(new MyDelayed("E-Task",1000)); }); while (true){ System.out.println(delayQueue.take()); } } }
result
應(yīng)用場景
1.美團(tuán)外賣訂單:當(dāng)我們下單后沒付款 ,30分鐘后將自動取消訂單
2.緩存,對于某些任務(wù),需要在特定的時間清理;
and so on
LinkedTransferQueue
當(dāng)消費(fèi)線程從隊(duì)列中取元素時,如果隊(duì)列為空,那么生成一個為null的節(jié)點(diǎn),消費(fèi)者線程就一直等待,此時如果生產(chǎn)者線程發(fā)現(xiàn)隊(duì)列中有一個null節(jié)點(diǎn), 它就不入隊(duì)了,而是將元素填充到這個null節(jié)點(diǎn)并喚醒消費(fèi)者線程,然后消費(fèi)者線程取走元素。
LinkedTransferQueue是 SynchronousQueue 和 LinkedBlockingQueue 的整合,性能比較高,因?yàn)闆]有鎖操作, SynchronousQueue不能存儲元素,而LinkedTransferQueue能存儲元素,
PriorityBlockingQueue
PriorityBlockingQueue是一個無界的阻塞隊(duì)列,同時是一個支持優(yōu)先級的隊(duì)列,讀寫操作都是基于ReentrantLock, 內(nèi)部使用堆算法保證每次出隊(duì)都是優(yōu)先級最高的元素
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { while ( (result = dequeue()) == null) notEmpty.await(); } finally { lock.unlock(); } return result; }
到此這篇關(guān)于手把手帶你理解java線程池之工作隊(duì)列workQueue的文章就介紹到這了,更多相關(guān)java線程池內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java Arrays.AsList原理及用法實(shí)例
這篇文章主要介紹了Java Arrays.AsList原理及用法實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-11-11SpringBoot 使用jwt進(jìn)行身份驗(yàn)證的方法示例
這篇文章主要介紹了SpringBoot 使用jwt進(jìn)行身份驗(yàn)證的方法示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-12-12Java實(shí)現(xiàn)格式化打印慢SQL日志的方法詳解
不管我們使用何種語言開發(fā),一旦程序發(fā)生異常,日志是一個很重要的數(shù)據(jù),下面這篇文章主要給大家介紹了關(guān)于Java實(shí)現(xiàn)格式化打印慢SQL日志的相關(guān)資料,需要的朋友可以參考下2022-10-10springmvc級聯(lián)屬性處理無法轉(zhuǎn)換異常問題解決
這篇文章主要介紹了springmvc級聯(lián)屬性處理無法轉(zhuǎn)換異常問題解決,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-12-12使用CXF和Jersey框架來進(jìn)行Java的WebService編程
這篇文章主要介紹了使用CXF和Jersey框架來進(jìn)行Java的WebService編程,Web service是一個平臺獨(dú)立的低耦合的自包含的基于可編程的web的應(yīng)用程序,需要的朋友可以參考下2015-12-12Java實(shí)現(xiàn)SHA1加密代碼實(shí)例
這篇文章給大家分享了Java實(shí)現(xiàn)SHA1加密的相關(guān)實(shí)例代碼,有興趣的朋友可以測試參考下。2018-07-07