" />

欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot中的多RabbitMQ數(shù)據(jù)源配置實現(xiàn)

 更新時間:2023年09月07日 11:48:36   作者:阿勁  
本篇博客將介紹如何在 Spring Boot 中配置和管理多個 RabbitMQ 數(shù)據(jù)源,以滿足不同的應用需求,具有一定的參考價值,感興趣的可以了解一下

簡介

在構建復雜的應用程序時,經(jīng)常需要與多個數(shù)據(jù)源進行交互。這可能包括連接多個數(shù)據(jù)庫、消息隊列或其他數(shù)據(jù)存儲系統(tǒng)。RabbitMQ 是一個流行的消息隊列系統(tǒng),它通過消息隊列實現(xiàn)了應用程序之間的松耦合,適用于異步任務處理、解耦、削峰填谷等場景。本篇博客將介紹如何在 Spring Boot 中配置和管理多個 RabbitMQ 數(shù)據(jù)源,以滿足不同的應用需求,并提供示例代碼使用

1. 依賴引入

首先,在 pom.xml 文件中添加 RabbitMQ 的 Spring Boot Starter 依賴,以便引入 RabbitMQ 相關的庫和功能。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 抽象類

創(chuàng)建一個抽象類 AbstractRabbitConfiguration,其中包含了RabbitMQ的基本配置信息。這些信息包括主機、端口、用戶名、密碼、虛擬主機、隊列名、交換機名、確認機制和消費條數(shù)等。這個抽象類的目的是為了讓子類繼承這些基本配置信息,并根據(jù)不同的數(shù)據(jù)源創(chuàng)建相應的RabbitMQ連接和管理器。

@Data
public abstract class AbstractRabbitConfiguration {
? ? ?protected String host;
? ? protected Integer port;
? ? protected String userName;
? ? protected String password;
? ? protected String virtualHost;
? ? protected String queueName;
? ? protected String exchangeName;
? ? protected String routingKey;
? ? protected String acknowledge = "manual";
? ? protected Integer prefetch = 1;
? ? public ConnectionFactory connectionFactory() {
? ? ? ? CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
? ? ? ? connectionFactory.setHost(host);
? ? ? ? connectionFactory.setPort(port);
? ? ? ? connectionFactory.setVirtualHost(virtualHost);
? ? ? ? connectionFactory.setUsername(userName);
? ? ? ? connectionFactory.setPassword(password);
? ? ? ? connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
? ? ? ? connectionFactory.setPublisherReturns(Boolean.TRUE);
? ? ? ? return connectionFactory;
? ? }
}

3. 子類

在抽象類的基礎上,我們可以創(chuàng)建多個子類,每個子類對應一個不同的RabbitMQ數(shù)據(jù)源配置。以一個名為 RabbitConfig 的子類為例,假設它是用于主數(shù)據(jù)源的配置。

@Configuration
@ConfigurationProperties(prefix = "kxj.rabbit")
public class RabbitConfig extends AbstractRabbitConfiguration {
? ? @Bean("primaryConnectionFactory")
? ? @Primary
? ? public ConnectionFactory primaryConnectionFactory() {
? ? ? ? return super.connectionFactory();
? ? }
? ? @Bean
? ? @Primary
? ? public RabbitTemplate rabbitTemplate(@Qualifier("primaryConnectionFactory") ConnectionFactory connectionFactory,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?@Qualifier("confirmCallback") ConfirmCallback confirmCallback,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?@Qualifier("returnCallback") ReturnCallback returnCallback) {
? ? ? ? RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
? ? ? ? rabbitTemplate.setMandatory(true);
? ? ? ? rabbitTemplate.setConfirmCallback(confirmCallback);
? ? ? ? rabbitTemplate.setReturnCallback(returnCallback);
? ? ? ? rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
? ? ? ? return rabbitTemplate;
? ? }
? ? @Bean(name = "primaryContainerFactory")
? ? public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
? ? ? ? ? ? SimpleRabbitListenerContainerFactoryConfigurer configurer,
? ? ? ? ? ? @Qualifier("primaryConnectionFactory") ConnectionFactory connectionFactory) {
? ? ? ? SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
? ? ? ? factory.setConnectionFactory(connectionFactory);
? ? ? ? // 設置ACK確認機制
? ? ? ? factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));
? ? ? ? // 設置消費者消費條數(shù)
? ? ? ? factory.setPrefetchCount(prefetch);
? ? ? ? configurer.configure(factory, connectionFactory);
? ? ? ? return factory;
? ? }
? ? @Bean(name = "primaryRabbitAdmin")
? ? public RabbitAdmin rabbitAdmin(@Qualifier("primaryConnectionFactory") ConnectionFactory connectionFactory) {
? ? ? ? RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
? ? ? ? rabbitAdmin.setAutoStartup(true);
? ? ? ? // 聲明交換機,隊列及對應綁定關系
? ? ? ? Queue queue = RabbitmqUtil.createQueue(queueName);
? ? ? ? FanoutExchange exchange = RabbitmqUtil.createFanoutExchange(exchangeName);
? ? ? ? Binding binding = RabbitmqUtil.createBinding(queue, exchange, "");
? ? ? ? RabbitmqUtil.createRabbitAdmin(queue, exchange, binding, rabbitAdmin);
? ? ? ? return rabbitAdmin;
? ? }
}

