SpringBoot中的多RabbitMQ數(shù)據(jù)源配置實現(xiàn)
簡介
在構建復雜的應用程序時,經(jīng)常需要與多個數(shù)據(jù)源進行交互。這可能包括連接多個數(shù)據(jù)庫、消息隊列或其他數(shù)據(jù)存儲系統(tǒng)。RabbitMQ 是一個流行的消息隊列系統(tǒng),它通過消息隊列實現(xiàn)了應用程序之間的松耦合,適用于異步任務處理、解耦、削峰填谷等場景。本篇博客將介紹如何在 Spring Boot 中配置和管理多個 RabbitMQ 數(shù)據(jù)源,以滿足不同的應用需求,并提供示例代碼使用
1. 依賴引入
首先,在 pom.xml 文件中添加 RabbitMQ 的 Spring Boot Starter 依賴,以便引入 RabbitMQ 相關的庫和功能。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2. 抽象類
創(chuàng)建一個抽象類 AbstractRabbitConfiguration,其中包含了RabbitMQ的基本配置信息。這些信息包括主機、端口、用戶名、密碼、虛擬主機、隊列名、交換機名、確認機制和消費條數(shù)等。這個抽象類的目的是為了讓子類繼承這些基本配置信息,并根據(jù)不同的數(shù)據(jù)源創(chuàng)建相應的RabbitMQ連接和管理器。
@Data public abstract class AbstractRabbitConfiguration { ? ? ?protected String host; ? ? protected Integer port; ? ? protected String userName; ? ? protected String password; ? ? protected String virtualHost; ? ? protected String queueName; ? ? protected String exchangeName; ? ? protected String routingKey; ? ? protected String acknowledge = "manual"; ? ? protected Integer prefetch = 1; ? ? public ConnectionFactory connectionFactory() { ? ? ? ? CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); ? ? ? ? connectionFactory.setHost(host); ? ? ? ? connectionFactory.setPort(port); ? ? ? ? connectionFactory.setVirtualHost(virtualHost); ? ? ? ? connectionFactory.setUsername(userName); ? ? ? ? connectionFactory.setPassword(password); ? ? ? ? connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); ? ? ? ? connectionFactory.setPublisherReturns(Boolean.TRUE); ? ? ? ? return connectionFactory; ? ? } }
3. 子類
在抽象類的基礎上,我們可以創(chuàng)建多個子類,每個子類對應一個不同的RabbitMQ數(shù)據(jù)源配置。以一個名為 RabbitConfig 的子類為例,假設它是用于主數(shù)據(jù)源的配置。
@Configuration @ConfigurationProperties(prefix = "kxj.rabbit") public class RabbitConfig extends AbstractRabbitConfiguration { ? ? @Bean("primaryConnectionFactory") ? ? @Primary ? ? public ConnectionFactory primaryConnectionFactory() { ? ? ? ? return super.connectionFactory(); ? ? } ? ? @Bean ? ? @Primary ? ? public RabbitTemplate rabbitTemplate(@Qualifier("primaryConnectionFactory") ConnectionFactory connectionFactory, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?@Qualifier("confirmCallback") ConfirmCallback confirmCallback, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?@Qualifier("returnCallback") ReturnCallback returnCallback) { ? ? ? ? RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); ? ? ? ? rabbitTemplate.setMandatory(true); ? ? ? ? rabbitTemplate.setConfirmCallback(confirmCallback); ? ? ? ? rabbitTemplate.setReturnCallback(returnCallback); ? ? ? ? rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); ? ? ? ? return rabbitTemplate; ? ? } ? ? @Bean(name = "primaryContainerFactory") ? ? public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory( ? ? ? ? ? ? SimpleRabbitListenerContainerFactoryConfigurer configurer, ? ? ? ? ? ? @Qualifier("primaryConnectionFactory") ConnectionFactory connectionFactory) { ? ? ? ? SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); ? ? ? ? factory.setConnectionFactory(connectionFactory); ? ? ? ? // 設置ACK確認機制 ? ? ? ? factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase())); ? ? ? ? // 設置消費者消費條數(shù) ? ? ? ? factory.setPrefetchCount(prefetch); ? ? ? ? configurer.configure(factory, connectionFactory); ? ? ? ? return factory; ? ? } ? ? @Bean(name = "primaryRabbitAdmin") ? ? public RabbitAdmin rabbitAdmin(@Qualifier("primaryConnectionFactory") ConnectionFactory connectionFactory) { ? ? ? ? RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); ? ? ? ? rabbitAdmin.setAutoStartup(true); ? ? ? ? // 聲明交換機,隊列及對應綁定關系 ? ? ? ? Queue queue = RabbitmqUtil.createQueue(queueName); ? ? ? ? FanoutExchange exchange = RabbitmqUtil.createFanoutExchange(exchangeName); ? ? ? ? Binding binding = RabbitmqUtil.createBinding(queue, exchange, ""); ? ? ? ? RabbitmqUtil.createRabbitAdmin(queue, exchange, binding, rabbitAdmin); ? ? ? ? return rabbitAdmin; ? ? } }
在子類中,我們使用 @Configuration 注解將它標記為Spring的配置類,并使用 @ConfigurationProperties 注解將以 kxj.rabbit 為前綴的配置屬性注入到類中。這使得我們可以在配置文件中為不同的數(shù)據(jù)源配置不同的RabbitMQ屬性。
在子類中,我們定義多個Bean來配置RabbitMQ的連接、管理和消息處理等,以滿足不同數(shù)據(jù)源的需求。在這里創(chuàng)建主數(shù)據(jù)源的連接工廠,并使用 @Primary 注解將其標記為默認的連接工廠。
除了連接工廠之外,我們還可以配置其他與RabbitMQ相關的Bean,如 RabbitTemplate、RabbitAdmin 以及回調(diào)類等。這些Bean可以根據(jù)不同數(shù)據(jù)源的需求進行配置,例如設置消息確認機制、消息返回機制和消息轉(zhuǎn)換器等。
另外,我們在 rabbitTemplate 方法中也進行了一些配置,如設置 mandatory 為 true,設置消息轉(zhuǎn)換器為 Jackson2JsonMessageConverter 等。
4. 配置回調(diào)類
在處理消息時,我們通常需要設置確認回調(diào)(ConfirmCallback)和返回回調(diào)(ReturnCallback)。這些回調(diào)類可以用于處理消息的確認和返回情況。
@Slf4j @Component public class ConfirmCallback implements RabbitTemplate.ConfirmCallback { ? ? @Override ? ? public void confirm(CorrelationData correlationData, boolean ack, String cause) { ? ? ? ? if (ack) { ? ? ? ? ? ? log.info("傳遞消息到交換機成功,correlationData:{}, cause:{}", JSON.toJSONString(correlationData), cause); ? ? ? ? } else { ? ? ? ? ? ? log.error("傳遞消息到交換機失敗,correlationData:{}, cause:{}", JSON.toJSONString(correlationData), cause); ? ? ? ? } ? ? } } @Slf4j @Component public class ReturnCallback implements RabbitTemplate.ReturnCallback { ? ? @Override ? ? public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { ? ? ? ? String msg = new String(message.getBody()); ? ? ? ? log.error(String.format("消息{%s}不能被正確路由,routingKey為{%s}", msg, routingKey)); ? ? } }
5. 配置文件
server.port=8895 kxj.rabbit.host=MQ地址 kxj.rabbit.port=MQ端口 kxj.rabbit.virtualHost=/ kxj.rabbit.userName=guest kxj.rabbit.password=guest kxj.rabbit.queueName=test.queue kxj.rabbit.exchangeName=test.exchange kxj.rabbit.routingKey=test-routing-key
6. 工具類
在 RabbitMQ 的配置過程中,我們需要聲明交換機、隊列和綁定關系等,這些操作可以通過一個工具類 RabbitmqUtil 來實現(xiàn)。
public class RabbitmqUtil { ? ? public static DirectExchange createDirectExchange(String exchangeName) { ? ? ? ? if (StringUtils.isNotBlank(exchangeName)) { ? ? ? ? ? ? return new DirectExchange(exchangeName, true, false); ? ? ? ? } ? ? ? ? return null; ? ? } ? ? public static TopicExchange createTopicExchange(String exchangeName) { ? ? ? ? if (StringUtils.isNotBlank(exchangeName)) { ? ? ? ? ? ? return new TopicExchange(exchangeName, true, false); ? ? ? ? } ? ? ? ? return null; ? ? } ? ? public static FanoutExchange createFanoutExchange(String exchangeName) { ? ? ? ? if (StringUtils.isNotBlank(exchangeName)) { ? ? ? ? ? ? return new FanoutExchange(exchangeName, true, false); ? ? ? ? } ? ? ? ? return null; ? ? } ? ? public static Queue createQueue(String queueName) { ? ? ? ? if (StringUtils.isNotBlank(queueName)) { ? ? ? ? ? ? return new Queue(queueName, true); ? ? ? ? } ? ? ? ? return null; ? ? } ? ? public static Binding createBinding(Queue queueName, Exchange exchangeName, String routingKeyName) { ? ? ? ? if (Objects.nonNull(queueName) && Objects.nonNull(exchangeName)) { ? ? ? ? ? ? return BindingBuilder.bind(queueName).to(exchangeName).with(routingKeyName).noargs(); ? ? ? ? } ? ? ? ? return null; ? ? } // ? ?public static void createRabbitAdmin(Queue queue, DirectExchange exchange, Binding binding, RabbitAdmin rabbitAdmin) { // ? ? ? ?rabbitAdmin.declareQueue(queue); // ? ? ? ?rabbitAdmin.declareExchange(exchange); // ? ? ? ?rabbitAdmin.declareBinding(binding); // ? ?} ? ? public static void createRabbitAdmin(Queue queue, Exchange exchange, Binding binding, RabbitAdmin rabbitAdmin) { ? ? ? ? if (queue != null) { ? ? ? ? ? ? rabbitAdmin.declareQueue(queue); ? ? ? ? } ? ? ? ? if (exchange != null) { ? ? ? ? ? ? rabbitAdmin.declareExchange(exchange); ? ? ? ? } ? ? ? ? if (binding != null) { ? ? ? ? ? ? rabbitAdmin.declareBinding(binding); ? ? ? ? } ? ? } }
7. 測試用例
我們可以編寫一些測試用例來驗證以上配置是否正確。下面是一個發(fā)送消息到主數(shù)據(jù)源的示例:
@RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqTest { ? ? @Autowired ? ? @Qualifier("primaryRabbitAdmin") ? ? private RabbitAdmin primaryRabbitAdmin; ? ? @Autowired ? ? @Qualifier("primaryContainerFactory") ? ? private SimpleRabbitListenerContainerFactory primaryContainerFactory; ? ? @Autowired ? ? @Qualifier("primaryConnectionFactory") ? ? private ConnectionFactory primaryConnectionFactory; ? ? @Autowired ? ? private RabbitTemplate primaryRabbitTemplate; ? ? @Test ? ? public void testSend() { ? ? ? ? String message = "Hello, World!"; ? ? ? ? primaryRabbitTemplate.convertAndSend("test.exchange", "test.routingKey", message); ? ? ? ? String receivedMessage = (String) primaryRabbitTemplate.receiveAndConvert("test.queue"); ? ? ? ? assertEquals(message, receivedMessage); ? ? } }
在上面的測試用例中,我們使用了 @Qualifier 注解來指定主數(shù)據(jù)源的 Bean,然后通過 RabbitTemplate 發(fā)送消息到 test.exchange,并在隊列 test.queue 中接收到消息。我們可以通過斷言來判斷發(fā)送和接收的消息是否一致,以此驗證配置是否正確。
總結
通過使用抽象類和子類的方式,我們可以輕松地配置和管理多個RabbitMQ數(shù)據(jù)源,每個數(shù)據(jù)源可以有不同的屬性配置。這種方法使得我們的應用程序更具靈活性,能夠與多個RabbitMQ實例交互,滿足不同數(shù)據(jù)源的需求。同時,回調(diào)類的使用也可以幫助我們處理消息的確認和返回情況,確保消息的可靠性傳遞。
到此這篇關于SpringBoot中的多RabbitMQ數(shù)據(jù)源配置實現(xiàn)的文章就介紹到這了,更多相關SpringBoot 多RabbitMQ數(shù)據(jù)源配置內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
spring cloud Hystrix斷路器的使用(熔斷器)
這篇文章主要介紹了spring cloud Hystrix斷路器的使用(熔斷器),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-08-08SpringBoot2.0集成Swagger2訪問404的解決操作
這篇文章主要介紹了SpringBoot2.0集成Swagger2訪問404的解決操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-09-09SpringSecurity頁面授權與登錄驗證實現(xiàn)(內(nèi)存取值與數(shù)據(jù)庫取值)
Spring Security是一個能夠為基于Spring的企業(yè)應用系統(tǒng)提供聲明式的安全訪問控制解決方案的安全框架,本文主要介紹了SpringSecurity頁面授權與登錄驗證實現(xiàn),文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-06-06Java線程池ThreadPoolExecutor源碼深入分析
ThreadPoolExecutor作為java.util.concurrent包對外提供基礎實現(xiàn),以內(nèi)部線程池的形式對外提供管理任務執(zhí)行,線程調(diào)度,線程池管理等等服務2022-08-08Java 中二進制轉(zhuǎn)換成十六進制的兩種實現(xiàn)方法
這篇文章主要介紹了Java 中二進制轉(zhuǎn)換成十六進制的兩種實現(xiàn)方法的相關資料,需要的朋友可以參考下2017-06-06