詳解SpringBoot集成消息隊列的案例應用
背景
最近在對公司開發(fā)框架進行優(yōu)化,框架內(nèi)涉及到多處入庫的日志記錄,例如登錄日志/操作日志/訪問日志/業(yè)務執(zhí)行日志,集成在業(yè)務代碼中耦合度較高且占用業(yè)務操作執(zhí)行時間,所以準備集成相關消息隊列進行代碼解耦
方案規(guī)劃
現(xiàn)有的成熟消息隊列組件非常多,例如RabbitMQ,ActiveMQ,Kafka等,考慮到業(yè)務并發(fā)量不高且框架已經(jīng)應用于多個項目平穩(wěn)運行,準備提供基于Redis的消息隊列和集成ActiveMQ兩種方案,Redis消息隊列的好處是無需額外安裝部署存量項目可平穩(wěn)過度但消息無法持久化可能丟失,ActiveMQ解決方案成熟可以保證消息持久化但是需要實施人員額外掌握操作部署
統(tǒng)一設計
增加自定義配置指定消息隊列方式
system: #消息隊列方式 redis/activemq messageChannel: redis
定義消息傳輸統(tǒng)一模型
public class MessageModel { private Class<? extends IMessageReceiver> handleClazz; private String bodyContent; private Class bodyClass; private HashMap extraParam; public MessageModel(){ extraParam = new HashMap(); } public Class<? extends IMessageReceiver> getHandleClazz() { return handleClazz; } public void setHandleClazz(Class<? extends IMessageReceiver> handleClazz) { this.handleClazz = handleClazz; } public HashMap getExtraParam() { return extraParam; } public void setExtraParam(HashMap extraParam) { this.extraParam = extraParam; } public String getBodyContent() { return bodyContent; } public void setBodyContent(String bodyContent) { this.bodyContent = bodyContent; } public Class getBodyClass() { return bodyClass; } public void setBodyClass(Class bodyClass) { this.bodyClass = bodyClass; } }
定義標準消息處理接口
public interface IMessageReceiver { void handleMessage(Object bodyObject, HashMap extraParam); }
定義統(tǒng)一對外發(fā)送消息工具類
@Component public class MessageUtil { @Autowired private SystemConfig systemConfig; @Autowired private RedisUtil redisUtil; @Autowired private JmsMessagingTemplate jmsMessagingTemplate; public void sendMessage(Object messageBody, Class<? extends IMessageReceiver> handleClass, HashMap<String,Object> extraParam) { MessageModel messageModel = new MessageModel(); messageModel.setHandleClazz(handleClass); messageModel.setBodyClass(messageBody.getClass()); messageModel.setBodyContent(JSON.toJSONString(messageBody)); if (extraParam != null) { for (String key:extraParam.keySet()) { messageModel.getExtraParam().put(key,extraParam.get(key)); } } if(systemConfig.getMessageChannel().equals("redis")){ redisUtil.sendMessage("message", JSON.toJSON(messageModel)); }else{ jmsMessagingTemplate.convertAndSend("message",JSON.toJSONString(messageModel)); } } }
集成Redis消息隊列
pom配置
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>2.0.1.RELEASE</version> </dependency>
連接配置
spring:
redis:
host: localhost
port: 6379
password:
操作工具類
@Autowired private RedisTemplate redisTemplate; public void sendMessage(String channel, Object message) { redisTemplate.convertAndSend(channel, message); }
消息處理
@Component @ConditionalOnProperty(name = "system.messageChannel", havingValue = "redis", matchIfMissing = true) public class RedisMessageReceiver { public void receiveMessage(String message) { MessageModel messageModel = JSON.parseObject(message, MessageModel.class); IMessageReceiver receiver = SpringBootBeanUtil.getBean(messageModel.getHandleClazz()); receiver.handleMessage(JSON.parseObject(messageModel.getBodyContent(), messageModel.getBodyClass()), messageModel.getExtraParam()); } }
配置注冊
@Configuration public class MessageCenter { @Bean @ConditionalOnProperty(name = "system.messageChannel", havingValue = "redis", matchIfMissing = true) RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); // 可以添加多個 messageListener,配置不同的交換機 container.addMessageListener(listenerAdapter, new PatternTopic("message")); return container; } /** * 消息監(jiān)聽器適配器,綁定消息處理器,利用反射技術調(diào)用消息處理器的業(yè)務方法 * * @param receiver * @return */ @Bean @ConditionalOnProperty(name = "system.messageChannel", havingValue = "redis", matchIfMissing = true) MessageListenerAdapter listenerAdapter(RedisMessageReceiver receiver) { return new MessageListenerAdapter(receiver, "receiveMessage"); } }
集成ActiveMQ消息隊列
pom配置
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.0</version> </dependency>
注意:jdk1.8對應版本5.15.0
連接配置
spring:
activemq:
broker-url: tcp://127.0.0.1:61616 #MQ服務器地址
user: admin
password: admin
pool:
enabled: true
消息處理
@Component @ConditionalOnProperty(name = "system.messageChannel", havingValue = "activemq", matchIfMissing = false) public class ActiveMQMessageReceiver { @JmsListener(destination = "message", containerFactory = "customQueueListener") public void handleMessage(String message) { MessageModel messageModel = JSON.parseObject(message, MessageModel.class); IMessageReceiver receiver = SpringBootBeanUtil.getBean(messageModel.getHandleClazz()); receiver.handleMessage(JSON.parseObject(messageModel.getBodyContent(), messageModel.getBodyClass()), messageModel.getExtraParam()); } }
配置注冊
@Configuration @EnableJms public class MessageCenter { @Bean(name = "customQueueListener") @ConditionalOnProperty(name = "system.messageChannel", havingValue = "activemq", matchIfMissing = false) public JmsListenerContainerFactory<?> customQueueListener(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setPubSubDomain(false); factory.setConnectionFactory(connectionFactory); //重連間隔時間 factory.setRecoveryInterval(1000L); factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE); //連接數(shù) factory.setConcurrency("5-10"); //指定任務線程池 factory.setTaskExecutor(new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy())); return factory; } }
使用示例
消息處理
@Service public class RequestLogMessageReceiver implements IMessageReceiver{ @Autowired private F_RequestLogService requestLogService; @Override public void handleMessage(Object bodyObject, HashMap extraParam) { F_RequestLogDO requestLogDO = (F_RequestLogDO)bodyObject; requestLogService.insert(requestLogDO); } }
發(fā)送消息
@AutoWired private MessageUtil messageUtil; messageUtil.sendMessage(requestLogDO,RequestLogMessageReceiver.class,null);
到此這篇關于詳解SpringBoot集成消息隊列的案例應用的文章就介紹到這了,更多相關SpringBoot消息隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
springboot HandlerIntercepter攔截器修改request body數(shù)據(jù)的操作
這篇文章主要介紹了springboot HandlerIntercepter攔截器修改request body數(shù)據(jù)的操作,具有很好的參考價值,希望對大家有所幫助。2021-06-06如何使用mybatis-plus實現(xiàn)分頁查詢功能
最近在研究mybatis,然后就去找簡化mybatis開發(fā)的工具,發(fā)現(xiàn)就有通用Mapper和mybatis-plus兩個比較好的可是使用,可是經(jīng)過對比發(fā)現(xiàn)還是mybatis-plus比較好,下面這篇文章主要給大家介紹了關于如何使用mybatis-plus實現(xiàn)分頁查詢功能的相關資料,需要的朋友可以參考下2022-06-06Netty分布式pipeline管道Handler的添加代碼跟蹤解析
這篇文章主要介紹了Netty分布式pipeline管道Handler的添加代碼跟蹤解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-03-03Java中double數(shù)值保留兩位小數(shù)的4種實現(xiàn)方式舉例
在Java編程中,我們經(jīng)常遇到需要對double類型的浮點數(shù)進行精確截斷或四舍五入保留兩位小數(shù)的需求,這篇文章主要給大家介紹了關于Java中double數(shù)值保留兩位小數(shù)的4種實現(xiàn)方式,需要的朋友可以參考下2024-07-07