JVM優(yōu)先級(jí)線程池做任務(wù)隊(duì)列的實(shí)現(xiàn)方法
前言
我們都知道 web 服務(wù)的工作大多是接受 http 請(qǐng)求,并返回處理后的結(jié)果。服務(wù)器接受的每一個(gè)請(qǐng)求又可以看是一個(gè)任務(wù)。一般而言這些請(qǐng)求任務(wù)會(huì)根據(jù)請(qǐng)求的先后有序處理,如果請(qǐng)求任務(wù)的處理比較耗時(shí),往往就需要排隊(duì)了。而同時(shí)不同的任務(wù)直接可能會(huì)存在一些優(yōu)先級(jí)的變化,這時(shí)候就需要引入任務(wù)隊(duì)列并進(jìn)行管理了??梢宰鋈蝿?wù)隊(duì)列的東西有很多,Java 自帶的線程池,以及其他的消息中間件都可以。
同步與異步
這個(gè)問題在之前已經(jīng)提過很多次了,有些任務(wù)是需要請(qǐng)求后立即返回結(jié)果的,而有的則不需要。設(shè)想一下你下單購物的場景,付完錢后,系統(tǒng)只需要返回一個(gè)支付成功即可,后續(xù)的積分增加、優(yōu)惠券發(fā)放、安排發(fā)貨等等業(yè)務(wù)都不需要實(shí)時(shí)返回給用戶的,這些就是異步的任務(wù)。大量的異步任務(wù)到達(dá)我們部署的服務(wù)上,由于處理效率的瓶頸,無法達(dá)到實(shí)時(shí)處理,因此與需要用隊(duì)列將他們暫時(shí)保存起來,排隊(duì)處理。
線程池
在 Java 中提到隊(duì)列,我們除了想到基本的數(shù)據(jù)結(jié)構(gòu)之外,應(yīng)該還有線程池。線程池自帶一套機(jī)制可以實(shí)現(xiàn)任務(wù)的排隊(duì)和執(zhí)行,可以滿足單點(diǎn)環(huán)境下絕大多數(shù)異步化的場景。下面是典型的一個(gè)處理流程:
// 注入合適類型的線程池
@Autowired
private final ThreadPoolExecutor asyncPool;
@RequestMapping(value = "/async/someOperate", method = RequestMethod.POST)
public RestResult someOperate(HttpServletRequest request, String params,String callbackUrl {
// 接受請(qǐng)求后 submit 到線程池排隊(duì)處理
asyncPool.submit(new Task(params,callbackUrl);
return new RestResult(ResultCode.SUCCESS.getCode(), null) {{
setMsg("successful!" + prop.getShowMsg());
}};
}
// 異步任務(wù)處理
@Slf4j
public class Task extends Callable<RestResult> {
private String params;
private String callbackUrl;
private final IAlgorithmService algorithmService = SpringUtil.getBean(IAlgorithmServiceImpl.class);
private final ServiceUtils serviceUtils = SpringUtil.getBean(ServiceUtils.class);
public ImageTask(String params,String callbackUrl) {
this.params = params;
this.callbackUrl = callbackUrl;
}
@Override
public RestResult call() {
try {
// 業(yè)務(wù)處理
CarDamageResult result = algorithmService.someOperate(this.params);
// 回調(diào)
return serviceUtils.callback(this.callbackUrl, this.caseNum, ResultCode.SUCCESS.getCode(), result, this.isAsync);
} catch (ServiceException e) {
return serviceUtils.callback(this.callbackUrl, this.caseNum, e.getCode(), null, this.isAsync);
}
}
}
對(duì)于線程池這里就不具體展開講了,僅僅簡單理了下具體的流程:
- 收到請(qǐng)求后,參數(shù)校驗(yàn)后傳入線程池排隊(duì)。
- 返回結(jié)果:“請(qǐng)求成功,正在處理”。
- 任務(wù)排到后由相應(yīng)的線程處理,處理完后進(jìn)行接口回調(diào)。
上面的例子描述了一個(gè)生產(chǎn)速度遠(yuǎn)遠(yuǎn)大于消費(fèi)速度的模型,普通面向數(shù)據(jù)庫開發(fā)的企業(yè)級(jí)應(yīng)用,由于數(shù)據(jù)庫的連接池開發(fā)的連接數(shù)較大,一般不需要這樣通過線程池來處理,而一些 GPU 密集型的應(yīng)用場景,由于顯存的瓶頸導(dǎo)致消費(fèi)速度慢時(shí),就需要隊(duì)列來作出調(diào)整了。
帶優(yōu)先級(jí)的線程池
更復(fù)雜的,例如考慮到任務(wù)的優(yōu)先級(jí),還需要對(duì)線程池進(jìn)行重寫,通過 PriorityBlockingQueue 來替換默認(rèn)的阻塞隊(duì)列。直接上代碼。
import lombok.Data;
import java.util.concurrent.Callable;
/**
* @author Fururur
* @create 2020-01-14-10:37
*/
@Data
public abstract class PriorityCallable<T> implements Callable<T> {
private int priority;
}
import lombok.Getter;
import lombok.Setter;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
* 優(yōu)先級(jí)線程池的實(shí)現(xiàn)
*
* @author Fururur
* @create 2019-07-23-10:19
*/
public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
private ThreadLocal<Integer> local = ThreadLocal.withInitial(() -> 0);
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue());
}
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue(), threadFactory);
}
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue(), handler);
}
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue(), threadFactory, handler);
}
private static PriorityBlockingQueue getWorkQueue() {
return new PriorityBlockingQueue();
}
@Override
public void execute(Runnable command) {
int priority = local.get();
try {
this.execute(command, priority);
} finally {
local.set(0);
}
}
public void execute(Runnable command, int priority) {
super.execute(new PriorityRunnable(command, priority));
}
public <T> Future<T> submit(PriorityCallable<T> task) {
local.set(task.getPriority());
return super.submit(task);
}
public <T> Future<T> submit(Runnable task, T result, int priority) {
local.set(priority);
return super.submit(task, result);
}
public Future<?> submit(Runnable task, int priority) {
local.set(priority);
return super.submit(task);
}
@Getter
@Setter
protected static class PriorityRunnable implements Runnable, Comparable<PriorityRunnable> {
private final static AtomicLong seq = new AtomicLong();
private final long seqNum;
private Runnable run;
private int priority;
PriorityRunnable(Runnable run, int priority) {
seqNum = seq.getAndIncrement();
this.run = run;
this.priority = priority;
}
@Override
public void run() {
this.run.run();
}
@Override
public int compareTo(PriorityRunnable other) {
int res = 0;
if (this.priority == other.priority) {
if (other.run != this.run) {
// ASC
res = (seqNum < other.seqNum ? -1 : 1);
}
} else {
// DESC
res = this.priority > other.priority ? -1 : 1;
}
return res;
}
}
}
要點(diǎn)如下:
- 替換線程池默認(rèn)的阻塞隊(duì)列為 PriorityBlockingQueue,響應(yīng)的傳入的線程類需要實(shí)現(xiàn) Comparable<T> 才能進(jìn)行比較。
- PriorityBlockingQueue 的數(shù)據(jù)結(jié)構(gòu)決定了,優(yōu)先級(jí)相同的任務(wù)無法保證 FIFO,需要自己控制順序。
- 需要重寫線程池的 execute() 方法??催^線程池源碼的會(huì)發(fā)現(xiàn),執(zhí)行 submit(task) 方法后,都會(huì)轉(zhuǎn)化成 RunnableFuture<T> 再進(jìn)一步執(zhí)行,由于傳入的 task 雖然實(shí)現(xiàn)了 Comparable<T> 到,但是內(nèi)部轉(zhuǎn)換成的 RunnableFuture<T> 并未實(shí)現(xiàn),因此直接 submit 會(huì)拋出 Caused by: java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to java.lang.Comparable 這樣一個(gè)異常,所以需要重寫 execute() 方法,構(gòu)造一個(gè) PriorityRunnable 作為中轉(zhuǎn)。
總結(jié)
JVM 線程池是實(shí)現(xiàn)異步任務(wù)隊(duì)列最簡單最原生的一種方式,本文介紹了基本的使用流程和帶有優(yōu)先隊(duì)列需求的用法。這種方法可有滿足到一些簡單的業(yè)務(wù)場景,但也存在一定的局限性:
- JVM 線程池是單機(jī)的,橫向擴(kuò)展多個(gè)服務(wù)下做負(fù)載均衡時(shí),就會(huì)存在多個(gè)線程池了他們是分開工作的,無法很好的統(tǒng)一和管理,不太適合分布式場景。
- JVM 線程池是基于內(nèi)存的,一旦服務(wù)掛了,會(huì)出現(xiàn)任務(wù)丟失的情況,可靠性低。
- 缺少作為任務(wù)隊(duì)列的 ack 機(jī)制,一旦任務(wù)失敗不會(huì)重新執(zhí)行,且無法很好地對(duì)線程池隊(duì)列進(jìn)行監(jiān)控。
顯然簡單的 JVM 線程池是無法 handle 到負(fù)載的業(yè)務(wù)場景的,這就需要引入其他中間件了,在接下來的文章中我們會(huì)繼續(xù)探討。
參考文獻(xiàn)
ThreadPoolExecutor 優(yōu)先級(jí)的線程池
implementing PriorityQueue on ThreadPoolExecutor
ThreadPoolExecutor 的 PriorityBlockingQueue 類型轉(zhuǎn)化問題
大搜車異步任務(wù)隊(duì)列中間件的建設(shè)實(shí)踐
到此這篇關(guān)于JVM優(yōu)先級(jí)線程池做任務(wù)隊(duì)列的實(shí)現(xiàn)方法的文章就介紹到這了,更多相關(guān)java線程池優(yōu)先級(jí)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
解決常見的Eclipse SVN插件報(bào)錯(cuò)方法詳解
本篇文章是對(duì)常見的Eclipse SVN插件報(bào)錯(cuò)方法進(jìn)行了詳細(xì)的分析介紹,需要的朋友參考下2013-05-05
springboot整合JSR303參數(shù)校驗(yàn)與全局異常處理的方法
JSR-303 是 JAVA EE 6 中的一項(xiàng)子規(guī)范,叫做 Bean Validation,官方參考實(shí)現(xiàn)是Hibernate Validator,這篇文章主要介紹了springboot整合JSR303參數(shù)校驗(yàn)與全局異常處理,需要的朋友可以參考下2022-09-09
SpringBoot如何使用Undertow做服務(wù)器
這篇文章主要介紹了SpringBoot如何使用Undertow做服務(wù)器,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-07-07
Spring Mvc中傳遞參數(shù)方法之url/requestMapping詳解
在開發(fā)中,參數(shù)傳遞是必不可少的一個(gè)功能,下面這篇文章主要給大家介紹了關(guān)于Spring Mvc中傳遞參數(shù)方法之url/requestMapping的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考借鑒,下面來一起看看吧。2017-07-07
java如何獲取request中json數(shù)據(jù)
這篇文章主要給大家介紹了關(guān)于java如何獲取request中json數(shù)據(jù)的相關(guān)資料,文中通過代碼示例以及圖文將獲取的方法介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用java具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-08-08
Java數(shù)據(jù)結(jié)構(gòu)之圖的路徑查找算法詳解
這篇文章主要為大家詳細(xì)介紹了java數(shù)據(jù)結(jié)構(gòu)中圖的路徑查找算法,文中的示例代碼講解詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-11-11

