盤點(diǎn)Java中延時任務(wù)的多種實(shí)現(xiàn)方式
場景描述
①需要實(shí)現(xiàn)一個定時發(fā)布系統(tǒng)通告的功能,如何實(shí)現(xiàn)? ②支付超時,訂單自動取消,如何實(shí)現(xiàn)?
實(shí)現(xiàn)方式
一、掛起線程
推薦指數(shù):★★☆ 優(yōu)點(diǎn): JDK原生(JUC包下)支持,無需引入新的依賴; 缺點(diǎn): (1)基于內(nèi)存,應(yīng)用重啟(或宕機(jī))會導(dǎo)致任務(wù)丟失 (2)基于內(nèi)存掛起線程實(shí)現(xiàn)延時,不支持集群 (3)代碼耦合性大,不易維護(hù) (4)一個任務(wù)就要新建一個線程綁定任務(wù)的執(zhí)行,容易造成資源浪費(fèi)
①配置延遲任務(wù)專用線程池
/** * 線程池配置 */ @Configuration @EnableAsync @EnableConfigurationProperties(ThreadPoolProperties.class) public class ThreadPoolConfig { //ThreadPoolProperties的配置依據(jù)需求和服務(wù)器配置自行配置 @Resource private ThreadPoolProperties threadPoolProperties; //延遲任務(wù)隊(duì)列容量 private final static int DELAY_TASK_QUEUE_CAPACITY = 100; @Bean public ThreadPoolTaskExecutor delayTaskExecutor() { log.info("start delayTaskExecutor"); ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor(); //配置核心線程數(shù) threadPool.setCorePoolSize(threadPoolProperties.getCorePoolSize()); //配置最大線程數(shù) threadPool.setMaxPoolSize(threadPoolProperties.getMaxPoolSize()); //配置隊(duì)列大小 threadPool.setQueueCapacity(DELAY_TASK_QUEUE_CAPACITY); //線程最大存活時間 threadPool.setKeepAliveSeconds (threadPoolProperties.getKeepAliveSeconds()); //配置線程池中的線程的名稱前綴 threadPool.setThreadNamePrefix(threadPoolProperties.getThreadNamePrefix()); // rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時候執(zhí)行的策略 threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); //執(zhí)行初始化 threadPool.initialize(); return threadPool; } }
②創(chuàng)建延時任務(wù)
在需要執(zhí)行的代碼塊創(chuàng)建延時任務(wù)
delayTaskExecutor.execute(() -> { try { //線程掛起指定時間 TimeUnit.MINUTES.sleep(time); //執(zhí)行業(yè)務(wù)邏輯 doSomething(); } catch (InterruptedException e) { log.error("線程被打斷,執(zhí)行業(yè)務(wù)邏輯失敗"); } });
二、ScheduledExecutorService 延遲任務(wù)線程池
推薦指數(shù):★★★ 優(yōu)點(diǎn): 代碼簡潔,JDK原生支持 缺點(diǎn): (1)基于內(nèi)存,應(yīng)用重啟(或宕機(jī))會導(dǎo)致任務(wù)丟失 (2)基于內(nèi)存存放任務(wù),不支持集群 (3)一個任務(wù)就要新建一個線程綁定任務(wù)的執(zhí)行,容易造成資源浪費(fèi)
class Task implements Runnable{ @Override public void run() { System.out.println(Thread.currentThread().getId()+":"+Thread.currentThread().getName()); System.out.println("scheduledExecutorService====>>>延時器"); } } public class ScheduleServiceTest { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService=new ScheduledThreadPoolExecutor(10); scheduledExecutorService.schedule(new Task(),1, TimeUnit.SECONDS); scheduledExecutorService.schedule(new Task(),2, TimeUnit.SECONDS); scheduledExecutorService.schedule(new Task(),1, TimeUnit.SECONDS); } }
三、DelayQueue(延時隊(duì)列)
推薦指數(shù):★★★☆ 優(yōu)點(diǎn): (1)JDK原生(JUC包下)支持,無需引入新的依賴; (2)可以用一個線程對整個延時隊(duì)列按序執(zhí)行; 缺點(diǎn): (1)基于內(nèi)存,應(yīng)用重啟(或宕機(jī))會導(dǎo)致任務(wù)丟失 (2)基于內(nèi)存存放隊(duì)列,不支持集群 (3)依據(jù)compareTo方法排列隊(duì)列,調(diào)用take阻塞式的取出第一個任務(wù)(不調(diào)用則不取出),比較不靈活,會影響時間的準(zhǔn)確性
①新建一個延時任務(wù)
public class DelayTask implements Delayed { private Integer taskId; private long executeTime; DelayTask(Integer taskId, long executeTime) { this.taskId = taskId; this.executeTime = executeTime; } /** * 該任務(wù)的延時時長 * @param unit * @return */ @Override public long getDelay(TimeUnit unit) { return executeTime - System.currentTimeMillis(); } @Override public int compareTo(Delayed o) { DelayTask t = (DelayTask) o; if (this.executeTime - t.executeTime <= 0) { return -1; } else { return 1; } } @Override public String toString() { return "延時任務(wù){(diào)" + "任務(wù)編號=" + taskId + ", 執(zhí)行時間=" + new Date(executeTime) + '}'; } /** * 執(zhí)行具體業(yè)務(wù)代碼 */ public void doTask(){ System.out.println(this+":"); System.out.println("線程ID-"+Thread.currentThread().getId()+":線程名稱-"+Thread.currentThread().getName()+":do something!"); } }
②執(zhí)行延時任務(wù)
public class TestDelay { public static void main(String[] args) throws InterruptedException { // 新建3個任務(wù),并依次設(shè)置超時時間為 30s 10s 60s DelayTask d1 = new DelayTask(1, System.currentTimeMillis() + 3000L); DelayTask d2 = new DelayTask(2, System.currentTimeMillis() + 1000L); DelayTask d3 = new DelayTask(3, System.currentTimeMillis() + 6000L); DelayQueue<DelayTask> queue = new DelayQueue<>(); queue.add(d1); queue.add(d2); queue.add(d3); System.out.println("開啟延時隊(duì)列時間:" + new Date()+"\n"); // 從延時隊(duì)列中獲取元素 while (!queue.isEmpty()) { queue.take().doTask(); } System.out.println("\n任務(wù)結(jié)束"); } }
執(zhí)行結(jié)果:
四、Redis-為key指定超時時長,并監(jiān)聽失效key
推薦指數(shù):★★★☆ 優(yōu)點(diǎn): 對于有依賴redis的業(yè)務(wù)且有延時任務(wù)的需求,能夠快速對接 缺點(diǎn): (1)客戶端斷開后重連會導(dǎo)致所有事件丟失 (2)高并發(fā)場景下,存在大量的失效key場景會導(dǎo)出失效時間存在延遲 (3)若有多個監(jiān)聽器監(jiān)聽該key,是會重復(fù)消費(fèi)這個過期事件的,需要特定邏輯判斷
① 修改Redis配置文件并重啟Redis
notify-keyspace-events Ex
注意: redis配置文件不能有空格,否則會啟動報錯
②Java中關(guān)于Redis的配置類
redisTemplate實(shí)例bean需要自定義生成; RedisMessageListenerContainer 是redis-key過期監(jiān)聽需要的監(jiān)聽器容器;
@Configuration @Slf4j public class RedisConfiguration { /** * Redis配置 * @param factory * @return */ @Bean(name = "redisTemplate") public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<Object, Object> template = new RedisTemplate<>(); RedisSerializer<String> redisSerializer = new StringRedisSerializer(); template.setConnectionFactory(factory); //key序列化方式 template.setKeySerializer(redisSerializer); //value序列化 template.setValueSerializer(redisSerializer); //value hashmap序列化 template.setHashValueSerializer(redisSerializer); //key hashmap序列化 template.setHashKeySerializer(redisSerializer); return template; } /** * 消息監(jiān)聽器容器bean * @param connectionFactory * @return */ @Bean public RedisMessageListenerContainer container(LettuceConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); return container; } }
③監(jiān)聽器代碼
@Slf4j @Component public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener { private static final String TEST_REDIS_KEY = "testExpired"; public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer, RedisTemplate redisTemplate) { super(listenerContainer); /** * 設(shè)置一個Redis延遲過期key(key名:testExpired,過期時間:30秒) */ redisTemplate.opsForValue().set(TEST_REDIS_KEY, "1", 20, TimeUnit.SECONDS); log.info("設(shè)置redis-key"); } @Override public void onMessage(Message message, byte[] pattern) { try { String expiredKey = message.toString(); if (TEST_REDIS_KEY.equals(expiredKey)) { //業(yè)務(wù)處理 log.info(expiredKey + "過期,觸發(fā)回調(diào)"); } } catch (Exception e) { log.error("key 過期通知處理異常,{}", e); } } }
測試結(jié)果:
五、時間輪
推薦指數(shù):★★★★ 優(yōu)點(diǎn): (1)對于大量定時任務(wù),時間輪可以僅用一個工作線程對編排的任務(wù)進(jìn)行順序運(yùn)行; (2)自動運(yùn)行,可以自定義時間輪每輪的tick數(shù),tick間隔,靈活且時間精度可控 缺點(diǎn): (1)基于內(nèi)存,應(yīng)用重啟(或宕機(jī))會導(dǎo)致任務(wù)丟失 (2)基于內(nèi)存存放任務(wù),不支持集群
public class WheelTimerTest { public static void main(String[] args) { //設(shè)置每個格子是 100ms, 總共 256 個格子 HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 256); //加入三個任務(wù),依次設(shè)置超時時間是 10s 5s 20s System.out.println("加入一個任務(wù),ID = 1, time= " + LocalDateTime.now()); hashedWheelTimer.newTimeout(timeout -> { System.out.println(Thread.currentThread().getName()); System.out.println("執(zhí)行一個任務(wù),ID = 1, time= " + LocalDateTime.now()); }, 10, TimeUnit.SECONDS); System.out.println("加入一個任務(wù),ID = 2, time= " + LocalDateTime.now()); hashedWheelTimer.newTimeout(timeout -> { System.out.println(Thread.currentThread().getName()); System.out.println("執(zhí)行一個任務(wù),ID = 2, time= " + LocalDateTime.now()); }, 5, TimeUnit.SECONDS); System.out.println("加入一個任務(wù),ID = 3, time= " + LocalDateTime.now()); hashedWheelTimer.newTimeout(timeout -> { System.out.println(Thread.currentThread().getName()); System.out.println("執(zhí)行一個任務(wù),ID = 3, time= " + LocalDateTime.now()); }, 20, TimeUnit.SECONDS); System.out.println("加入一個任務(wù),ID = 4, time= " + LocalDateTime.now()); hashedWheelTimer.newTimeout(timeout -> { System.out.println(Thread.currentThread().getName()); System.out.println("執(zhí)行一個任務(wù),ID = 4, time= " + LocalDateTime.now()); }, 20, TimeUnit.SECONDS); System.out.println("等待任務(wù)執(zhí)行==========="); } }
六、消息隊(duì)列-延遲隊(duì)列
針對任務(wù)丟失的代價過大,高并發(fā)的場景 推薦指數(shù):★★★★ 優(yōu)點(diǎn): 支持集群,分布式,高并發(fā)場景; 缺點(diǎn): 引入額外的消息隊(duì)列,增加項(xiàng)目的部署和維護(hù)的復(fù)雜度。
場景:為一個委托指定期限,委托到期后,委托關(guān)系終止,相關(guān)業(yè)務(wù)權(quán)限移交回原擁有者 這里采用的是RabbitMq的死信隊(duì)列加TTL消息轉(zhuǎn)化為延遲隊(duì)列的方式(RabbitMq沒有延時隊(duì)列)
①聲明一個隊(duì)列設(shè)定其的死信隊(duì)列
@Configuration public class MqConfig { public static final String GLOBAL_RABBIT_TEMPLATE = "rabbitTemplateGlobal"; public static final String DLX_EXCHANGE_NAME = "dlxExchange"; public static final String AUTH_EXCHANGE_NAME = "authExchange"; public static final String DLX_QUEUE_NAME = "dlxQueue"; public static final String AUTH_QUEUE_NAME = "authQueue"; public static final String DLX_AUTH_QUEUE_NAME = "dlxAuthQueue"; @Bean @Qualifier(GLOBAL_RABBIT_TEMPLATE) public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); return rabbitTemplate; } @Bean @Qualifier(AUTH_EXCHANGE_NAME) public Exchange authExchange() { return ExchangeBuilder.directExchange (AUTH_EXCHANGE_NAME).durable (true).build (); } /** * 死信交換機(jī) * @return */ @Bean @Qualifier(DLX_EXCHANGE_NAME) public Exchange dlxExchange() { return ExchangeBuilder.directExchange (DLX_EXCHANGE_NAME).durable (true).build (); } /** * 記錄日志的死信隊(duì)列 * @return */ @Bean @Qualifier(DLX_QUEUE_NAME) public Queue dlxQueue() { // Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) return QueueBuilder.durable (DLX_QUEUE_NAME).build (); } /** * 委托授權(quán)專用隊(duì)列 * @return */ @Bean @Qualifier(AUTH_QUEUE_NAME) public Queue authQueue() { return QueueBuilder .durable (AUTH_QUEUE_NAME) .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_NAME) .withArgument("x-dead-letter-routing-key", "dlx_auth") .build (); } /** * 委托授權(quán)專用死信隊(duì)列 * @return */ @Bean @Qualifier(DLX_AUTH_QUEUE_NAME) public Queue dlxAuthQueue() { // Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) return QueueBuilder .durable (DLX_AUTH_QUEUE_NAME) .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_NAME) .withArgument("x-dead-letter-routing-key", "dlx_key") .build (); } @Bean public Binding bindDlxQueueExchange(@Qualifier(DLX_QUEUE_NAME) Queue dlxQueue, @Qualifier(DLX_EXCHANGE_NAME) Exchange dlxExchange){ return BindingBuilder.bind (dlxQueue).to (dlxExchange).with ("dlx_key").noargs (); } /** * 委托授權(quán)專用死信隊(duì)列綁定關(guān)系 * @param dlxAuthQueue * @param dlxExchange * @return */ @Bean public Binding bindDlxAuthQueueExchange(@Qualifier(DLX_AUTH_QUEUE_NAME) Queue dlxAuthQueue, @Qualifier(DLX_EXCHANGE_NAME) Exchange dlxExchange){ return BindingBuilder.bind (dlxAuthQueue).to (dlxExchange).with ("dlx_auth").noargs (); } /** * 委托授權(quán)專用隊(duì)列綁定關(guān)系 * @param authQueue * @param authExchange * @return */ @Bean public Binding bindAuthQueueExchange(@Qualifier(AUTH_QUEUE_NAME) Queue authQueue, @Qualifier(AUTH_EXCHANGE_NAME) Exchange authExchange){ return BindingBuilder.bind (authQueue).to (authExchange).with ("auth").noargs (); } }
②發(fā)送含過期時間的消息
向授權(quán)交換機(jī),發(fā)送路由為"auth"的消息(指定了業(yè)務(wù)所需的超時時間) =》發(fā)向MqConfig.AUTH_QUEUE_NAME 隊(duì)列
rabbitTemplate.convertAndSend(MqConfig.AUTH_EXCHANGE_NAME, "auth", "類型:END,信息:{id:1,fromUserId:111,toUserId:222,beginData:20201204,endData:20211104}", message -> { /** * MessagePostProcessor:消息后置處理 * 為消息設(shè)置屬性,然后返回消息,相當(dāng)于包裝消息的類 */ //業(yè)務(wù)邏輯:過期時間=xxxx String ttl = "5000"; //設(shè)置消息的過期時間 message.getMessageProperties ().setExpiration (ttl); return message; });
③超時后隊(duì)列MqConfig.AUTH_QUEUE_NAME會將消息轉(zhuǎn)發(fā)至其配置的死信路由"dlx_auth",監(jiān)聽該死信隊(duì)列即可消費(fèi)定時的消息
/** * 授權(quán)定時處理 * @param channel * @param message */ @RabbitListener(queues = MqConfig.DLX_AUTH_QUEUE_NAME) public void dlxAuthQ(Channel channel, Message message) throws IOException { System.out.println ("\n死信原因:" + message.getMessageProperties ().getHeaders ().get ("x-first-death-reason")); //1.判斷消息類型:1.BEGIN 2.END try { //2.1 類型為授權(quán)到期(END) //2.1.1 修改報件辦理人 //2.1.2 修改授權(quán)狀態(tài)為0(失效) //2.2 類型為授權(quán)開啟(BEGIN) //2.2.1 修改授權(quán)狀態(tài)為1(開啟) System.out.println (new String(message.getBody (), Charset.forName ("utf8"))); channel.basicAck (message.getMessageProperties ().getDeliveryTag (), false); System.out.println ("已處理,授權(quán)相關(guān)信息修改成功"); } catch (Exception e) { //拒簽消息 channel.basicNack (message.getMessageProperties ().getDeliveryTag (), false, false); System.out.println ("授權(quán)相關(guān)信息處理失敗, 進(jìn)入死信隊(duì)列記錄日志"); } }
以上就是盤點(diǎn)Java中延時任務(wù)的多種實(shí)現(xiàn)方式的詳細(xì)內(nèi)容,更多關(guān)于Java延時任務(wù)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java transient關(guān)鍵字與序列化操作實(shí)例詳解
這篇文章主要介紹了Java transient關(guān)鍵字與序列化操作,結(jié)合實(shí)例形式詳細(xì)分析了java序列化操作相關(guān)實(shí)現(xiàn)方法與操作注意事項(xiàng),需要的朋友可以參考下2019-09-09使用java swing實(shí)現(xiàn)qq登錄界面示例分享
這篇文章主要介紹了使用java swing實(shí)現(xiàn)qq登錄界面示例,需要的朋友可以參考下2014-04-04windows命令行中java和javac、javap使用詳解(java編譯命令)
最近重新復(fù)習(xí)了一下java基礎(chǔ),這里便講講對于一個類文件如何編譯、運(yùn)行、反編譯的。也讓自己加深一下印象2014-03-03關(guān)于Java實(shí)體類Serializable序列化接口的作用和必要性解析
序列化是將對象狀態(tài)轉(zhuǎn)化為可保持或者傳輸?shù)母袷竭^程,與序列化相反的是反序列化,完成序列化和反序列化,可以存儲或傳輸數(shù)據(jù),一般情況下,在定義實(shí)體類時會使用Serializable,需要的朋友可以參考下2023-05-05mybatis-config.xml文件中的mappers標(biāo)簽使用
在MyBatis配置中,<mapper>標(biāo)簽關(guān)鍵用于指定SQL?Mapper的XML文件路徑,主要有三種指定方式:resource、url和class,Resource方式從類的根路徑開始,適合放在項(xiàng)目內(nèi)部保障移植性,URL方式指定絕對路徑,移植性差,適用于外部路徑2024-10-10springboot讀取自定義配置文件時出現(xiàn)亂碼解決方案
這篇文章主要介紹了springboot讀取自定義配置文件時出現(xiàn)亂碼解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-11-11