在子類中,我們使用 @Configuration 注解將它標記為Spring的配置類,并使用 @ConfigurationProperties 注解將以 kxj.rabbit 為前綴的配置屬性注入到類中。這使得我們可以在配置文件中為不同的數(shù)據(jù)源配置不同的RabbitMQ屬性。

在子類中,我們定義多個Bean來配置RabbitMQ的連接、管理和消息處理等,以滿足不同數(shù)據(jù)源的需求。在這里創(chuàng)建主數(shù)據(jù)源的連接工廠,并使用 @Primary 注解將其標記為默認的連接工廠。
除了連接工廠之外,我們還可以配置其他與RabbitMQ相關的Bean,如 RabbitTemplate、RabbitAdmin 以及回調(diào)類等。這些Bean可以根據(jù)不同數(shù)據(jù)源的需求進行配置,例如設置消息確認機制、消息返回機制和消息轉(zhuǎn)換器等。

另外,我們在 rabbitTemplate 方法中也進行了一些配置,如設置 mandatory 為 true,設置消息轉(zhuǎn)換器為 Jackson2JsonMessageConverter 等。

4. 配置回調(diào)類

在處理消息時,我們通常需要設置確認回調(diào)(ConfirmCallback)和返回回調(diào)(ReturnCallback)。這些回調(diào)類可以用于處理消息的確認和返回情況。

@Slf4j
@Component
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {
? ? @Override
? ? public void confirm(CorrelationData correlationData, boolean ack, String cause) {
? ? ? ? if (ack) {
? ? ? ? ? ? log.info("傳遞消息到交換機成功,correlationData:{}, cause:{}", JSON.toJSONString(correlationData), cause);
? ? ? ? } else {
? ? ? ? ? ? log.error("傳遞消息到交換機失敗,correlationData:{}, cause:{}", JSON.toJSONString(correlationData), cause);
? ? ? ? }
? ? }
}
@Slf4j
@Component
public class ReturnCallback implements RabbitTemplate.ReturnCallback {
? ? @Override
? ? public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
? ? ? ? String msg = new String(message.getBody());
? ? ? ? log.error(String.format("消息{%s}不能被正確路由,routingKey為{%s}", msg, routingKey));
? ? }
}

5. 配置文件

server.port=8895
kxj.rabbit.host=MQ地址
kxj.rabbit.port=MQ端口
kxj.rabbit.virtualHost=/
kxj.rabbit.userName=guest
kxj.rabbit.password=guest
kxj.rabbit.queueName=test.queue
kxj.rabbit.exchangeName=test.exchange
kxj.rabbit.routingKey=test-routing-key

6. 工具類

在 RabbitMQ 的配置過程中,我們需要聲明交換機、隊列和綁定關系等,這些操作可以通過一個工具類 RabbitmqUtil 來實現(xiàn)。

