盤(pán)點(diǎn)Java中延時(shí)任務(wù)的多種實(shí)現(xiàn)方式
場(chǎng)景描述
①需要實(shí)現(xiàn)一個(gè)定時(shí)發(fā)布系統(tǒng)通告的功能,如何實(shí)現(xiàn)? ②支付超時(shí),訂單自動(dòng)取消,如何實(shí)現(xiàn)?
實(shí)現(xiàn)方式
一、掛起線程
推薦指數(shù):★★☆ 優(yōu)點(diǎn): JDK原生(JUC包下)支持,無(wú)需引入新的依賴; 缺點(diǎn): (1)基于內(nèi)存,應(yīng)用重啟(或宕機(jī))會(huì)導(dǎo)致任務(wù)丟失 (2)基于內(nèi)存掛起線程實(shí)現(xiàn)延時(shí),不支持集群 (3)代碼耦合性大,不易維護(hù) (4)一個(gè)任務(wù)就要新建一個(gè)線程綁定任務(wù)的執(zhí)行,容易造成資源浪費(fèi)
①配置延遲任務(wù)專(zhuān)用線程池
/**
* 線程池配置
*/
@Configuration
@EnableAsync
@EnableConfigurationProperties(ThreadPoolProperties.class)
public class ThreadPoolConfig {
//ThreadPoolProperties的配置依據(jù)需求和服務(wù)器配置自行配置
@Resource
private ThreadPoolProperties threadPoolProperties;
//延遲任務(wù)隊(duì)列容量
private final static int DELAY_TASK_QUEUE_CAPACITY = 100;
@Bean
public ThreadPoolTaskExecutor delayTaskExecutor() {
log.info("start delayTaskExecutor");
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
//配置核心線程數(shù)
threadPool.setCorePoolSize(threadPoolProperties.getCorePoolSize());
//配置最大線程數(shù)
threadPool.setMaxPoolSize(threadPoolProperties.getMaxPoolSize());
//配置隊(duì)列大小
threadPool.setQueueCapacity(DELAY_TASK_QUEUE_CAPACITY);
//線程最大存活時(shí)間
threadPool.setKeepAliveSeconds (threadPoolProperties.getKeepAliveSeconds());
//配置線程池中的線程的名稱前綴
threadPool.setThreadNamePrefix(threadPoolProperties.getThreadNamePrefix());
// rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候執(zhí)行的策略
threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
//執(zhí)行初始化
threadPool.initialize();
return threadPool;
}
}②創(chuàng)建延時(shí)任務(wù)
在需要執(zhí)行的代碼塊創(chuàng)建延時(shí)任務(wù)
delayTaskExecutor.execute(() -> {
try {
//線程掛起指定時(shí)間
TimeUnit.MINUTES.sleep(time);
//執(zhí)行業(yè)務(wù)邏輯
doSomething();
} catch (InterruptedException e) {
log.error("線程被打斷,執(zhí)行業(yè)務(wù)邏輯失敗");
}
});二、ScheduledExecutorService 延遲任務(wù)線程池
推薦指數(shù):★★★ 優(yōu)點(diǎn): 代碼簡(jiǎn)潔,JDK原生支持 缺點(diǎn): (1)基于內(nèi)存,應(yīng)用重啟(或宕機(jī))會(huì)導(dǎo)致任務(wù)丟失 (2)基于內(nèi)存存放任務(wù),不支持集群 (3)一個(gè)任務(wù)就要新建一個(gè)線程綁定任務(wù)的執(zhí)行,容易造成資源浪費(fèi)
class Task implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getId()+":"+Thread.currentThread().getName());
System.out.println("scheduledExecutorService====>>>延時(shí)器");
}
}
public class ScheduleServiceTest {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService=new ScheduledThreadPoolExecutor(10);
scheduledExecutorService.schedule(new Task(),1, TimeUnit.SECONDS);
scheduledExecutorService.schedule(new Task(),2, TimeUnit.SECONDS);
scheduledExecutorService.schedule(new Task(),1, TimeUnit.SECONDS);
}
}
三、DelayQueue(延時(shí)隊(duì)列)
推薦指數(shù):★★★☆ 優(yōu)點(diǎn): (1)JDK原生(JUC包下)支持,無(wú)需引入新的依賴; (2)可以用一個(gè)線程對(duì)整個(gè)延時(shí)隊(duì)列按序執(zhí)行; 缺點(diǎn): (1)基于內(nèi)存,應(yīng)用重啟(或宕機(jī))會(huì)導(dǎo)致任務(wù)丟失 (2)基于內(nèi)存存放隊(duì)列,不支持集群 (3)依據(jù)compareTo方法排列隊(duì)列,調(diào)用take阻塞式的取出第一個(gè)任務(wù)(不調(diào)用則不取出),比較不靈活,會(huì)影響時(shí)間的準(zhǔn)確性
①新建一個(gè)延時(shí)任務(wù)
public class DelayTask implements Delayed {
private Integer taskId;
private long executeTime;
DelayTask(Integer taskId, long executeTime) {
this.taskId = taskId;
this.executeTime = executeTime;
}
/**
* 該任務(wù)的延時(shí)時(shí)長(zhǎng)
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return executeTime - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
DelayTask t = (DelayTask) o;
if (this.executeTime - t.executeTime <= 0) {
return -1;
} else {
return 1;
}
}
@Override
public String toString() {
return "延時(shí)任務(wù){(diào)" +
"任務(wù)編號(hào)=" + taskId +
", 執(zhí)行時(shí)間=" + new Date(executeTime) +
'}';
}
/**
* 執(zhí)行具體業(yè)務(wù)代碼
*/
public void doTask(){
System.out.println(this+":");
System.out.println("線程ID-"+Thread.currentThread().getId()+":線程名稱-"+Thread.currentThread().getName()+":do something!");
}
}②執(zhí)行延時(shí)任務(wù)
public class TestDelay {
public static void main(String[] args) throws InterruptedException {
// 新建3個(gè)任務(wù),并依次設(shè)置超時(shí)時(shí)間為 30s 10s 60s
DelayTask d1 = new DelayTask(1, System.currentTimeMillis() + 3000L);
DelayTask d2 = new DelayTask(2, System.currentTimeMillis() + 1000L);
DelayTask d3 = new DelayTask(3, System.currentTimeMillis() + 6000L);
DelayQueue<DelayTask> queue = new DelayQueue<>();
queue.add(d1);
queue.add(d2);
queue.add(d3);
System.out.println("開(kāi)啟延時(shí)隊(duì)列時(shí)間:" + new Date()+"\n");
// 從延時(shí)隊(duì)列中獲取元素
while (!queue.isEmpty()) {
queue.take().doTask();
}
System.out.println("\n任務(wù)結(jié)束");
}
}執(zhí)行結(jié)果:

