分布式利器redis及redisson的延遲隊(duì)列實(shí)踐
前言碎語(yǔ)
首先說明下需求,一個(gè)用戶中心產(chǎn)品,用戶在試用產(chǎn)品有三天的期限,三天到期后準(zhǔn)時(shí)準(zhǔn)點(diǎn)通知用戶,試用產(chǎn)品到期了。這個(gè)需求如果不是準(zhǔn)時(shí)通知,而是每天定點(diǎn)通知就簡(jiǎn)單了。如果需要準(zhǔn)時(shí)通知就只能上延遲隊(duì)列了。使用場(chǎng)景除了如上,典型的業(yè)務(wù)場(chǎng)景還有電商中的延時(shí)未支付訂單失效等等。
延遲隊(duì)列多種實(shí)現(xiàn)方式
- 1.如基于RabbitMQ的隊(duì)列ttl+死信路由策略:通過設(shè)置一個(gè)隊(duì)列的超時(shí)未消費(fèi)時(shí)間,配合死信路由策略,到達(dá)時(shí)間未消費(fèi)后,回會(huì)將此消息路由到指定隊(duì)列
- 2.基于RabbitMQ延遲隊(duì)列插件(rabbitmq-delayed-message-exchange):發(fā)送消息時(shí)通過在請(qǐng)求頭添加延時(shí)參數(shù)(headers.put("x-delay", 5000))即可達(dá)到延遲隊(duì)列的效果
- 3.使用redis的zset有序性,輪詢zset中的每個(gè)元素,到點(diǎn)后將內(nèi)容遷移至待消費(fèi)的隊(duì)列,(redisson已有實(shí)現(xiàn))
- 4.使用redis的key的過期通知策略,設(shè)置一個(gè)key的過期時(shí)間為延遲時(shí)間,過期后通知客戶端
redisson中的延遲隊(duì)列實(shí)現(xiàn)
怎么封裝便于業(yè)務(wù)使用。
1.首先定義一個(gè)延遲job,里面包含一個(gè)map參數(shù),和隊(duì)列執(zhí)行器的具體實(shí)現(xiàn)class,觸發(fā)任務(wù)執(zhí)行時(shí),map參數(shù)會(huì)被傳遞到具體的業(yè)務(wù)執(zhí)行器實(shí)現(xiàn)內(nèi)
/** * Created by kl on 2018/7/20. * Content :延時(shí)job */ public class DelayJob { private Map jobParams;//job執(zhí)行參數(shù) private Class aClass;//具體執(zhí)行實(shí)例實(shí)現(xiàn) }
2.定義一個(gè)延遲job執(zhí)行器接口,業(yè)務(wù)需要實(shí)現(xiàn)這個(gè)接口,然后在execute方法內(nèi)寫自己的業(yè)務(wù)邏輯
/** * Created by kl on 2018/7/20. * Content :延時(shí)job執(zhí)行器接口 */ public interface ExecuteJob { void execute(DelayJob job); }
3.消費(fèi)已經(jīng)到點(diǎn)的延時(shí)job服務(wù),通過job參數(shù)調(diào)用業(yè)務(wù)執(zhí)行器實(shí)現(xiàn)
@Component public class JobTimer { static final String jobsTag = "customer_jobtimer_jobs"; @Autowired private RedissonClient client; @Autowired private ApplicationContext context; ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); @PostConstruct public void startJobTimer() { RBlockingQueueblockingQueue = client.getBlockingQueue(jobsTag); new Thread() { @Override public void run() { while (true) { try { DelayJob job = blockingQueue.take(); executorService.execute(new ExecutorTask(context, job)); } catch (Exception e) { e.printStackTrace(); try { TimeUnit.SECONDS.sleep(60); } catch (Exception ex) { } } } } }.start(); } class ExecutorTask implements Runnable { private ApplicationContext context; private DelayJob delayJob; public ExecutorTask(ApplicationContext context, DelayJob delayJob) { this.context = context; this.delayJob = delayJob; } @Override public void run() { ExecuteJob service = (ExecuteJob) context.getBean(delayJob.getaClass()); service.execute(delayJob); } } }
4.封裝延時(shí)job服務(wù)
/** * Created by kl on 2018/7/20. * Content :延時(shí)job服務(wù) */ @Component public class DelayJobService { @Autowired private RedissonClient client; public void submitJob(DelayJob job, Long delay, TimeUnit timeUnit){ RBlockingQueueblockingQueue = client.getBlockingQueue(JobTimer.jobsTag); RDelayedQueue delayedQueue = client.getDelayedQueue(blockingQueue); delayedQueue.offer(job,delay,timeUnit); } }
文末結(jié)語(yǔ)
redisson作為一個(gè)分布式利器,這么好用的工具沒人用有點(diǎn)可惜,還有一個(gè)原因是有個(gè)想法,想將延遲隊(duì)列這個(gè)功能封裝成一個(gè)spring boot的start依賴,然后開源出來,造福四方,希望大家以后多多支持腳本之家!
相關(guān)文章
從MySQL到Redis的簡(jiǎn)單數(shù)據(jù)庫(kù)遷移方法
這篇文章主要介紹了從MySQL到Redis的簡(jiǎn)單數(shù)據(jù)庫(kù)遷移方法,注意Redis數(shù)據(jù)庫(kù)基于內(nèi)存,并不能代替?zhèn)鹘y(tǒng)數(shù)據(jù)庫(kù),需要的朋友可以參考下2015-06-06Redis筆記點(diǎn)贊排行榜的實(shí)現(xiàn)示例
探店筆記類似點(diǎn)評(píng)網(wǎng)站的評(píng)價(jià),本文主要介紹了Redis筆記點(diǎn)贊排行榜的實(shí)現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-01-01使用 Redis 流實(shí)現(xiàn)消息隊(duì)列的代碼
這篇文章主要介紹了使用 Redis 流實(shí)現(xiàn)消息隊(duì)列,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2019-11-11Redis整合SpringBoot的RedisTemplate實(shí)現(xiàn)類(實(shí)例詳解)
這篇文章主要介紹了Redis整合SpringBoot的RedisTemplate實(shí)現(xiàn)類,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-01-01嵌入式Redis服務(wù)器在Spring Boot測(cè)試中的使用教程
這篇文章主要介紹了嵌入式Redis服務(wù)器在Spring Boot測(cè)試中的使用,本文通過實(shí)例代碼場(chǎng)景分析給大家介紹的非常詳細(xì),需要的朋友參考下吧2021-07-07