public class RabbitmqUtil {
? ? public static DirectExchange createDirectExchange(String exchangeName) {
? ? ? ? if (StringUtils.isNotBlank(exchangeName)) {
? ? ? ? ? ? return new DirectExchange(exchangeName, true, false);
? ? ? ? }
? ? ? ? return null;
? ? }
? ? public static TopicExchange createTopicExchange(String exchangeName) {
? ? ? ? if (StringUtils.isNotBlank(exchangeName)) {
? ? ? ? ? ? return new TopicExchange(exchangeName, true, false);
? ? ? ? }
? ? ? ? return null;
? ? }
? ? public static FanoutExchange createFanoutExchange(String exchangeName) {
? ? ? ? if (StringUtils.isNotBlank(exchangeName)) {
? ? ? ? ? ? return new FanoutExchange(exchangeName, true, false);
? ? ? ? }
? ? ? ? return null;
? ? }
? ? public static Queue createQueue(String queueName) {
? ? ? ? if (StringUtils.isNotBlank(queueName)) {
? ? ? ? ? ? return new Queue(queueName, true);
? ? ? ? }
? ? ? ? return null;
? ? }
? ? public static Binding createBinding(Queue queueName, Exchange exchangeName, String routingKeyName) {
? ? ? ? if (Objects.nonNull(queueName) && Objects.nonNull(exchangeName)) {
? ? ? ? ? ? return BindingBuilder.bind(queueName).to(exchangeName).with(routingKeyName).noargs();
? ? ? ? }
? ? ? ? return null;
? ? }
// ? ?public static void createRabbitAdmin(Queue queue, DirectExchange exchange, Binding binding, RabbitAdmin rabbitAdmin) {
// ? ? ? ?rabbitAdmin.declareQueue(queue);
// ? ? ? ?rabbitAdmin.declareExchange(exchange);
// ? ? ? ?rabbitAdmin.declareBinding(binding);
// ? ?}
? ? public static void createRabbitAdmin(Queue queue, Exchange exchange, Binding binding, RabbitAdmin rabbitAdmin) {
? ? ? ? if (queue != null) {
? ? ? ? ? ? rabbitAdmin.declareQueue(queue);
? ? ? ? }
? ? ? ? if (exchange != null) {
? ? ? ? ? ? rabbitAdmin.declareExchange(exchange);
? ? ? ? }
? ? ? ? if (binding != null) {
? ? ? ? ? ? rabbitAdmin.declareBinding(binding);
? ? ? ? }
? ? }
}

7. 測試用例

我們可以編寫一些測試用例來驗證以上配置是否正確。下面是一個發(fā)送消息到主數(shù)據(jù)源的示例:

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqTest {
? ? @Autowired
? ? @Qualifier("primaryRabbitAdmin")
? ? private RabbitAdmin primaryRabbitAdmin;
? ? @Autowired
? ? @Qualifier("primaryContainerFactory")
? ? private SimpleRabbitListenerContainerFactory primaryContainerFactory;
? ? @Autowired
? ? @Qualifier("primaryConnectionFactory")
? ? private ConnectionFactory primaryConnectionFactory;
? ? @Autowired
? ? private RabbitTemplate primaryRabbitTemplate;
? ? @Test
? ? public void testSend() {
? ? ? ? String message = "Hello, World!";
? ? ? ? primaryRabbitTemplate.convertAndSend("test.exchange", "test.routingKey", message);
? ? ? ? String receivedMessage = (String) primaryRabbitTemplate.receiveAndConvert("test.queue");
? ? ? ? assertEquals(message, receivedMessage);
? ? }
}

在上面的測試用例中,我們使用了 @Qualifier 注解來指定主數(shù)據(jù)源的 Bean,然后通過 RabbitTemplate 發(fā)送消息到 test.exchange,并在隊列 test.queue 中接收到消息。我們可以通過斷言來判斷發(fā)送和接收的消息是否一致,以此驗證配置是否正確。

總結