四、Redis-為key指定超時(shí)時(shí)長(zhǎng),并監(jiān)聽(tīng)失效key
推薦指數(shù):★★★☆ 優(yōu)點(diǎn): 對(duì)于有依賴redis的業(yè)務(wù)且有延時(shí)任務(wù)的需求,能夠快速對(duì)接 缺點(diǎn): (1)客戶端斷開(kāi)后重連會(huì)導(dǎo)致所有事件丟失 (2)高并發(fā)場(chǎng)景下,存在大量的失效key場(chǎng)景會(huì)導(dǎo)出失效時(shí)間存在延遲 (3)若有多個(gè)監(jiān)聽(tīng)器監(jiān)聽(tīng)該key,是會(huì)重復(fù)消費(fèi)這個(gè)過(guò)期事件的,需要特定邏輯判斷
① 修改Redis配置文件并重啟Redis
notify-keyspace-events Ex
注意: redis配置文件不能有空格,否則會(huì)啟動(dòng)報(bào)錯(cuò)

②Java中關(guān)于Redis的配置類(lèi)
redisTemplate實(shí)例bean需要自定義生成; RedisMessageListenerContainer 是redis-key過(guò)期監(jiān)聽(tīng)需要的監(jiān)聽(tīng)器容器;
@Configuration
@Slf4j
public class RedisConfiguration {
/**
* Redis配置
* @param factory
* @return
*/
@Bean(name = "redisTemplate")
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<Object, Object> template = new RedisTemplate<>();
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
template.setConnectionFactory(factory);
//key序列化方式
template.setKeySerializer(redisSerializer);
//value序列化
template.setValueSerializer(redisSerializer);
//value hashmap序列化
template.setHashValueSerializer(redisSerializer);
//key hashmap序列化
template.setHashKeySerializer(redisSerializer);
return template;
}
/**
* 消息監(jiān)聽(tīng)器容器bean
* @param connectionFactory
* @return
*/
@Bean
public RedisMessageListenerContainer container(LettuceConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
}③監(jiān)聽(tīng)器代碼
@Slf4j
@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
private static final String TEST_REDIS_KEY = "testExpired";
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer,
RedisTemplate redisTemplate) {
super(listenerContainer);
/**
* 設(shè)置一個(gè)Redis延遲過(guò)期key(key名:testExpired,過(guò)期時(shí)間:30秒)
*/
redisTemplate.opsForValue().set(TEST_REDIS_KEY, "1", 20, TimeUnit.SECONDS);
log.info("設(shè)置redis-key");
}
@Override
public void onMessage(Message message, byte[] pattern) {
try {
String expiredKey = message.toString();
if (TEST_REDIS_KEY.equals(expiredKey)) {
//業(yè)務(wù)處理
log.info(expiredKey + "過(guò)期,觸發(fā)回調(diào)");
}
} catch (Exception e) {
log.error("key 過(guò)期通知處理異常,{}", e);
}
}
}測(cè)試結(jié)果:

五、時(shí)間輪
推薦指數(shù):★★★★ 優(yōu)點(diǎn): (1)對(duì)于大量定時(shí)任務(wù),時(shí)間輪可以僅用一個(gè)工作線程對(duì)編排的任務(wù)進(jìn)行順序運(yùn)行; (2)自動(dòng)運(yùn)行,可以自定義時(shí)間輪每輪的tick數(shù),tick間隔,靈活且時(shí)間精度可控 缺點(diǎn): (1)基于內(nèi)存,應(yīng)用重啟(或宕機(jī))會(huì)導(dǎo)致任務(wù)丟失 (2)基于內(nèi)存存放任務(wù),不支持集群
public class WheelTimerTest {
public static void main(String[] args) {
//設(shè)置每個(gè)格子是 100ms, 總共 256 個(gè)格子
HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 256);
//加入三個(gè)任務(wù),依次設(shè)置超時(shí)時(shí)間是 10s 5s 20s
System.out.println("加入一個(gè)任務(wù),ID = 1, time= " + LocalDateTime.now());
hashedWheelTimer.newTimeout(timeout -> {
System.out.println(Thread.currentThread().getName());
System.out.println("執(zhí)行一個(gè)任務(wù),ID = 1, time= " + LocalDateTime.now());
}, 10, TimeUnit.SECONDS);
System.out.println("加入一個(gè)任務(wù),ID = 2, time= " + LocalDateTime.now());
hashedWheelTimer.newTimeout(timeout -> {
System.out.println(Thread.currentThread().getName());
System.out.println("執(zhí)行一個(gè)任務(wù),ID = 2, time= " + LocalDateTime.now());
}, 5, TimeUnit.SECONDS);
System.out.println("加入一個(gè)任務(wù),ID = 3, time= " + LocalDateTime.now());
hashedWheelTimer.newTimeout(timeout -> {
System.out.println(Thread.currentThread().getName());
System.out.println("執(zhí)行一個(gè)任務(wù),ID = 3, time= " + LocalDateTime.now());
}, 20, TimeUnit.SECONDS);
System.out.println("加入一個(gè)任務(wù),ID = 4, time= " + LocalDateTime.now());
hashedWheelTimer.newTimeout(timeout -> {
System.out.println(Thread.currentThread().getName());
System.out.println("執(zhí)行一個(gè)任務(wù),ID = 4, time= " + LocalDateTime.now());
}, 20, TimeUnit.SECONDS);
System.out.println("等待任務(wù)執(zhí)行===========");
}
}
六、消息隊(duì)列-延遲隊(duì)列
針對(duì)任務(wù)丟失的代價(jià)過(guò)大,高并發(fā)的場(chǎng)景 推薦指數(shù):★★★★ 優(yōu)點(diǎn): 支持集群,分布式,高并發(fā)場(chǎng)景; 缺點(diǎn): 引入額外的消息隊(duì)列,增加項(xiàng)目的部署和維護(hù)的復(fù)雜度。
場(chǎng)景:為一個(gè)委托指定期限,委托到期后,委托關(guān)系終止,相關(guān)業(yè)務(wù)權(quán)限移交回原擁有者 這里采用的是RabbitMq的死信隊(duì)列加TTL消息轉(zhuǎn)化為延遲隊(duì)列的方式(RabbitMq沒(méi)有延時(shí)隊(duì)列)
①聲明一個(gè)隊(duì)列設(shè)定其的死信隊(duì)列
@Configuration
public class MqConfig {
public static final String GLOBAL_RABBIT_TEMPLATE = "rabbitTemplateGlobal";
public static final String DLX_EXCHANGE_NAME = "dlxExchange";
public static final String AUTH_EXCHANGE_NAME = "authExchange";
public static final String DLX_QUEUE_NAME = "dlxQueue";
public static final String AUTH_QUEUE_NAME = "authQueue";
public static final String DLX_AUTH_QUEUE_NAME = "dlxAuthQueue";
@Bean
@Qualifier(GLOBAL_RABBIT_TEMPLATE)
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
@Bean
@Qualifier(AUTH_EXCHANGE_NAME)
public Exchange authExchange() {
return ExchangeBuilder.directExchange (AUTH_EXCHANGE_NAME).durable (true).build ();
}
/**
* 死信交換機(jī)
* @return
*/
@Bean
@Qualifier(DLX_EXCHANGE_NAME)
public Exchange dlxExchange() {
return ExchangeBuilder.directExchange (DLX_EXCHANGE_NAME).durable (true).build ();
}
/**
* 記錄日志的死信隊(duì)列
* @return
*/
@Bean
@Qualifier(DLX_QUEUE_NAME)
public Queue dlxQueue() {
// Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
return QueueBuilder.durable (DLX_QUEUE_NAME).build ();
}
/**
* 委托授權(quán)專(zhuān)用隊(duì)列
* @return
*/
@Bean
@Qualifier(AUTH_QUEUE_NAME)
public Queue authQueue() {
return QueueBuilder
.durable (AUTH_QUEUE_NAME)
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_NAME)
.withArgument("x-dead-letter-routing-key", "dlx_auth")
.build ();
}
/**
* 委托授權(quán)專(zhuān)用死信隊(duì)列
* @return
*/
@Bean
@Qualifier(DLX_AUTH_QUEUE_NAME)
public Queue dlxAuthQueue() {
// Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
return QueueBuilder
.durable (DLX_AUTH_QUEUE_NAME)
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_NAME)
.withArgument("x-dead-letter-routing-key", "dlx_key")
.build ();
}
@Bean
public Binding bindDlxQueueExchange(@Qualifier(DLX_QUEUE_NAME) Queue dlxQueue, @Qualifier(DLX_EXCHANGE_NAME) Exchange dlxExchange){
return BindingBuilder.bind (dlxQueue).to (dlxExchange).with ("dlx_key").noargs ();
}
/**
* 委托授權(quán)專(zhuān)用死信隊(duì)列綁定關(guān)系
* @param dlxAuthQueue
* @param dlxExchange
* @return
*/
@Bean
public Binding bindDlxAuthQueueExchange(@Qualifier(DLX_AUTH_QUEUE_NAME) Queue dlxAuthQueue, @Qualifier(DLX_EXCHANGE_NAME) Exchange dlxExchange){
return BindingBuilder.bind (dlxAuthQueue).to (dlxExchange).with ("dlx_auth").noargs ();
}
/**
* 委托授權(quán)專(zhuān)用隊(duì)列綁定關(guān)系
* @param authQueue
* @param authExchange
* @return
*/
@Bean
public Binding bindAuthQueueExchange(@Qualifier(AUTH_QUEUE_NAME) Queue authQueue, @Qualifier(AUTH_EXCHANGE_NAME) Exchange authExchange){
return BindingBuilder.bind (authQueue).to (authExchange).with ("auth").noargs ();
}
}
②發(fā)送含過(guò)期時(shí)間的消息
向授權(quán)交換機(jī),發(fā)送路由為"auth"的消息(指定了業(yè)務(wù)所需的超時(shí)時(shí)間) =》發(fā)向MqConfig.AUTH_QUEUE_NAME 隊(duì)列
rabbitTemplate.convertAndSend(MqConfig.AUTH_EXCHANGE_NAME, "auth", "類(lèi)型:END,信息:{id:1,fromUserId:111,toUserId:222,beginData:20201204,endData:20211104}", message -> {
/**
* MessagePostProcessor:消息后置處理
* 為消息設(shè)置屬性,然后返回消息,相當(dāng)于包裝消息的類(lèi)
*/
//業(yè)務(wù)邏輯:過(guò)期時(shí)間=xxxx
String ttl = "5000";
//設(shè)置消息的過(guò)期時(shí)間
message.getMessageProperties ().setExpiration (ttl);
return message;
});③超時(shí)后隊(duì)列MqConfig.AUTH_QUEUE_NAME會(huì)將消息轉(zhuǎn)發(fā)至其配置的死信路由"dlx_auth",監(jiān)聽(tīng)該死信隊(duì)列即可消費(fèi)定時(shí)的消息
/**
* 授權(quán)定時(shí)處理
* @param channel
* @param message
*/
@RabbitListener(queues = MqConfig.DLX_AUTH_QUEUE_NAME)
public void dlxAuthQ(Channel channel, Message message) throws IOException {
System.out.println ("\n死信原因:" + message.getMessageProperties ().getHeaders ().get ("x-first-death-reason"));
//1.判斷消息類(lèi)型:1.BEGIN 2.END
try {
//2.1 類(lèi)型為授權(quán)到期(END)
//2.1.1 修改報(bào)件辦理人
//2.1.2 修改授權(quán)狀態(tài)為0(失效)
//2.2 類(lèi)型為授權(quán)開(kāi)啟(BEGIN)
//2.2.1 修改授權(quán)狀態(tài)為1(開(kāi)啟)
System.out.println (new String(message.getBody (), Charset.forName ("utf8")));
channel.basicAck (message.getMessageProperties ().getDeliveryTag (), false);
System.out.println ("已處理,授權(quán)相關(guān)信息修改成功");
} catch (Exception e) {
//拒簽消息
channel.basicNack (message.getMessageProperties ().getDeliveryTag (), false, false);
System.out.println ("授權(quán)相關(guān)信息處理失敗, 進(jìn)入死信隊(duì)列記錄日志");
}
}以上就是盤(pán)點(diǎn)Java中延時(shí)任務(wù)的多種實(shí)現(xiàn)方式的詳細(xì)內(nèi)容,更多關(guān)于Java延時(shí)任務(wù)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- Java?延時(shí)隊(duì)列及簡(jiǎn)單使用方式詳解
- Java延時(shí)的3種實(shí)現(xiàn)方法舉例
- 一文帶你深入了解Java中延時(shí)任務(wù)的實(shí)現(xiàn)
- Java使用延時(shí)隊(duì)列搞定超時(shí)訂單處理的場(chǎng)景
- Java處理延時(shí)任務(wù)的常用幾種解決方案
- 詳解Java中的延時(shí)隊(duì)列 DelayQueue
- 一口氣說(shuō)出Java 6種延時(shí)隊(duì)列的實(shí)現(xiàn)方法(面試官也得服)
- Java延時(shí)執(zhí)行的三種實(shí)現(xiàn)方式
相關(guān)文章
Java transient關(guān)鍵字與序列化操作實(shí)例詳解
這篇文章主要介紹了Java transient關(guān)鍵字與序列化操作,結(jié)合實(shí)例形式詳細(xì)分析了java序列化操作相關(guān)實(shí)現(xiàn)方法與操作注意事項(xiàng),需要的朋友可以參考下2019-09-09
使用java swing實(shí)現(xiàn)qq登錄界面示例分享
這篇文章主要介紹了使用java swing實(shí)現(xiàn)qq登錄界面示例,需要的朋友可以參考下2014-04-04
windows命令行中java和javac、javap使用詳解(java編譯命令)
最近重新復(fù)習(xí)了一下java基礎(chǔ),這里便講講對(duì)于一個(gè)類(lèi)文件如何編譯、運(yùn)行、反編譯的。也讓自己加深一下印象2014-03-03
關(guān)于Java實(shí)體類(lèi)Serializable序列化接口的作用和必要性解析
序列化是將對(duì)象狀態(tài)轉(zhuǎn)化為可保持或者傳輸?shù)母袷竭^(guò)程,與序列化相反的是反序列化,完成序列化和反序列化,可以存儲(chǔ)或傳輸數(shù)據(jù),一般情況下,在定義實(shí)體類(lèi)時(shí)會(huì)使用Serializable,需要的朋友可以參考下2023-05-05
mybatis-config.xml文件中的mappers標(biāo)簽使用
在MyBatis配置中,<mapper>標(biāo)簽關(guān)鍵用于指定SQL?Mapper的XML文件路徑,主要有三種指定方式:resource、url和class,Resource方式從類(lèi)的根路徑開(kāi)始,適合放在項(xiàng)目?jī)?nèi)部保障移植性,URL方式指定絕對(duì)路徑,移植性差,適用于外部路徑2024-10-10
springboot讀取自定義配置文件時(shí)出現(xiàn)亂碼解決方案
這篇文章主要介紹了springboot讀取自定義配置文件時(shí)出現(xiàn)亂碼解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11

