詳解SpringBoot集成消息隊(duì)列的案例應(yīng)用
背景
最近在對(duì)公司開發(fā)框架進(jìn)行優(yōu)化,框架內(nèi)涉及到多處入庫(kù)的日志記錄,例如登錄日志/操作日志/訪問日志/業(yè)務(wù)執(zhí)行日志,集成在業(yè)務(wù)代碼中耦合度較高且占用業(yè)務(wù)操作執(zhí)行時(shí)間,所以準(zhǔn)備集成相關(guān)消息隊(duì)列進(jìn)行代碼解耦
方案規(guī)劃
現(xiàn)有的成熟消息隊(duì)列組件非常多,例如RabbitMQ,ActiveMQ,Kafka等,考慮到業(yè)務(wù)并發(fā)量不高且框架已經(jīng)應(yīng)用于多個(gè)項(xiàng)目平穩(wěn)運(yùn)行,準(zhǔn)備提供基于Redis的消息隊(duì)列和集成ActiveMQ兩種方案,Redis消息隊(duì)列的好處是無需額外安裝部署存量項(xiàng)目可平穩(wěn)過度但消息無法持久化可能丟失,ActiveMQ解決方案成熟可以保證消息持久化但是需要實(shí)施人員額外掌握操作部署
統(tǒng)一設(shè)計(jì)
增加自定義配置指定消息隊(duì)列方式
system: #消息隊(duì)列方式 redis/activemq messageChannel: redis
定義消息傳輸統(tǒng)一模型
public class MessageModel {
private Class<? extends IMessageReceiver> handleClazz;
private String bodyContent;
private Class bodyClass;
private HashMap extraParam;
public MessageModel(){
extraParam = new HashMap();
}
public Class<? extends IMessageReceiver> getHandleClazz() {
return handleClazz;
}
public void setHandleClazz(Class<? extends IMessageReceiver> handleClazz) {
this.handleClazz = handleClazz;
}
public HashMap getExtraParam() {
return extraParam;
}
public void setExtraParam(HashMap extraParam) {
this.extraParam = extraParam;
}
public String getBodyContent() {
return bodyContent;
}
public void setBodyContent(String bodyContent) {
this.bodyContent = bodyContent;
}
public Class getBodyClass() {
return bodyClass;
}
public void setBodyClass(Class bodyClass) {
this.bodyClass = bodyClass;
}
}定義標(biāo)準(zhǔn)消息處理接口
public interface IMessageReceiver {
void handleMessage(Object bodyObject, HashMap extraParam);
}
定義統(tǒng)一對(duì)外發(fā)送消息工具類
@Component
public class MessageUtil {
@Autowired
private SystemConfig systemConfig;
@Autowired
private RedisUtil redisUtil;
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
public void sendMessage(Object messageBody, Class<? extends IMessageReceiver> handleClass, HashMap<String,Object> extraParam) {
MessageModel messageModel = new MessageModel();
messageModel.setHandleClazz(handleClass);
messageModel.setBodyClass(messageBody.getClass());
messageModel.setBodyContent(JSON.toJSONString(messageBody));
if (extraParam != null) {
for (String key:extraParam.keySet()) {
messageModel.getExtraParam().put(key,extraParam.get(key));
}
}
if(systemConfig.getMessageChannel().equals("redis")){
redisUtil.sendMessage("message", JSON.toJSON(messageModel));
}else{
jmsMessagingTemplate.convertAndSend("message",JSON.toJSONString(messageModel));
}
}
}
集成Redis消息隊(duì)列
pom配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
連接配置
spring:
redis:
host: localhost
port: 6379
password:
操作工具類
@Autowired
private RedisTemplate redisTemplate;
public void sendMessage(String channel, Object message) {
redisTemplate.convertAndSend(channel, message);
}
消息處理
@Component
@ConditionalOnProperty(name = "system.messageChannel", havingValue = "redis", matchIfMissing = true)
public class RedisMessageReceiver {
public void receiveMessage(String message) {
MessageModel messageModel = JSON.parseObject(message, MessageModel.class);
IMessageReceiver receiver = SpringBootBeanUtil.getBean(messageModel.getHandleClazz());
receiver.handleMessage(JSON.parseObject(messageModel.getBodyContent(), messageModel.getBodyClass()), messageModel.getExtraParam());
}
}配置注冊(cè)
@Configuration
public class MessageCenter {
@Bean
@ConditionalOnProperty(name = "system.messageChannel", havingValue = "redis", matchIfMissing = true)
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 可以添加多個(gè) messageListener,配置不同的交換機(jī)
container.addMessageListener(listenerAdapter, new PatternTopic("message"));
return container;
}
/**
* 消息監(jiān)聽器適配器,綁定消息處理器,利用反射技術(shù)調(diào)用消息處理器的業(yè)務(wù)方法
*
* @param receiver
* @return
*/
@Bean
@ConditionalOnProperty(name = "system.messageChannel", havingValue = "redis", matchIfMissing = true)
MessageListenerAdapter listenerAdapter(RedisMessageReceiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
}集成ActiveMQ消息隊(duì)列
pom配置
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.0</version>
</dependency>
注意:jdk1.8對(duì)應(yīng)版本5.15.0
連接配置
spring:
activemq:
broker-url: tcp://127.0.0.1:61616 #MQ服務(wù)器地址
user: admin
password: admin
pool:
enabled: true
消息處理
@Component
@ConditionalOnProperty(name = "system.messageChannel", havingValue = "activemq", matchIfMissing = false)
public class ActiveMQMessageReceiver {
@JmsListener(destination = "message", containerFactory = "customQueueListener")
public void handleMessage(String message) {
MessageModel messageModel = JSON.parseObject(message, MessageModel.class);
IMessageReceiver receiver = SpringBootBeanUtil.getBean(messageModel.getHandleClazz());
receiver.handleMessage(JSON.parseObject(messageModel.getBodyContent(), messageModel.getBodyClass()), messageModel.getExtraParam());
}
}
配置注冊(cè)
@Configuration
@EnableJms
public class MessageCenter {
@Bean(name = "customQueueListener")
@ConditionalOnProperty(name = "system.messageChannel", havingValue = "activemq", matchIfMissing = false)
public JmsListenerContainerFactory<?> customQueueListener(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setPubSubDomain(false);
factory.setConnectionFactory(connectionFactory);
//重連間隔時(shí)間
factory.setRecoveryInterval(1000L);
factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
//連接數(shù)
factory.setConcurrency("5-10");
//指定任務(wù)線程池
factory.setTaskExecutor(new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy()));
return factory;
}
}
使用示例
消息處理
@Service
public class RequestLogMessageReceiver implements IMessageReceiver{
@Autowired
private F_RequestLogService requestLogService;
@Override
public void handleMessage(Object bodyObject, HashMap extraParam) {
F_RequestLogDO requestLogDO = (F_RequestLogDO)bodyObject;
requestLogService.insert(requestLogDO);
}
}
發(fā)送消息
@AutoWired private MessageUtil messageUtil; messageUtil.sendMessage(requestLogDO,RequestLogMessageReceiver.class,null);
到此這篇關(guān)于詳解SpringBoot集成消息隊(duì)列的案例應(yīng)用的文章就介紹到這了,更多相關(guān)SpringBoot消息隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot整合RabbitMQ消息隊(duì)列的完整步驟
- Springboot?整合?RabbitMQ?消息隊(duì)列?詳情
- springboot整合消息隊(duì)列RabbitMQ
- SpringBoot整合消息隊(duì)列RabbitMQ
- SpringBoot集成消息隊(duì)列的項(xiàng)目實(shí)踐
- SpringBoot基于RabbitMQ實(shí)現(xiàn)消息延時(shí)隊(duì)列的方案
- SpringBoot基于RabbitMQ實(shí)現(xiàn)消息延遲隊(duì)列方案及使用場(chǎng)景
- Springboot RabbitMQ 消息隊(duì)列使用示例詳解
- SpringBoot集成Redisson實(shí)現(xiàn)消息隊(duì)列的示例代碼
相關(guān)文章
jedis連接池對(duì)commons-pool的封裝示例詳解
這篇文章主要為大家介紹了jedis連接池對(duì)commons-pool的封裝示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-09-09
java實(shí)現(xiàn)靜默加載Class示例代碼
這篇文章主要給大家介紹了關(guān)于java實(shí)現(xiàn)靜默加載Class的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用java具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。2017-10-10
springboot HandlerIntercepter攔截器修改request body數(shù)據(jù)的操作
這篇文章主要介紹了springboot HandlerIntercepter攔截器修改request body數(shù)據(jù)的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。2021-06-06
如何使用mybatis-plus實(shí)現(xiàn)分頁(yè)查詢功能
最近在研究mybatis,然后就去找簡(jiǎn)化mybatis開發(fā)的工具,發(fā)現(xiàn)就有通用Mapper和mybatis-plus兩個(gè)比較好的可是使用,可是經(jīng)過對(duì)比發(fā)現(xiàn)還是mybatis-plus比較好,下面這篇文章主要給大家介紹了關(guān)于如何使用mybatis-plus實(shí)現(xiàn)分頁(yè)查詢功能的相關(guān)資料,需要的朋友可以參考下2022-06-06
Netty分布式pipeline管道Handler的添加代碼跟蹤解析
這篇文章主要介紹了Netty分布式pipeline管道Handler的添加代碼跟蹤解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-03-03
Java中double數(shù)值保留兩位小數(shù)的4種實(shí)現(xiàn)方式舉例
在Java編程中,我們經(jīng)常遇到需要對(duì)double類型的浮點(diǎn)數(shù)進(jìn)行精確截?cái)嗷蛩纳嵛迦氡A魞晌恍?shù)的需求,這篇文章主要給大家介紹了關(guān)于Java中double數(shù)值保留兩位小數(shù)的4種實(shí)現(xiàn)方式,需要的朋友可以參考下2024-07-07

