手把手帶你理解java線程池之工作隊(duì)列workQueue
線程池之工作隊(duì)列

ArrayBlockingQueue
采用數(shù)組來實(shí)現(xiàn),并采用可重入鎖ReentrantLock來做并發(fā)控制,無論是添加還是讀取,都先要獲得鎖才能進(jìn)行操作 可看出進(jìn)行讀寫操作都使用了ReentrantLock,ArrayBlockingQueue需要為其指定容量
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
SynchronousQueue
由于SynchronousQueue源碼比較復(fù)雜,里面大量的Cas操作,SynchronousQueue沒有容器,所以里面是裝不了任務(wù)的,當(dāng)一個生產(chǎn)者線程生產(chǎn)一個任務(wù)的 時(shí)候,如果沒有對應(yīng)的消費(fèi)者消費(fèi),那么該生產(chǎn)者會一直阻塞,知道有消費(fèi)者消費(fèi)為止。
圖示:

如下代碼,如果我們將消費(fèi)者線程注釋掉執(zhí)行,那么生產(chǎn)者哪里將會一直阻塞
package thread.customthreadpool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 測試SynchronousQueue
*/
public class SynchronousQueueTest {
private static final SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>();
private static final ExecutorService service = Executors.newCachedThreadPool();
public static void main(String[] args) {
/**
* Provider
*/
service.submit(() -> {
try {
synchronousQueue.put("liu");
}catch (Exception e){
e.printStackTrace();
}
System.out.println("Consumer finished spending");
});
/**
* Consumer
*/
service.submit(() ->{
try {
synchronousQueue.take();
}catch (Exception e){
e.printStackTrace();
}
System.out.println("take over");
});
}
}
LinkedBlockingDeque
LinkedBlockingDeque是一個雙向隊(duì)列,底層使用單鏈表實(shí)現(xiàn),任何一段都可進(jìn)行元素的讀寫操作,在初始化LinkedBlockingDeque的時(shí)候, 我們可以指定容量,也可不指定,如果不指定,則容量為Integer.MAX_VALUE,
注:Deque是雙端隊(duì)列,而Queue是單端隊(duì)列,雙端意思是兩端都可以進(jìn)行讀寫操作,而單端則只能從一端進(jìn),一端出(FIFO)
public LinkedBlockingDeque() {
this(Integer.MAX_VALUE);
}
package thread.customthreadpool;
import java.util.concurrent.LinkedBlockingDeque;
public class LinkedBlockingDequeTest {
private static final LinkedBlockingDeque<Integer> deque = new LinkedBlockingDeque<>();
public static void main(String[] args) throws InterruptedException {
deque.put(1);
deque.put(2);
deque.put(3);
deque.put(4);
deque.put(5);
System.out.println(deque);
System.out.println("deque size "+deque.size());
deque.take();
deque.take();
deque.take();
deque.take();
deque.take();
System.out.println(deque);
System.out.println("deque size "+deque.size());
}
}

LinkedBlockingQueue
底層基于單向連表實(shí)現(xiàn),是一個單向隊(duì)列,具有先進(jìn)先出(FIFO)特點(diǎn),使用了ReentrantLock來做并發(fā)控制,讀寫操作都上鎖
private final ReentrantLock putLock = new ReentrantLock();
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
DelayDeque
DelayDeque是一個無界隊(duì)列,添加進(jìn)DelayDeque的元素會經(jīng)過compareTo方法計(jì)算,然后按照時(shí)間 進(jìn)行排序,排在隊(duì)頭的元素是最早到期的,越往后到期時(shí)間越長,DelayDeque只能接受Delayed接口類型 如圖所示,隊(duì)列里的元素并不是按照先進(jìn)先出的規(guī)則,而是按照過期時(shí)間

