SpringBoot集成Redisson實現(xiàn)消息隊列的示例代碼
更新時間:2024年10月10日 09:12:37 作者:入秋的大橘
本文介紹了如何在SpringBoot中通過集成Redisson來實現(xiàn)消息隊列的功能,包括RedisQueue、RedisQueueInit、RedisQueueListener、RedisQueueService等相關(guān)組件的實現(xiàn)和測試,感興趣的可以了解一下
包含組件內(nèi)容
- RedisQueue:消息隊列監(jiān)聽標(biāo)識
- RedisQueueInit:Redis隊列監(jiān)聽器
- RedisQueueListener:Redis消息隊列監(jiān)聽實現(xiàn)
- RedisQueueService:Redis消息隊列服務(wù)工具
代碼實現(xiàn)
RedisQueue
import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * Redis消息隊列注解 */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) public @interface RedisQueue { /** * 隊列名 */ String value(); }
RedisQueueInit
import jakarta.annotation.Resource; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import org.redisson.RedissonShutdownException; import org.redisson.api.RBlockingQueue; import org.redisson.api.RedissonClient; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; /** * 初始化Redis隊列監(jiān)聽器 * * @author 十八 * @createTime 2024-09-09 22:49 */ @Slf4j @Component public class RedisQueueInit implements ApplicationContextAware { public static final String REDIS_QUEUE_PREFIX = "redis-queue"; final AtomicBoolean shutdownRequested = new AtomicBoolean(false); @Resource private RedissonClient redissonClient; private ExecutorService executorService; public static String buildQueueName(String queueName) { return REDIS_QUEUE_PREFIX + ":" + queueName; } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Map<String, RedisQueueListener> queueListeners = applicationContext.getBeansOfType(RedisQueueListener.class); if (!queueListeners.isEmpty()) { executorService = createThreadPool(); for (Map.Entry<String, RedisQueueListener> entry : queueListeners.entrySet()) { RedisQueue redisQueue = entry.getValue().getClass().getAnnotation(RedisQueue.class); if (redisQueue != null) { String queueName = redisQueue.value(); executorService.submit(() -> listenQueue(queueName, entry.getValue())); } } } } private ExecutorService createThreadPool() { return new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors() * 2, Runtime.getRuntime().availableProcessors() * 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), new NamedThreadFactory(REDIS_QUEUE_PREFIX), new ThreadPoolExecutor.CallerRunsPolicy() ); } private void listenQueue(String queueName, RedisQueueListener redisQueueListener) { queueName = buildQueueName(queueName); RBlockingQueue<?> blockingQueue = redissonClient.getBlockingQueue(queueName); log.info("Redis隊列監(jiān)聽開啟: {}", queueName); while (!shutdownRequested.get() && !redissonClient.isShutdown()) { try { Object message = blockingQueue.take(); executorService.submit(() -> redisQueueListener.consume(message)); } catch (RedissonShutdownException e) { log.info("Redis連接關(guān)閉,停止監(jiān)聽隊列: {}", queueName); break; } catch (Exception e) { log.error("監(jiān)聽隊列異常: {}", queueName, e); } } } public void shutdown() { if (executorService != null) { executorService.shutdown(); try { if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { executorService.shutdownNow(); } } catch (InterruptedException ex) { executorService.shutdownNow(); Thread.currentThread().interrupt(); } } shutdownRequested.set(true); if (redissonClient != null && !redissonClient.isShuttingDown()) { redissonClient.shutdown(); } } private static class NamedThreadFactory implements ThreadFactory { private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; public NamedThreadFactory(String prefix) { this.namePrefix = prefix; } @Override public Thread newThread(@NotNull Runnable r) { return new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement()); } } }
RedisQueueListener
/** * Redis消息隊列監(jiān)聽實現(xiàn) * * @author 十八 * @createTime 2024-09-09 22:51 */ public interface RedisQueueListener<T> { /** * 隊列消費方法 * * @param content 消息內(nèi)容 */ void consume(T content); }
RedisQueueService
import jakarta.annotation.Resource; import java.util.concurrent.TimeUnit; import org.redisson.api.RBlockingQueue; import org.redisson.api.RDelayedQueue; import org.redisson.api.RedissonClient; import org.springframework.stereotype.Component; /** * Redis 消息隊列服務(wù) * * @author 十八 * @createTime 2024-09-09 22:52 */ @Component public class RedisQueueService { @Resource private RedissonClient redissonClient; /** * 添加隊列 * * @param queueName 隊列名稱 * @param content 消息 * @param <T> 泛型 */ public <T> void send(String queueName, T content) { RBlockingQueue<T> blockingQueue = redissonClient.getBlockingQueue(RedisQueueInit.buildQueueName(queueName)); blockingQueue.add(content); } /** * 添加延遲隊列 * * @param queueName 隊列名稱 * @param content 消息類型 * @param delay 延遲時間 * @param timeUnit 單位 * @param <T> 泛型 */ public <T> void sendDelay(String queueName, T content, long delay, TimeUnit timeUnit) { RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(RedisQueueInit.buildQueueName(queueName)); RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue); delayedQueue.offer(content, delay, timeUnit); } /** * 發(fā)送延遲隊列消息(單位毫秒) * * @param queueName 隊列名稱 * @param content 消息類型 * @param delay 延遲時間 * @param <T> 泛型 */ public <T> void sendDelay(String queueName, T content, long delay) { RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(RedisQueueInit.buildQueueName(queueName)); RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue); delayedQueue.offer(content, delay, TimeUnit.MILLISECONDS); } }
測試
創(chuàng)建監(jiān)聽對象
import cn.yiyanc.infrastructure.redis.annotation.RedisQueue; import cn.yiyanc.infrastructure.redis.queue.RedisQueueListener; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; /** * @author 十八 * @createTime 2024-09-10 00:09 */ @Slf4j @Component @RedisQueue("test") public class TestListener implements RedisQueueListener<String> { @Override public void invoke(String content) { log.info("隊列消息接收 >>> {}", content); } }
測試用例
import jakarta.annotation.Resource; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author 十八 * @createTime 2024-09-10 00:11 */ @RestController @RequestMapping("queue") public class QueueController { @Resource private RedisQueueService redisQueueService; @PostMapping("send") public void send(String message) { redisQueueService.send("test", message); redisQueueService.sendDelay("test", "delay messaege -> " + message, 1000); } }
測試結(jié)果
到此這篇關(guān)于SpringBoot集成Redisson實現(xiàn)消息隊列的示例代碼的文章就介紹到這了,更多相關(guān)SpringBoot Redisson消息隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot?全局異常處理和統(tǒng)一響應(yīng)對象的處理方式
這篇文章主要介紹了springboot?全局異常處理和統(tǒng)一響應(yīng)對象,主要包括SpringBoot 默認(rèn)的異常處理機制和SpringBoot 全局異常處理,本文給大家介紹的非常詳細(xì),需要的朋友可以參考下2022-06-06Spring 實現(xiàn)excel及pdf導(dǎo)出表格示例
本篇文章主要介紹了Spring 實現(xiàn)excel及pdf導(dǎo)出表格示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-03-03Java實現(xiàn)的properties文件動態(tài)修改并自動保存工具類
這篇文章主要介紹了Java實現(xiàn)的properties文件動態(tài)修改并自動保存工具類,可實現(xiàn)針對properties配置文件的相關(guān)修改與保存功能,需要的朋友可以參考下2017-11-11