SpringBoot中的多RabbitMQ數(shù)據(jù)源配置實(shí)現(xiàn)
簡介
在構(gòu)建復(fù)雜的應(yīng)用程序時(shí),經(jīng)常需要與多個(gè)數(shù)據(jù)源進(jìn)行交互。這可能包括連接多個(gè)數(shù)據(jù)庫、消息隊(duì)列或其他數(shù)據(jù)存儲(chǔ)系統(tǒng)。RabbitMQ 是一個(gè)流行的消息隊(duì)列系統(tǒng),它通過消息隊(duì)列實(shí)現(xiàn)了應(yīng)用程序之間的松耦合,適用于異步任務(wù)處理、解耦、削峰填谷等場(chǎng)景。本篇博客將介紹如何在 Spring Boot 中配置和管理多個(gè) RabbitMQ 數(shù)據(jù)源,以滿足不同的應(yīng)用需求,并提供示例代碼使用
1. 依賴引入
首先,在 pom.xml 文件中添加 RabbitMQ 的 Spring Boot Starter 依賴,以便引入 RabbitMQ 相關(guān)的庫和功能。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>2. 抽象類
創(chuàng)建一個(gè)抽象類 AbstractRabbitConfiguration,其中包含了RabbitMQ的基本配置信息。這些信息包括主機(jī)、端口、用戶名、密碼、虛擬主機(jī)、隊(duì)列名、交換機(jī)名、確認(rèn)機(jī)制和消費(fèi)條數(shù)等。這個(gè)抽象類的目的是為了讓子類繼承這些基本配置信息,并根據(jù)不同的數(shù)據(jù)源創(chuàng)建相應(yīng)的RabbitMQ連接和管理器。
@Data
public abstract class AbstractRabbitConfiguration {
? ? ?protected String host;
? ? protected Integer port;
? ? protected String userName;
? ? protected String password;
? ? protected String virtualHost;
? ? protected String queueName;
? ? protected String exchangeName;
? ? protected String routingKey;
? ? protected String acknowledge = "manual";
? ? protected Integer prefetch = 1;
? ? public ConnectionFactory connectionFactory() {
? ? ? ? CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
? ? ? ? connectionFactory.setHost(host);
? ? ? ? connectionFactory.setPort(port);
? ? ? ? connectionFactory.setVirtualHost(virtualHost);
? ? ? ? connectionFactory.setUsername(userName);
? ? ? ? connectionFactory.setPassword(password);
? ? ? ? connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
? ? ? ? connectionFactory.setPublisherReturns(Boolean.TRUE);
? ? ? ? return connectionFactory;
? ? }
}3. 子類
在抽象類的基礎(chǔ)上,我們可以創(chuàng)建多個(gè)子類,每個(gè)子類對(duì)應(yīng)一個(gè)不同的RabbitMQ數(shù)據(jù)源配置。以一個(gè)名為 RabbitConfig 的子類為例,假設(shè)它是用于主數(shù)據(jù)源的配置。
@Configuration
@ConfigurationProperties(prefix = "kxj.rabbit")
public class RabbitConfig extends AbstractRabbitConfiguration {
? ? @Bean("primaryConnectionFactory")
? ? @Primary
? ? public ConnectionFactory primaryConnectionFactory() {
? ? ? ? return super.connectionFactory();
? ? }
? ? @Bean
? ? @Primary
? ? public RabbitTemplate rabbitTemplate(@Qualifier("primaryConnectionFactory") ConnectionFactory connectionFactory,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?@Qualifier("confirmCallback") ConfirmCallback confirmCallback,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?@Qualifier("returnCallback") ReturnCallback returnCallback) {
? ? ? ? RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
? ? ? ? rabbitTemplate.setMandatory(true);
? ? ? ? rabbitTemplate.setConfirmCallback(confirmCallback);
? ? ? ? rabbitTemplate.setReturnCallback(returnCallback);
? ? ? ? rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
? ? ? ? return rabbitTemplate;
? ? }
? ? @Bean(name = "primaryContainerFactory")
? ? public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
? ? ? ? ? ? SimpleRabbitListenerContainerFactoryConfigurer configurer,
? ? ? ? ? ? @Qualifier("primaryConnectionFactory") ConnectionFactory connectionFactory) {
? ? ? ? SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
? ? ? ? factory.setConnectionFactory(connectionFactory);
? ? ? ? // 設(shè)置ACK確認(rèn)機(jī)制
? ? ? ? factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));
? ? ? ? // 設(shè)置消費(fèi)者消費(fèi)條數(shù)
? ? ? ? factory.setPrefetchCount(prefetch);
? ? ? ? configurer.configure(factory, connectionFactory);
? ? ? ? return factory;
? ? }
? ? @Bean(name = "primaryRabbitAdmin")
? ? public RabbitAdmin rabbitAdmin(@Qualifier("primaryConnectionFactory") ConnectionFactory connectionFactory) {
? ? ? ? RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
? ? ? ? rabbitAdmin.setAutoStartup(true);
? ? ? ? // 聲明交換機(jī),隊(duì)列及對(duì)應(yīng)綁定關(guān)系
? ? ? ? Queue queue = RabbitmqUtil.createQueue(queueName);
? ? ? ? FanoutExchange exchange = RabbitmqUtil.createFanoutExchange(exchangeName);
? ? ? ? Binding binding = RabbitmqUtil.createBinding(queue, exchange, "");
? ? ? ? RabbitmqUtil.createRabbitAdmin(queue, exchange, binding, rabbitAdmin);
? ? ? ? return rabbitAdmin;
? ? }
}在子類中,我們使用 @Configuration 注解將它標(biāo)記為Spring的配置類,并使用 @ConfigurationProperties 注解將以 kxj.rabbit 為前綴的配置屬性注入到類中。這使得我們可以在配置文件中為不同的數(shù)據(jù)源配置不同的RabbitMQ屬性。
在子類中,我們定義多個(gè)Bean來配置RabbitMQ的連接、管理和消息處理等,以滿足不同數(shù)據(jù)源的需求。在這里創(chuàng)建主數(shù)據(jù)源的連接工廠,并使用 @Primary 注解將其標(biāo)記為默認(rèn)的連接工廠。
除了連接工廠之外,我們還可以配置其他與RabbitMQ相關(guān)的Bean,如 RabbitTemplate、RabbitAdmin 以及回調(diào)類等。這些Bean可以根據(jù)不同數(shù)據(jù)源的需求進(jìn)行配置,例如設(shè)置消息確認(rèn)機(jī)制、消息返回機(jī)制和消息轉(zhuǎn)換器等。
另外,我們?cè)?rabbitTemplate 方法中也進(jìn)行了一些配置,如設(shè)置 mandatory 為 true,設(shè)置消息轉(zhuǎn)換器為 Jackson2JsonMessageConverter 等。
4. 配置回調(diào)類
在處理消息時(shí),我們通常需要設(shè)置確認(rèn)回調(diào)(ConfirmCallback)和返回回調(diào)(ReturnCallback)。這些回調(diào)類可以用于處理消息的確認(rèn)和返回情況。
@Slf4j
@Component
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {
? ? @Override
? ? public void confirm(CorrelationData correlationData, boolean ack, String cause) {
? ? ? ? if (ack) {
? ? ? ? ? ? log.info("傳遞消息到交換機(jī)成功,correlationData:{}, cause:{}", JSON.toJSONString(correlationData), cause);
? ? ? ? } else {
? ? ? ? ? ? log.error("傳遞消息到交換機(jī)失敗,correlationData:{}, cause:{}", JSON.toJSONString(correlationData), cause);
? ? ? ? }
? ? }
}
@Slf4j
@Component
public class ReturnCallback implements RabbitTemplate.ReturnCallback {
? ? @Override
? ? public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
? ? ? ? String msg = new String(message.getBody());
? ? ? ? log.error(String.format("消息{%s}不能被正確路由,routingKey為{%s}", msg, routingKey));
? ? }
}5. 配置文件
server.port=8895 kxj.rabbit.host=MQ地址 kxj.rabbit.port=MQ端口 kxj.rabbit.virtualHost=/ kxj.rabbit.userName=guest kxj.rabbit.password=guest kxj.rabbit.queueName=test.queue kxj.rabbit.exchangeName=test.exchange kxj.rabbit.routingKey=test-routing-key
6. 工具類
在 RabbitMQ 的配置過程中,我們需要聲明交換機(jī)、隊(duì)列和綁定關(guān)系等,這些操作可以通過一個(gè)工具類 RabbitmqUtil 來實(shí)現(xiàn)。
public class RabbitmqUtil {
? ? public static DirectExchange createDirectExchange(String exchangeName) {
? ? ? ? if (StringUtils.isNotBlank(exchangeName)) {
? ? ? ? ? ? return new DirectExchange(exchangeName, true, false);
? ? ? ? }
? ? ? ? return null;
? ? }
? ? public static TopicExchange createTopicExchange(String exchangeName) {
? ? ? ? if (StringUtils.isNotBlank(exchangeName)) {
? ? ? ? ? ? return new TopicExchange(exchangeName, true, false);
? ? ? ? }
? ? ? ? return null;
? ? }
? ? public static FanoutExchange createFanoutExchange(String exchangeName) {
? ? ? ? if (StringUtils.isNotBlank(exchangeName)) {
? ? ? ? ? ? return new FanoutExchange(exchangeName, true, false);
? ? ? ? }
? ? ? ? return null;
? ? }
? ? public static Queue createQueue(String queueName) {
? ? ? ? if (StringUtils.isNotBlank(queueName)) {
? ? ? ? ? ? return new Queue(queueName, true);
? ? ? ? }
? ? ? ? return null;
? ? }
? ? public static Binding createBinding(Queue queueName, Exchange exchangeName, String routingKeyName) {
? ? ? ? if (Objects.nonNull(queueName) && Objects.nonNull(exchangeName)) {
? ? ? ? ? ? return BindingBuilder.bind(queueName).to(exchangeName).with(routingKeyName).noargs();
? ? ? ? }
? ? ? ? return null;
? ? }
// ? ?public static void createRabbitAdmin(Queue queue, DirectExchange exchange, Binding binding, RabbitAdmin rabbitAdmin) {
// ? ? ? ?rabbitAdmin.declareQueue(queue);
// ? ? ? ?rabbitAdmin.declareExchange(exchange);
// ? ? ? ?rabbitAdmin.declareBinding(binding);
// ? ?}
? ? public static void createRabbitAdmin(Queue queue, Exchange exchange, Binding binding, RabbitAdmin rabbitAdmin) {
? ? ? ? if (queue != null) {
? ? ? ? ? ? rabbitAdmin.declareQueue(queue);
? ? ? ? }
? ? ? ? if (exchange != null) {
? ? ? ? ? ? rabbitAdmin.declareExchange(exchange);
? ? ? ? }
? ? ? ? if (binding != null) {
? ? ? ? ? ? rabbitAdmin.declareBinding(binding);
? ? ? ? }
? ? }
}7. 測(cè)試用例
我們可以編寫一些測(cè)試用例來驗(yàn)證以上配置是否正確。下面是一個(gè)發(fā)送消息到主數(shù)據(jù)源的示例:
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqTest {
? ? @Autowired
? ? @Qualifier("primaryRabbitAdmin")
? ? private RabbitAdmin primaryRabbitAdmin;
? ? @Autowired
? ? @Qualifier("primaryContainerFactory")
? ? private SimpleRabbitListenerContainerFactory primaryContainerFactory;
? ? @Autowired
? ? @Qualifier("primaryConnectionFactory")
? ? private ConnectionFactory primaryConnectionFactory;
? ? @Autowired
? ? private RabbitTemplate primaryRabbitTemplate;
? ? @Test
? ? public void testSend() {
? ? ? ? String message = "Hello, World!";
? ? ? ? primaryRabbitTemplate.convertAndSend("test.exchange", "test.routingKey", message);
? ? ? ? String receivedMessage = (String) primaryRabbitTemplate.receiveAndConvert("test.queue");
? ? ? ? assertEquals(message, receivedMessage);
? ? }
}在上面的測(cè)試用例中,我們使用了 @Qualifier 注解來指定主數(shù)據(jù)源的 Bean,然后通過 RabbitTemplate 發(fā)送消息到 test.exchange,并在隊(duì)列 test.queue 中接收到消息。我們可以通過斷言來判斷發(fā)送和接收的消息是否一致,以此驗(yàn)證配置是否正確。
總結(jié)
通過使用抽象類和子類的方式,我們可以輕松地配置和管理多個(gè)RabbitMQ數(shù)據(jù)源,每個(gè)數(shù)據(jù)源可以有不同的屬性配置。這種方法使得我們的應(yīng)用程序更具靈活性,能夠與多個(gè)RabbitMQ實(shí)例交互,滿足不同數(shù)據(jù)源的需求。同時(shí),回調(diào)類的使用也可以幫助我們處理消息的確認(rèn)和返回情況,確保消息的可靠性傳遞。
到此這篇關(guān)于SpringBoot中的多RabbitMQ數(shù)據(jù)源配置實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)SpringBoot 多RabbitMQ數(shù)據(jù)源配置內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
spring cloud Hystrix斷路器的使用(熔斷器)
這篇文章主要介紹了spring cloud Hystrix斷路器的使用(熔斷器),小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-08-08
SpringBoot2.0集成Swagger2訪問404的解決操作
這篇文章主要介紹了SpringBoot2.0集成Swagger2訪問404的解決操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-09-09
SpringSecurity頁面授權(quán)與登錄驗(yàn)證實(shí)現(xiàn)(內(nèi)存取值與數(shù)據(jù)庫取值)
Spring Security是一個(gè)能夠?yàn)榛赟pring的企業(yè)應(yīng)用系統(tǒng)提供聲明式的安全訪問控制解決方案的安全框架,本文主要介紹了SpringSecurity頁面授權(quán)與登錄驗(yàn)證實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-06-06
Java線程池ThreadPoolExecutor源碼深入分析
ThreadPoolExecutor作為java.util.concurrent包對(duì)外提供基礎(chǔ)實(shí)現(xiàn),以內(nèi)部線程池的形式對(duì)外提供管理任務(wù)執(zhí)行,線程調(diào)度,線程池管理等等服務(wù)2022-08-08
Java 中二進(jìn)制轉(zhuǎn)換成十六進(jìn)制的兩種實(shí)現(xiàn)方法
這篇文章主要介紹了Java 中二進(jìn)制轉(zhuǎn)換成十六進(jìn)制的兩種實(shí)現(xiàn)方法的相關(guān)資料,需要的朋友可以參考下2017-06-06

