SpringBoot中連接多個RabbitMQ的方法詳解
1. 前 言
在 SpringBoot 中整合單個 RabbitMQ 使用,是很簡單的,只需要引入依賴,然后在配置里面配置好 MQ 的連接地址、賬號、密碼等信息,然后使用即可。
但如果 MQ 的連接地址是多個,那這種連接方式就不奏效了。
前段時間,我開發(fā)的一個項目就遇到了這樣的問題。那個項目,好幾個關(guān)聯(lián)方,每個關(guān)聯(lián)方用的 MQ 的地址都不相同,也就意味著我這邊要連接幾個 RabbbitMQ 地址。SpringBoot 連接多個 RabbitMQ,怎么搞?
使用默認的連接方式是行不通的,我已經(jīng)試過,而要實現(xiàn) SpringBoot 連接多個 RabbitMQ,只能自定義重寫一些東西,分別配置才可以,下面一起來走一下試試。
2. 重 寫
首先要明確的是,下面的兩個類是需要重寫的:
- RabbitTemplate:往隊列里面丟消息時,需要用到
- RabbitAdmin:聲明隊列、聲明交換機、綁定隊列和交換機用到
這里,我定義兩個關(guān)聯(lián)方,一個是 one,一個是 two,分別重寫與它們的連接工廠。
2.1 重寫與關(guān)聯(lián)方one的連接工廠
package com.yuhuofei.mq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
/**
* @author yuhuofei
* @version 1.0
* @description 重寫與關(guān)聯(lián)方one的連接工廠
* @date 2022/10/3 16:57
*/
@Slf4j
@Configuration
public class OneMQConfig {
@Value("${one.spring.rabbitmq.host}")
private String host;
@Value("${one.spring.rabbitmq.port}")
private int port;
@Value("${one.spring.rabbitmq.username}")
private String username;
@Value("${one.spring.rabbitmq.password}")
private String password;
@Value("${one.spring.rabbitmq.virtual-host}")
private String virtualHost;
/**
* 定義與one的連接工廠
*/
@Bean(name = "oneConnectionFactory")
@Primary
public ConnectionFactory oneConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
@Bean(name = "oneRabbitTemplate")
@Primary
public RabbitTemplate oneRabbitTemplate(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory) {
RabbitTemplate oneRabbitTemplate = new RabbitTemplate(connectionFactory);
oneRabbitTemplate.setMandatory(true);
oneRabbitTemplate.setConnectionFactory(connectionFactory);
oneRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* 確認消息送到交換機(Exchange)回調(diào)
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("確認消息送到交換機(Exchange)結(jié)果:");
log.info("相關(guān)數(shù)據(jù):{}", correlationData);
boolean ret = false;
if (ack) {
log.info("消息發(fā)送到交換機成功, 消息 = {}", correlationData.getId());
//下面可自定義業(yè)務(wù)邏輯處理,如入庫保存信息等
} else {
log.error("消息發(fā)送到交換機失敗! 消息: {}}; 錯誤原因:cause: {}", correlationData.getId(), cause);
//下面可自定義業(yè)務(wù)邏輯處理,如入庫保存信息等
}
}
});
oneRabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 只要消息沒有投遞給指定的隊列 就觸發(fā)這個失敗回調(diào)
* @param message 投遞失敗的消息詳細信息
* @param replyCode 回復(fù)的狀態(tài)碼
* @param replyText 回復(fù)的文本內(nèi)容
* @param exchange 當(dāng)時這個消息發(fā)給那個交換機
* @param routingKey 當(dāng)時這個消息用那個路由鍵
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
//獲取消息id
String messageId = message.getMessageProperties().getMessageId();
// 內(nèi)容
String result = null;
try {
result = new String(message.getBody(), "UTF-8");
} catch (Exception e) {
log.error("消息發(fā)送失敗{}", e);
}
log.error("消息發(fā)送失敗, 消息ID = {}; 消息內(nèi)容 = {}", messageId, result);
//下面可自定義業(yè)務(wù)邏輯處理,如入庫保存信息等
}
});
return oneRabbitTemplate;
}
@Bean(name = "oneFactory")
@Primary
public SimpleRabbitListenerContainerFactory oneFactory(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory,
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean(name = "oneRabbitAdmin")
@Primary
public RabbitAdmin oneRabbitAdmin(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}
2.2 重寫與關(guān)聯(lián)方two的連接工廠
package com.yuhuofei.mq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author yuhuofei
* @version 1.0
* @description 重寫與關(guān)聯(lián)方two的連接工廠
* @date 2022/10/3 17:52
*/
@Slf4j
@Configuration
public class TwoMQConfig {
@Value("${two.spring.rabbitmq.host}")
private String host;
@Value("${two.spring.rabbitmq.port}")
private int port;
@Value("${two.spring.rabbitmq.username}")
private String username;
@Value("${two.spring.rabbitmq.password}")
private String password;
@Value("${two.spring.rabbitmq.virtualHost}")
private String virtualHost;
/**
* 定義與two的連接工廠
*/
@Bean(name = "twoConnectionFactory")
public ConnectionFactory twoConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
@Bean(name = "twoRabbitTemplate")
public RabbitTemplate twoRabbitTemplate(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory) {
RabbitTemplate twoRabbitTemplate = new RabbitTemplate(connectionFactory);
twoRabbitTemplate.setMandatory(true);
twoRabbitTemplate.setConnectionFactory(connectionFactory);
twoRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* 確認消息送到交換機(Exchange)回調(diào)
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("確認消息送到交換機(Exchange)結(jié)果:");
log.info("相關(guān)數(shù)據(jù):{}", correlationData);
boolean ret = false;
if (ack) {
log.info("消息發(fā)送到交換機成功, 消息 = {}", correlationData.getId());
//下面可自定義業(yè)務(wù)邏輯處理,如入庫保存信息等
} else {
log.error("消息發(fā)送到交換機失敗! 消息: {}}; 錯誤原因:cause: {}", correlationData.getId(), cause);
//下面可自定義業(yè)務(wù)邏輯處理,如入庫保存信息等
}
}
});
twoRabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 只要消息沒有投遞給指定的隊列 就觸發(fā)這個失敗回調(diào)
* @param message 投遞失敗的消息詳細信息
* @param replyCode 回復(fù)的狀態(tài)碼
* @param replyText 回復(fù)的文本內(nèi)容
* @param exchange 當(dāng)時這個消息發(fā)給那個交換機
* @param routingKey 當(dāng)時這個消息用那個路由鍵
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
//獲取消息id
String messageId = message.getMessageProperties().getMessageId();
// 內(nèi)容
String result = null;
try {
result = new String(message.getBody(), "UTF-8");
} catch (Exception e) {
log.error("消息發(fā)送失敗{}", e);
}
log.error("消息發(fā)送失敗, 消息ID = {}; 消息內(nèi)容 = {}", messageId, result);
//下面可自定義業(yè)務(wù)邏輯處理,如入庫保存信息等
}
});
return twoRabbitTemplate;
}
@Bean(name = "twoFactory")
public SimpleRabbitListenerContainerFactory twoFactory(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory,
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean(name = "twoRabbitAdmin")
public RabbitAdmin twoRabbitAdmin(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}
2.3 創(chuàng)建隊列及交換機并綁定
package com.yuhuofei.mq.config;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
/**
* @author yuhuofei
* @version 1.0
* @description 創(chuàng)建隊列、交換機并綁定
* @date 2022/10/3 18:15
*/
public class QueueConfig {
@Resource(name = "oneRabbitAdmin")
private RabbitAdmin oneRabbitAdmin;
@Resource(name = "twoRabbitAdmin")
private RabbitAdmin twoRabbitAdmin;
@Value("${one.out.queue}")
private String oneOutQueue;
@Value("${one.out.queue}")
private String oneRoutingKey;
@Value("${two.output.queue}")
private String twoOutQueue;
@Value("${two.output.queue}")
private String twoRoutingKey;
@Value("${one.topic.exchange.name}")
private String oneTopicExchange;
@Value("${two.topic.exchange.name}")
private String twoTopicExchange;
@PostConstruct
public void oneRabbitInit() {
//聲明交換機
oneRabbitAdmin.declareExchange(new TopicExchange(oneTopicExchange, true, false));
//聲明隊列
oneRabbitAdmin.declareQueue(new Queue(oneOutQueue, true, false, false));
//綁定隊列及交換機
oneRabbitAdmin.declareBinding(BindingBuilder.bind(new Queue(oneOutQueue, true, false, false))
.to(new TopicExchange(oneTopicExchange, true, false))
.with(oneRoutingKey));
}
@PostConstruct
public void twoRabbitInit() {
//聲明交換機
twoRabbitAdmin.declareExchange(new TopicExchange(twoTopicExchange, true, false));
//聲明隊列
twoRabbitAdmin.declareQueue(new Queue(twoOutQueue, true));
//綁定隊列及交換機
twoRabbitAdmin.declareBinding(BindingBuilder.bind(new Queue(twoOutQueue, true, false, false))
.to(new TopicExchange(twoTopicExchange, true, false))
.with(twoRoutingKey));
}
}
2.4 配置信息
這里的配置信息,需要與各自的關(guān)聯(lián)方約定好再配置
# 與關(guān)聯(lián)方one的MQ配置 one.spring.rabbitmq.host=one.mq.com one.spring.rabbitmq.port=5672 one.spring.rabbitmq.username=xxxxx one.spring.rabbitmq.password=xxxxx one.spring.rabbitmq.virtual-host=/xxxxx one.out.queue=xxxaa.ssssd.cffs.xxxx one.topic.exchange.name=oneTopExchange # 與關(guān)聯(lián)方two的MQ配置 two.spring.rabbitmq.host=two.mq.com two.spring.rabbitmq.port=5672 two.spring.rabbitmq.username=aaaaaaa two.spring.rabbitmq.password=aaaaaaa two.spring.rabbitmq.virtualHost=/aaaaaaa two.out.queue=ddddd.sssss.hhhhh.eeee two.topic.exchange.name=twoTopExchange
2.5 注意點
在連接多個 MQ 的情況下,需要在某個連接加上 @Primary 注解(見 2.1 中的代碼),表示主連接,默認使用這個連接,如果不加,服務(wù)會起不來
3. 使 用
3.1 作為消費者
由于在前面的 2.3 中,聲明了隊列及交換機,并進行了綁定,那么作為消費者,監(jiān)聽相應(yīng)的隊列,獲取關(guān)聯(lián)方發(fā)送的消息進行處理即可。這里用監(jiān)聽關(guān)聯(lián)方 one 的出隊列做展示,two 的類似。
需要注意的地方是,在監(jiān)聽隊列時,需要指定 ContainerFactory。
package com.yuhuofei.mq.service;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
/**
* @author yuhuofei
* @version 1.0
* @description 監(jiān)聽關(guān)聯(lián)方one的消息
* @date 2022/10/3 18:38
*/
@Slf4j
@Service
public class OneReceive {
@RabbitListener(queues = "${one.out.queue}", containerFactory = "oneFactory")
public void listenOne(Message message, Channel channel) {
//獲取MQ返回的數(shù)據(jù)
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
String data = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("MQ返回的數(shù)據(jù):{}", data);
//下面進行業(yè)務(wù)邏輯處理
}
}
3.2 作為生產(chǎn)者
使用之前重寫的 RabbitTemplate ,向各個關(guān)聯(lián)方指定的隊列發(fā)送消息。
package com.yuhuofei.mq.service;
import com.google.gson.JsonObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* @author yuhuofei
* @version 1.0
* @description 向關(guān)聯(lián)方的隊列發(fā)送消息
* @date 2022/10/3 18:47
*/
@Slf4j
@Service
public class SendMessage {
@Resource(name = "oneRabbitTemplate")
private RabbitTemplate oneRabbitTemplate;
@Resource(name = "twoRabbitTemplate")
private RabbitTemplate twoRabbitTemplate;
public void sendToOneMessage(String messageId, OneMessageConverter message) {
String exchange = message.getExchange();
String routingKey = message.getRoutingKey();
JsonObject data = message.getData();
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/json");
Message info = new Message(data.toString().getBytes(), messageProperties);
info.getMessageProperties().setMessageId(messageId);
oneRabbitTemplate.convertAndSend(exchange, routingKey, info, new CorrelationData(messageId));
}
public void sendToTwoMessage(String messageId, TwoMessageConverter message) {
String exchange = message.getExchange();
String routingKey = message.getRoutingKey();
JsonObject data = message.getData();
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/json");
Message info = new Message(data.toString().getBytes(), messageProperties);
info.getMessageProperties().setMessageId(messageId);
twoRabbitTemplate.convertAndSend(exchange, routingKey, info, new CorrelationData(messageId));
}
}
到此這篇關(guān)于SpringBoot中連接多個RabbitMQ的方法詳解的文章就介紹到這了,更多相關(guān)SpringBoot多個RabbitMQ內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot項目中枚舉類型字段與前端和數(shù)據(jù)庫的交互方法
最近做的這個項目中,用到了大量的枚舉類,下面這篇文章主要給大家介紹了關(guān)于SpringBoot項目中枚舉類型字段與前端和數(shù)據(jù)庫的交互方法,文中通過代碼介紹的非常詳細,需要的朋友可以參考下2024-07-07
MyBatisPlus中@TableField注解的基本使用
這篇文章主要介紹了MyBatisPlus中@TableField注解的基本使用,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-07-07
spring boot高并發(fā)下耗時操作的實現(xiàn)方法
這篇文章主要給大家介紹了關(guān)于spring boot高并發(fā)下耗時操作的實現(xiàn)方法,文中通過示例代碼介紹的非常詳細,對大家學(xué)習(xí)或者使用spring boot具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2019-11-11
java中的過濾器 Filter應(yīng)用小結(jié)
文章主要介紹了Java Web中的過濾器(Filter)的基本概念、生命周期、配置和應(yīng)用,過濾器可以攔截請求和響應(yīng),用于執(zhí)行一些預(yù)處理或后處理操作,如設(shè)置編碼、校驗用戶身份等,感興趣的朋友一起看看吧2025-03-03
java byte數(shù)組與int,long,short,byte的轉(zhuǎn)換實現(xiàn)方法
下面小編就為大家?guī)硪黄猨ava byte數(shù)組與int,long,short,byte的轉(zhuǎn)換實現(xiàn)方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-10-10
Java連接mysql數(shù)據(jù)庫以及mysql驅(qū)動jar包下載和使用方法
這篇文章主要給大家介紹了關(guān)于Java連接mysql數(shù)據(jù)庫以及mysql驅(qū)動jar包下載和使用方法,MySQL是一款常用的關(guān)系型數(shù)據(jù)庫,它的JDBC驅(qū)動程序使得我們可以通過Java程序連接MySQL數(shù)據(jù)庫進行數(shù)據(jù)操作,需要的朋友可以參考下2023-11-11
使用Post方式提交數(shù)據(jù)到Tomcat服務(wù)器的方法
這篇將介紹使用Post方式提交數(shù)據(jù)到服務(wù)器,由于Post的方式和Get方式創(chuàng)建Web工程是一模一樣的,只用幾個地方的代碼不同,這篇文章主要介紹了使用Post方式提交數(shù)據(jù)到Tomcat服務(wù)器的方法,感興趣的朋友一起學(xué)習(xí)吧2016-04-04
Mybatis-plus的selectPage()分頁查詢不生效問題解決
本文主要介紹了Mybatis-plus的selectPage()分頁查詢不生效問題解決,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-01-01