示例
package thread.customthreadpool.delayDeque;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class MyDelayed implements Delayed {
private final String taskName ;
private final long nowTime = System.currentTimeMillis();
private final long expireTime ;
public MyDelayed(String taskName,long expireTime) {
this.taskName = taskName;
this.expireTime = expireTime;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert((nowTime+expireTime) - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
MyDelayed myDelayed = (MyDelayed) o;
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString() {
return "MyDelayed{" +
"taskName='" + taskName + '\'' +
", nowTime=" + nowTime +
", expireTime=" + expireTime +
'}';
}
}
package thread.customthreadpool.delayDeque;
import java.util.concurrent.*;
public class MyDelayQueue {
private static final DelayQueue<MyDelayed> delayQueue = new DelayQueue<>();
private static final ExecutorService service = Executors.newCachedThreadPool();
public static void main(String[] args) throws InterruptedException {
service.submit(() -> {
delayQueue.put(new MyDelayed("A-Task",5000));
delayQueue.put(new MyDelayed("B-Task",4000));
delayQueue.put(new MyDelayed("C-Task",3000));
delayQueue.put(new MyDelayed("D-Task",2000));
delayQueue.put(new MyDelayed("E-Task",1000));
});
while (true){
System.out.println(delayQueue.take());
}
}
}
result

應(yīng)用場景
1.美團(tuán)外賣訂單:當(dāng)我們下單后沒付款 ,30分鐘后將自動取消訂單
2.緩存,對于某些任務(wù),需要在特定的時(shí)間清理;
and so on
LinkedTransferQueue
當(dāng)消費(fèi)線程從隊(duì)列中取元素時(shí),如果隊(duì)列為空,那么生成一個為null的節(jié)點(diǎn),消費(fèi)者線程就一直等待,此時(shí)如果生產(chǎn)者線程發(fā)現(xiàn)隊(duì)列中有一個null節(jié)點(diǎn), 它就不入隊(duì)了,而是將元素填充到這個null節(jié)點(diǎn)并喚醒消費(fèi)者線程,然后消費(fèi)者線程取走元素。
LinkedTransferQueue是 SynchronousQueue 和 LinkedBlockingQueue 的整合,性能比較高,因?yàn)闆]有鎖操作, SynchronousQueue不能存儲元素,而LinkedTransferQueue能存儲元素,
PriorityBlockingQueue
PriorityBlockingQueue是一個無界的阻塞隊(duì)列,同時(shí)是一個支持優(yōu)先級的隊(duì)列,讀寫操作都是基于ReentrantLock, 內(nèi)部使用堆算法保證每次出隊(duì)都是優(yōu)先級最高的元素
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
到此這篇關(guān)于手把手帶你理解java線程池之工作隊(duì)列workQueue的文章就介紹到這了,更多相關(guān)java線程池內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java Arrays.AsList原理及用法實(shí)例
這篇文章主要介紹了Java Arrays.AsList原理及用法實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-11-11
SpringBoot 使用jwt進(jìn)行身份驗(yàn)證的方法示例
這篇文章主要介紹了SpringBoot 使用jwt進(jìn)行身份驗(yàn)證的方法示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-12-12
Java實(shí)現(xiàn)格式化打印慢SQL日志的方法詳解
不管我們使用何種語言開發(fā),一旦程序發(fā)生異常,日志是一個很重要的數(shù)據(jù),下面這篇文章主要給大家介紹了關(guān)于Java實(shí)現(xiàn)格式化打印慢SQL日志的相關(guān)資料,需要的朋友可以參考下2022-10-10
springmvc級聯(lián)屬性處理無法轉(zhuǎn)換異常問題解決
這篇文章主要介紹了springmvc級聯(lián)屬性處理無法轉(zhuǎn)換異常問題解決,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12
使用CXF和Jersey框架來進(jìn)行Java的WebService編程
這篇文章主要介紹了使用CXF和Jersey框架來進(jìn)行Java的WebService編程,Web service是一個平臺獨(dú)立的低耦合的自包含的基于可編程的web的應(yīng)用程序,需要的朋友可以參考下2015-12-12
Java實(shí)現(xiàn)SHA1加密代碼實(shí)例
這篇文章給大家分享了Java實(shí)現(xiàn)SHA1加密的相關(guān)實(shí)例代碼,有興趣的朋友可以測試參考下。2018-07-07

