redis延時(shí)隊(duì)列的項(xiàng)目實(shí)踐
引入
<redisson.version>3.15.5</redisson.version>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>${redisson.version}</version>
</dependency>
放入延時(shí)隊(duì)列
import org.redisson.api.RedissonClient;
@Autowired
private RedissonClient redissonClient;
public static final String CardKitMessageDelayQueue = "QUEUE:CARD_KIT";
// 發(fā)送延時(shí)消息
RBlockingDeque<CardKitRedisBo> blockingDeque = redissonClient
.getBlockingDeque(CardKitMessageDelayQueue);
RDelayedQueue<CardKitRedisBo> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
// 計(jì)算時(shí)間戳
long delayInSeconds = calculateDifference(model.getSendTime(), LocalDateTime.now());
CardKitRedisBo cardKitRedisBo = new CardKitRedisBo();
cardKitRedisBo.setId(model.getId()).setTemplateId(model.getTemplateId());
delayedQueue.offer(cardKitRedisBo, delayInSeconds, TimeUnit.SECONDS);
監(jiān)聽延時(shí)隊(duì)列
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class CardKitMessageListener implements ApplicationRunner {
public static final String CardKitMessageDelayQueue = "QUEUE:CARD_KIT";
public static final String CardKitMessageDelayLock = "LOCK:CARD_KIT";
@Resource
private RedissonClient redissonClient;
@Autowired
private Tracer tracer;
@Autowired
private CardKitService cardKitService;
@Override
public void run(ApplicationArguments args) {
new Thread(() -> {
RBlockingDeque<CardKitRedisBo> blockingDeque = redissonClient.getBlockingDeque(CardKitMessageDelayQueue);
while (true) {
// 獲取定時(shí)任務(wù)鎖
RLock rLock = redissonClient.getLock(CardKitMessageDelayLock);
try {
// 最多等待5秒
boolean isLocked = rLock.tryLock(5, TimeUnit.SECONDS);
if (isLocked) {
Span span = tracer.nextSpan().name("OccupationMessage").start();
try (Tracer.SpanInScope ws = tracer.withSpan(span)) {
CardKitRedisBo poll = blockingDeque.take();
log.info("獲取延時(shí)消息:{}", JSONUtil.toJsonStr(poll));
// 消費(fèi)消息
cardKitService.sendCardKit(poll);
} finally {
try {
rLock.unlock();
} catch (Exception ex) {
log.warn("鎖釋放失敗:" + ex.getMessage());
}
try {
span.end();
} catch (Exception ex) {
log.error("失敗", ex)
}
}
}
} catch (Exception ex) {
log.error("延遲消息處理異常:" + ex.getMessage(), ex);
}
}
}).start();
}
}到此這篇關(guān)于redis延時(shí)隊(duì)列的文章就介紹到這了,更多相關(guān)redis延時(shí)隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
redis數(shù)據(jù)結(jié)構(gòu)之壓縮列表
這篇文章主要介紹了redis數(shù)據(jù)結(jié)構(gòu)之壓縮列表,壓縮列表是列表list和hash數(shù)據(jù)結(jié)構(gòu)的底層實(shí)現(xiàn)之一,是redis為了節(jié)約內(nèi)存而開發(fā)的,由一系列特殊編碼的連續(xù)內(nèi)存塊組成的順序型數(shù)據(jù)結(jié)構(gòu),下面詳細(xì)內(nèi)容需要的小伙伴可以參考一下2022-03-03
CentOS Linux系統(tǒng)下安裝Redis過程和配置參數(shù)說明
這篇文章主要介紹了CentOS Linux系統(tǒng)下安裝Redis過程和配置參數(shù)說明,需要的朋友可以參考下2014-10-10
使用Redis實(shí)現(xiàn)微信步數(shù)排行榜功能
這篇文章主要介紹了使用Redis實(shí)現(xiàn)微信步數(shù)排行榜功能,本文通過圖文實(shí)例代碼相結(jié)合給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-06-06
redis實(shí)現(xiàn)存儲帖子的點(diǎn)贊狀態(tài)和數(shù)量的示例代碼
使用Redis來實(shí)現(xiàn)點(diǎn)贊功能是一種高效的選擇,因?yàn)镽edis是一個(gè)內(nèi)存數(shù)據(jù)庫,適用于處理高并發(fā)的數(shù)據(jù)操作,這篇文章主要介紹了redis實(shí)現(xiàn)存儲帖子的點(diǎn)贊狀態(tài)和數(shù)量的示例代碼,需要的朋友可以參考下2023-09-09
Redis+Caffeine兩級緩存的實(shí)現(xiàn)
本文主要介紹了Redis+Caffeine兩級緩存的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-06-06

