redis延時(shí)隊(duì)列的項(xiàng)目實(shí)踐
引入
<redisson.version>3.15.5</redisson.version> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>${redisson.version}</version> </dependency>
放入延時(shí)隊(duì)列
import org.redisson.api.RedissonClient; @Autowired private RedissonClient redissonClient; public static final String CardKitMessageDelayQueue = "QUEUE:CARD_KIT"; // 發(fā)送延時(shí)消息 RBlockingDeque<CardKitRedisBo> blockingDeque = redissonClient .getBlockingDeque(CardKitMessageDelayQueue); RDelayedQueue<CardKitRedisBo> delayedQueue = redissonClient.getDelayedQueue(blockingDeque); // 計(jì)算時(shí)間戳 long delayInSeconds = calculateDifference(model.getSendTime(), LocalDateTime.now()); CardKitRedisBo cardKitRedisBo = new CardKitRedisBo(); cardKitRedisBo.setId(model.getId()).setTemplateId(model.getTemplateId()); delayedQueue.offer(cardKitRedisBo, delayInSeconds, TimeUnit.SECONDS);
監(jiān)聽(tīng)延時(shí)隊(duì)列
import cn.hutool.json.JSONUtil; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RBlockingDeque; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.cloud.sleuth.Span; import org.springframework.cloud.sleuth.Tracer; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.concurrent.TimeUnit; @Slf4j @Component public class CardKitMessageListener implements ApplicationRunner { public static final String CardKitMessageDelayQueue = "QUEUE:CARD_KIT"; public static final String CardKitMessageDelayLock = "LOCK:CARD_KIT"; @Resource private RedissonClient redissonClient; @Autowired private Tracer tracer; @Autowired private CardKitService cardKitService; @Override public void run(ApplicationArguments args) { new Thread(() -> { RBlockingDeque<CardKitRedisBo> blockingDeque = redissonClient.getBlockingDeque(CardKitMessageDelayQueue); while (true) { // 獲取定時(shí)任務(wù)鎖 RLock rLock = redissonClient.getLock(CardKitMessageDelayLock); try { // 最多等待5秒 boolean isLocked = rLock.tryLock(5, TimeUnit.SECONDS); if (isLocked) { Span span = tracer.nextSpan().name("OccupationMessage").start(); try (Tracer.SpanInScope ws = tracer.withSpan(span)) { CardKitRedisBo poll = blockingDeque.take(); log.info("獲取延時(shí)消息:{}", JSONUtil.toJsonStr(poll)); // 消費(fèi)消息 cardKitService.sendCardKit(poll); } finally { try { rLock.unlock(); } catch (Exception ex) { log.warn("鎖釋放失?。? + ex.getMessage()); } try { span.end(); } catch (Exception ex) { log.error("失敗", ex) } } } } catch (Exception ex) { log.error("延遲消息處理異常:" + ex.getMessage(), ex); } } }).start(); } }
到此這篇關(guān)于redis延時(shí)隊(duì)列的文章就介紹到這了,更多相關(guān)redis延時(shí)隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
redis數(shù)據(jù)結(jié)構(gòu)之壓縮列表
這篇文章主要介紹了redis數(shù)據(jù)結(jié)構(gòu)之壓縮列表,壓縮列表是列表list和hash數(shù)據(jù)結(jié)構(gòu)的底層實(shí)現(xiàn)之一,是redis為了節(jié)約內(nèi)存而開(kāi)發(fā)的,由一系列特殊編碼的連續(xù)內(nèi)存塊組成的順序型數(shù)據(jù)結(jié)構(gòu),下面詳細(xì)內(nèi)容需要的小伙伴可以參考一下2022-03-03CentOS Linux系統(tǒng)下安裝Redis過(guò)程和配置參數(shù)說(shuō)明
這篇文章主要介紹了CentOS Linux系統(tǒng)下安裝Redis過(guò)程和配置參數(shù)說(shuō)明,需要的朋友可以參考下2014-10-10使用Redis實(shí)現(xiàn)微信步數(shù)排行榜功能
這篇文章主要介紹了使用Redis實(shí)現(xiàn)微信步數(shù)排行榜功能,本文通過(guò)圖文實(shí)例代碼相結(jié)合給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-06-06redis實(shí)現(xiàn)存儲(chǔ)帖子的點(diǎn)贊狀態(tài)和數(shù)量的示例代碼
使用Redis來(lái)實(shí)現(xiàn)點(diǎn)贊功能是一種高效的選擇,因?yàn)镽edis是一個(gè)內(nèi)存數(shù)據(jù)庫(kù),適用于處理高并發(fā)的數(shù)據(jù)操作,這篇文章主要介紹了redis實(shí)現(xiàn)存儲(chǔ)帖子的點(diǎn)贊狀態(tài)和數(shù)量的示例代碼,需要的朋友可以參考下2023-09-09Redis+Caffeine兩級(jí)緩存的實(shí)現(xiàn)
本文主要介紹了Redis+Caffeine兩級(jí)緩存的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-06-06