通過使用抽象類和子類的方式,我們可以輕松地配置和管理多個RabbitMQ數(shù)據(jù)源,每個數(shù)據(jù)源可以有不同的屬性配置。這種方法使得我們的應用程序更具靈活性,能夠與多個RabbitMQ實例交互,滿足不同數(shù)據(jù)源的需求。同時,回調(diào)類的使用也可以幫助我們處理消息的確認和返回情況,確保消息的可靠性傳遞。

到此這篇關于SpringBoot中的多RabbitMQ數(shù)據(jù)源配置實現(xiàn)的文章就介紹到這了,更多相關SpringBoot 多RabbitMQ數(shù)據(jù)源配置內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • Java設計模式之外觀模式示例詳解

    Java設計模式之外觀模式示例詳解

    外觀模式為多個復雜的子系統(tǒng),提供了一個一致的界面,使得調(diào)用端只和這個接口發(fā)生調(diào)用,而無須關系這個子系統(tǒng)內(nèi)部的細節(jié)。本文將通過示例詳細為大家講解一下外觀模式,需要的可以參考一下
    2022-03-03
  • Java由淺入深通關抽象類與接口上

    Java由淺入深通關抽象類與接口上

    在類中沒有包含足夠的信息來描繪一個具體的對象,這樣的類稱為抽象類,接口是Java中最重要的概念之一,它可以被理解為一種特殊的類,不同的是接口的成員沒有執(zhí)行體,是由全局常量和公共的抽象方法所組成,本文給大家介紹Java抽象類和接口,感興趣的朋友一起看看吧
    2022-04-04
  • Java中equals()方法的理解與使用方法例子

    Java中equals()方法的理解與使用方法例子

    本文主要介紹了Java中的equals()方法,equals()方法是Object類的方法,用于比較兩個對象是否相等,文中通過代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2024-10-10
  • spring cloud Hystrix斷路器的使用(熔斷器)

    spring cloud Hystrix斷路器的使用(熔斷器)

    這篇文章主要介紹了spring cloud Hystrix斷路器的使用(熔斷器),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-08-08
  • 改變JAVA窗體屬性的操作方法

    改變JAVA窗體屬性的操作方法

    在本篇內(nèi)容里小編給大家詳細分析了關于改變JAVA窗體屬性的操作方法和步驟,需要的朋友們學習下。
    2018-12-12
  • SpringBoot2.0集成Swagger2訪問404的解決操作

    SpringBoot2.0集成Swagger2訪問404的解決操作

    這篇文章主要介紹了SpringBoot2.0集成Swagger2訪問404的解決操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-09-09
  • SpringSecurity頁面授權與登錄驗證實現(xiàn)(內(nèi)存取值與數(shù)據(jù)庫取值)

    SpringSecurity頁面授權與登錄驗證實現(xiàn)(內(nèi)存取值與數(shù)據(jù)庫取值)

    Spring Security是一個能夠為基于Spring的企業(yè)應用系統(tǒng)提供聲明式的安全訪問控制解決方案的安全框架,本文主要介紹了SpringSecurity頁面授權與登錄驗證實現(xiàn),文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-06-06
  • Maven依賴管理的用法介紹

    Maven依賴管理的用法介紹

    依賴管理是項目管理中非常重要的一環(huán)。幾乎任何項目開發(fā)的時候需要都需要使用到庫。而這些庫很可能又依賴別的庫,這樣整個項目的依賴形成了一個樹狀結構,而隨著這個依賴的樹的延伸和擴大,一系列問題就會隨之產(chǎn)生
    2022-08-08
  • Java線程池ThreadPoolExecutor源碼深入分析

    Java線程池ThreadPoolExecutor源碼深入分析

    ThreadPoolExecutor作為java.util.concurrent包對外提供基礎實現(xiàn),以內(nèi)部線程池的形式對外提供管理任務執(zhí)行,線程調(diào)度,線程池管理等等服務
    2022-08-08
  • Java 中二進制轉(zhuǎn)換成十六進制的兩種實現(xiàn)方法

    Java 中二進制轉(zhuǎn)換成十六進制的兩種實現(xiàn)方法

    這篇文章主要介紹了Java 中二進制轉(zhuǎn)換成十六進制的兩種實現(xiàn)方法的相關資料,需要的朋友可以參考下
    2017-06-06

最新評論