SpringBoot集成消息隊列的項目實踐
背景
最近在對公司開發(fā)框架進行優(yōu)化,框架內(nèi)涉及到多處入庫的日志記錄,例如登錄日志/操作日志/訪問日志/業(yè)務(wù)執(zhí)行日志,集成在業(yè)務(wù)代碼中耦合度較高且占用業(yè)務(wù)操作執(zhí)行時間,所以準備集成相關(guān)消息隊列進行代碼解耦
方案規(guī)劃
現(xiàn)有的成熟消息隊列組件非常多,例如RabbitMQ,ActiveMQ,Kafka等,考慮到業(yè)務(wù)并發(fā)量不高且框架已經(jīng)應(yīng)用于多個項目平穩(wěn)運行,準備提供基于Redis的消息隊列和集成ActiveMQ兩種方案,Redis消息隊列的好處是無需額外安裝部署存量項目可平穩(wěn)過度但消息無法持久化可能丟失,ActiveMQ解決方案成熟可以保證消息持久化但是需要實施人員額外掌握操作部署
統(tǒng)一設(shè)計
增加自定義配置指定消息隊列方式
system: #消息隊列方式 redis/activemq messageChannel: redis
定義消息傳輸統(tǒng)一模型
public class MessageModel { private Class<? extends IMessageReceiver> handleClazz; private String bodyContent; private Class bodyClass; private HashMap extraParam; public MessageModel(){ extraParam = new HashMap(); } public Class<? extends IMessageReceiver> getHandleClazz() { return handleClazz; } public void setHandleClazz(Class<? extends IMessageReceiver> handleClazz) { this.handleClazz = handleClazz; } public HashMap getExtraParam() { return extraParam; } public void setExtraParam(HashMap extraParam) { this.extraParam = extraParam; } public String getBodyContent() { return bodyContent; } public void setBodyContent(String bodyContent) { this.bodyContent = bodyContent; } public Class getBodyClass() { return bodyClass; } public void setBodyClass(Class bodyClass) { this.bodyClass = bodyClass; } }
定義標準消息處理接口
public interface IMessageReceiver { void handleMessage(Object bodyObject, HashMap extraParam); }
定義統(tǒng)一對外發(fā)送消息工具類
@Component public class MessageUtil { @Autowired private SystemConfig systemConfig; @Autowired private RedisUtil redisUtil; @Autowired private JmsMessagingTemplate jmsMessagingTemplate; public void sendMessage(Object messageBody, Class<? extends IMessageReceiver> handleClass, HashMap<String,Object> extraParam) { MessageModel messageModel = new MessageModel(); messageModel.setHandleClazz(handleClass); messageModel.setBodyClass(messageBody.getClass()); messageModel.setBodyContent(JSON.toJSONString(messageBody)); if (extraParam != null) { for (String key:extraParam.keySet()) { messageModel.getExtraParam().put(key,extraParam.get(key)); } } if(systemConfig.getMessageChannel().equals("redis")){ redisUtil.sendMessage("message", JSON.toJSON(messageModel)); }else{ jmsMessagingTemplate.convertAndSend("message",JSON.toJSONString(messageModel)); } } }
集成Redis消息隊列
pom配置
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>2.0.1.RELEASE</version> </dependency>
連接配置
spring: redis: host: localhost port: 6379 password:
操作工具類
@Autowired private RedisTemplate redisTemplate; public void sendMessage(String channel, Object message) { redisTemplate.convertAndSend(channel, message); }
消息處理
@Component @ConditionalOnProperty(name = "system.messageChannel", havingValue = "redis", matchIfMissing = true) public class RedisMessageReceiver { public void receiveMessage(String message) { MessageModel messageModel = JSON.parseObject(message, MessageModel.class); IMessageReceiver receiver = SpringBootBeanUtil.getBean(messageModel.getHandleClazz()); receiver.handleMessage(JSON.parseObject(messageModel.getBodyContent(), messageModel.getBodyClass()), messageModel.getExtraParam()); } }
配置注冊
@Configuration public class MessageCenter { @Bean @ConditionalOnProperty(name = "system.messageChannel", havingValue = "redis", matchIfMissing = true) RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); // 可以添加多個 messageListener,配置不同的交換機 container.addMessageListener(listenerAdapter, new PatternTopic("message")); return container; } /** * 消息監(jiān)聽器適配器,綁定消息處理器,利用反射技術(shù)調(diào)用消息處理器的業(yè)務(wù)方法 * * @param receiver * @return */ @Bean @ConditionalOnProperty(name = "system.messageChannel", havingValue = "redis", matchIfMissing = true) MessageListenerAdapter listenerAdapter(RedisMessageReceiver receiver) { return new MessageListenerAdapter(receiver, "receiveMessage"); } }
集成ActiveMQ消息隊列
pom配置
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.0</version> </dependency>
注意:jdk1.8對應(yīng)版本5.15.0
連接配置
spring: activemq: broker-url: tcp://127.0.0.1:61616 #MQ服務(wù)器地址 user: admin password: admin pool: enabled: true
消息處理
@Component @ConditionalOnProperty(name = "system.messageChannel", havingValue = "activemq", matchIfMissing = false) public class ActiveMQMessageReceiver { @JmsListener(destination = "message", containerFactory = "customQueueListener") public void handleMessage(String message) { MessageModel messageModel = JSON.parseObject(message, MessageModel.class); IMessageReceiver receiver = SpringBootBeanUtil.getBean(messageModel.getHandleClazz()); receiver.handleMessage(JSON.parseObject(messageModel.getBodyContent(), messageModel.getBodyClass()), messageModel.getExtraParam()); } }
配置注冊
@Configuration @EnableJms public class MessageCenter { @Bean(name = "customQueueListener") @ConditionalOnProperty(name = "system.messageChannel", havingValue = "activemq", matchIfMissing = false) public JmsListenerContainerFactory<?> customQueueListener(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setPubSubDomain(false); factory.setConnectionFactory(connectionFactory); //重連間隔時間 factory.setRecoveryInterval(1000L); factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE); //連接數(shù) factory.setConcurrency("5-10"); //指定任務(wù)線程池 factory.setTaskExecutor(new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy())); return factory; } }
使用示例
消息處理
@Service public class RequestLogMessageReceiver implements IMessageReceiver{ @Autowired private F_RequestLogService requestLogService; @Override public void handleMessage(Object bodyObject, HashMap extraParam) { F_RequestLogDO requestLogDO = (F_RequestLogDO)bodyObject; requestLogService.insert(requestLogDO); } }
發(fā)送消息
@AutoWired private MessageUtil messageUtil; messageUtil.sendMessage(requestLogDO,RequestLogMessageReceiver.class,null);
到此這篇關(guān)于SpringBoot集成消息隊列的項目實踐的文章就介紹到這了,更多相關(guān)SpringBoot集成消息隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java安全?ysoserial?CommonsCollections1示例解析
這篇文章主要介紹了java安全?ysoserial?CommonsCollections1示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-10-10shiro與spring集成基礎(chǔ)Hello案例詳解
這篇文章主要介紹了shiro與spring集成基礎(chǔ)Hello案例詳解,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-11-11