SpringBoot中連接多個(gè)RabbitMQ的方法詳解
1. 前 言
在 SpringBoot 中整合單個(gè) RabbitMQ 使用,是很簡(jiǎn)單的,只需要引入依賴(lài),然后在配置里面配置好 MQ 的連接地址、賬號(hào)、密碼等信息,然后使用即可。
但如果 MQ 的連接地址是多個(gè),那這種連接方式就不奏效了。
前段時(shí)間,我開(kāi)發(fā)的一個(gè)項(xiàng)目就遇到了這樣的問(wèn)題。那個(gè)項(xiàng)目,好幾個(gè)關(guān)聯(lián)方,每個(gè)關(guān)聯(lián)方用的 MQ 的地址都不相同,也就意味著我這邊要連接幾個(gè) RabbbitMQ 地址。SpringBoot 連接多個(gè) RabbitMQ,怎么搞?
使用默認(rèn)的連接方式是行不通的,我已經(jīng)試過(guò),而要實(shí)現(xiàn) SpringBoot 連接多個(gè) RabbitMQ,只能自定義重寫(xiě)一些東西,分別配置才可以,下面一起來(lái)走一下試試。
2. 重 寫(xiě)
首先要明確的是,下面的兩個(gè)類(lèi)是需要重寫(xiě)的:
- RabbitTemplate:往隊(duì)列里面丟消息時(shí),需要用到
- RabbitAdmin:聲明隊(duì)列、聲明交換機(jī)、綁定隊(duì)列和交換機(jī)用到
這里,我定義兩個(gè)關(guān)聯(lián)方,一個(gè)是 one,一個(gè)是 two,分別重寫(xiě)與它們的連接工廠。
2.1 重寫(xiě)與關(guān)聯(lián)方one的連接工廠
package com.yuhuofei.mq.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; /** * @author yuhuofei * @version 1.0 * @description 重寫(xiě)與關(guān)聯(lián)方one的連接工廠 * @date 2022/10/3 16:57 */ @Slf4j @Configuration public class OneMQConfig { @Value("${one.spring.rabbitmq.host}") private String host; @Value("${one.spring.rabbitmq.port}") private int port; @Value("${one.spring.rabbitmq.username}") private String username; @Value("${one.spring.rabbitmq.password}") private String password; @Value("${one.spring.rabbitmq.virtual-host}") private String virtualHost; /** * 定義與one的連接工廠 */ @Bean(name = "oneConnectionFactory") @Primary public ConnectionFactory oneConnectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); return connectionFactory; } @Bean(name = "oneRabbitTemplate") @Primary public RabbitTemplate oneRabbitTemplate(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory) { RabbitTemplate oneRabbitTemplate = new RabbitTemplate(connectionFactory); oneRabbitTemplate.setMandatory(true); oneRabbitTemplate.setConnectionFactory(connectionFactory); oneRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * 確認(rèn)消息送到交換機(jī)(Exchange)回調(diào) * @param correlationData * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("確認(rèn)消息送到交換機(jī)(Exchange)結(jié)果:"); log.info("相關(guān)數(shù)據(jù):{}", correlationData); boolean ret = false; if (ack) { log.info("消息發(fā)送到交換機(jī)成功, 消息 = {}", correlationData.getId()); //下面可自定義業(yè)務(wù)邏輯處理,如入庫(kù)保存信息等 } else { log.error("消息發(fā)送到交換機(jī)失敗! 消息: {}}; 錯(cuò)誤原因:cause: {}", correlationData.getId(), cause); //下面可自定義業(yè)務(wù)邏輯處理,如入庫(kù)保存信息等 } } }); oneRabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * 只要消息沒(méi)有投遞給指定的隊(duì)列 就觸發(fā)這個(gè)失敗回調(diào) * @param message 投遞失敗的消息詳細(xì)信息 * @param replyCode 回復(fù)的狀態(tài)碼 * @param replyText 回復(fù)的文本內(nèi)容 * @param exchange 當(dāng)時(shí)這個(gè)消息發(fā)給那個(gè)交換機(jī) * @param routingKey 當(dāng)時(shí)這個(gè)消息用那個(gè)路由鍵 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { //獲取消息id String messageId = message.getMessageProperties().getMessageId(); // 內(nèi)容 String result = null; try { result = new String(message.getBody(), "UTF-8"); } catch (Exception e) { log.error("消息發(fā)送失敗{}", e); } log.error("消息發(fā)送失敗, 消息ID = {}; 消息內(nèi)容 = {}", messageId, result); //下面可自定義業(yè)務(wù)邏輯處理,如入庫(kù)保存信息等 } }); return oneRabbitTemplate; } @Bean(name = "oneFactory") @Primary public SimpleRabbitListenerContainerFactory oneFactory(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory, SimpleRabbitListenerContainerFactoryConfigurer configurer) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); configurer.configure(factory, connectionFactory); return factory; } @Bean(name = "oneRabbitAdmin") @Primary public RabbitAdmin oneRabbitAdmin(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } }
2.2 重寫(xiě)與關(guān)聯(lián)方two的連接工廠
package com.yuhuofei.mq.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author yuhuofei * @version 1.0 * @description 重寫(xiě)與關(guān)聯(lián)方two的連接工廠 * @date 2022/10/3 17:52 */ @Slf4j @Configuration public class TwoMQConfig { @Value("${two.spring.rabbitmq.host}") private String host; @Value("${two.spring.rabbitmq.port}") private int port; @Value("${two.spring.rabbitmq.username}") private String username; @Value("${two.spring.rabbitmq.password}") private String password; @Value("${two.spring.rabbitmq.virtualHost}") private String virtualHost; /** * 定義與two的連接工廠 */ @Bean(name = "twoConnectionFactory") public ConnectionFactory twoConnectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); return connectionFactory; } @Bean(name = "twoRabbitTemplate") public RabbitTemplate twoRabbitTemplate(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory) { RabbitTemplate twoRabbitTemplate = new RabbitTemplate(connectionFactory); twoRabbitTemplate.setMandatory(true); twoRabbitTemplate.setConnectionFactory(connectionFactory); twoRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * 確認(rèn)消息送到交換機(jī)(Exchange)回調(diào) * @param correlationData * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("確認(rèn)消息送到交換機(jī)(Exchange)結(jié)果:"); log.info("相關(guān)數(shù)據(jù):{}", correlationData); boolean ret = false; if (ack) { log.info("消息發(fā)送到交換機(jī)成功, 消息 = {}", correlationData.getId()); //下面可自定義業(yè)務(wù)邏輯處理,如入庫(kù)保存信息等 } else { log.error("消息發(fā)送到交換機(jī)失敗! 消息: {}}; 錯(cuò)誤原因:cause: {}", correlationData.getId(), cause); //下面可自定義業(yè)務(wù)邏輯處理,如入庫(kù)保存信息等 } } }); twoRabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * 只要消息沒(méi)有投遞給指定的隊(duì)列 就觸發(fā)這個(gè)失敗回調(diào) * @param message 投遞失敗的消息詳細(xì)信息 * @param replyCode 回復(fù)的狀態(tài)碼 * @param replyText 回復(fù)的文本內(nèi)容 * @param exchange 當(dāng)時(shí)這個(gè)消息發(fā)給那個(gè)交換機(jī) * @param routingKey 當(dāng)時(shí)這個(gè)消息用那個(gè)路由鍵 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { //獲取消息id String messageId = message.getMessageProperties().getMessageId(); // 內(nèi)容 String result = null; try { result = new String(message.getBody(), "UTF-8"); } catch (Exception e) { log.error("消息發(fā)送失敗{}", e); } log.error("消息發(fā)送失敗, 消息ID = {}; 消息內(nèi)容 = {}", messageId, result); //下面可自定義業(yè)務(wù)邏輯處理,如入庫(kù)保存信息等 } }); return twoRabbitTemplate; } @Bean(name = "twoFactory") public SimpleRabbitListenerContainerFactory twoFactory(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory, SimpleRabbitListenerContainerFactoryConfigurer configurer) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); configurer.configure(factory, connectionFactory); return factory; } @Bean(name = "twoRabbitAdmin") public RabbitAdmin twoRabbitAdmin(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } }
2.3 創(chuàng)建隊(duì)列及交換機(jī)并綁定
package com.yuhuofei.mq.config; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; import javax.annotation.Resource; /** * @author yuhuofei * @version 1.0 * @description 創(chuàng)建隊(duì)列、交換機(jī)并綁定 * @date 2022/10/3 18:15 */ public class QueueConfig { @Resource(name = "oneRabbitAdmin") private RabbitAdmin oneRabbitAdmin; @Resource(name = "twoRabbitAdmin") private RabbitAdmin twoRabbitAdmin; @Value("${one.out.queue}") private String oneOutQueue; @Value("${one.out.queue}") private String oneRoutingKey; @Value("${two.output.queue}") private String twoOutQueue; @Value("${two.output.queue}") private String twoRoutingKey; @Value("${one.topic.exchange.name}") private String oneTopicExchange; @Value("${two.topic.exchange.name}") private String twoTopicExchange; @PostConstruct public void oneRabbitInit() { //聲明交換機(jī) oneRabbitAdmin.declareExchange(new TopicExchange(oneTopicExchange, true, false)); //聲明隊(duì)列 oneRabbitAdmin.declareQueue(new Queue(oneOutQueue, true, false, false)); //綁定隊(duì)列及交換機(jī) oneRabbitAdmin.declareBinding(BindingBuilder.bind(new Queue(oneOutQueue, true, false, false)) .to(new TopicExchange(oneTopicExchange, true, false)) .with(oneRoutingKey)); } @PostConstruct public void twoRabbitInit() { //聲明交換機(jī) twoRabbitAdmin.declareExchange(new TopicExchange(twoTopicExchange, true, false)); //聲明隊(duì)列 twoRabbitAdmin.declareQueue(new Queue(twoOutQueue, true)); //綁定隊(duì)列及交換機(jī) twoRabbitAdmin.declareBinding(BindingBuilder.bind(new Queue(twoOutQueue, true, false, false)) .to(new TopicExchange(twoTopicExchange, true, false)) .with(twoRoutingKey)); } }
2.4 配置信息
這里的配置信息,需要與各自的關(guān)聯(lián)方約定好再配置
# 與關(guān)聯(lián)方one的MQ配置 one.spring.rabbitmq.host=one.mq.com one.spring.rabbitmq.port=5672 one.spring.rabbitmq.username=xxxxx one.spring.rabbitmq.password=xxxxx one.spring.rabbitmq.virtual-host=/xxxxx one.out.queue=xxxaa.ssssd.cffs.xxxx one.topic.exchange.name=oneTopExchange # 與關(guān)聯(lián)方two的MQ配置 two.spring.rabbitmq.host=two.mq.com two.spring.rabbitmq.port=5672 two.spring.rabbitmq.username=aaaaaaa two.spring.rabbitmq.password=aaaaaaa two.spring.rabbitmq.virtualHost=/aaaaaaa two.out.queue=ddddd.sssss.hhhhh.eeee two.topic.exchange.name=twoTopExchange
2.5 注意點(diǎn)
在連接多個(gè) MQ 的情況下,需要在某個(gè)連接加上 @Primary 注解(見(jiàn) 2.1 中的代碼),表示主連接,默認(rèn)使用這個(gè)連接,如果不加,服務(wù)會(huì)起不來(lái)
3. 使 用
3.1 作為消費(fèi)者
由于在前面的 2.3 中,聲明了隊(duì)列及交換機(jī),并進(jìn)行了綁定,那么作為消費(fèi)者,監(jiān)聽(tīng)相應(yīng)的隊(duì)列,獲取關(guān)聯(lián)方發(fā)送的消息進(jìn)行處理即可。這里用監(jiān)聽(tīng)關(guān)聯(lián)方 one 的出隊(duì)列做展示,two 的類(lèi)似。
需要注意的地方是,在監(jiān)聽(tīng)隊(duì)列時(shí),需要指定 ContainerFactory。
package com.yuhuofei.mq.service; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import java.nio.charset.StandardCharsets; /** * @author yuhuofei * @version 1.0 * @description 監(jiān)聽(tīng)關(guān)聯(lián)方one的消息 * @date 2022/10/3 18:38 */ @Slf4j @Service public class OneReceive { @RabbitListener(queues = "${one.out.queue}", containerFactory = "oneFactory") public void listenOne(Message message, Channel channel) { //獲取MQ返回的數(shù)據(jù) channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); String data = new String(message.getBody(), StandardCharsets.UTF_8); log.info("MQ返回的數(shù)據(jù):{}", data); //下面進(jìn)行業(yè)務(wù)邏輯處理 } }
3.2 作為生產(chǎn)者
使用之前重寫(xiě)的 RabbitTemplate ,向各個(gè)關(guān)聯(lián)方指定的隊(duì)列發(fā)送消息。
package com.yuhuofei.mq.service; import com.google.gson.JsonObject; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.stereotype.Service; import javax.annotation.Resource; /** * @author yuhuofei * @version 1.0 * @description 向關(guān)聯(lián)方的隊(duì)列發(fā)送消息 * @date 2022/10/3 18:47 */ @Slf4j @Service public class SendMessage { @Resource(name = "oneRabbitTemplate") private RabbitTemplate oneRabbitTemplate; @Resource(name = "twoRabbitTemplate") private RabbitTemplate twoRabbitTemplate; public void sendToOneMessage(String messageId, OneMessageConverter message) { String exchange = message.getExchange(); String routingKey = message.getRoutingKey(); JsonObject data = message.getData(); MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("application/json"); Message info = new Message(data.toString().getBytes(), messageProperties); info.getMessageProperties().setMessageId(messageId); oneRabbitTemplate.convertAndSend(exchange, routingKey, info, new CorrelationData(messageId)); } public void sendToTwoMessage(String messageId, TwoMessageConverter message) { String exchange = message.getExchange(); String routingKey = message.getRoutingKey(); JsonObject data = message.getData(); MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("application/json"); Message info = new Message(data.toString().getBytes(), messageProperties); info.getMessageProperties().setMessageId(messageId); twoRabbitTemplate.convertAndSend(exchange, routingKey, info, new CorrelationData(messageId)); } }
到此這篇關(guān)于SpringBoot中連接多個(gè)RabbitMQ的方法詳解的文章就介紹到這了,更多相關(guān)SpringBoot多個(gè)RabbitMQ內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解MyBatis Mapper 代理實(shí)現(xiàn)數(shù)據(jù)庫(kù)調(diào)用原理
這篇文章主要介紹了詳解MyBatis Mapper 代理實(shí)現(xiàn)數(shù)據(jù)庫(kù)調(diào)用原理,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-10-10Spring @Profile注解實(shí)現(xiàn)多環(huán)境配置
這篇文章主要介紹了Spring @Profile注解實(shí)現(xiàn)多環(huán)境配置,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-04-04Java 遞歸查詢(xún)部門(mén)樹(shù)形結(jié)構(gòu)數(shù)據(jù)的實(shí)踐
本文主要介紹了Java 遞歸查詢(xún)部門(mén)樹(shù)形結(jié)構(gòu)數(shù)據(jù)的實(shí)踐,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-09-09Java中CaffeineCache自定義緩存時(shí)間的實(shí)現(xiàn)
本文主要介紹了Java中CaffeineCache自定義緩存時(shí)間的實(shí)現(xiàn),通過(guò)聲明緩存value值holder對(duì)象并創(chuàng)建緩存容器,可以為不同的key值指定不同的過(guò)期時(shí)間,具有一定的參考價(jià)值,感興趣的可以了解一下2025-02-02SpringBoot定時(shí)任務(wù)參數(shù)運(yùn)行代碼實(shí)例解析
這篇文章主要介紹了SpringBoot定時(shí)任務(wù)運(yùn)行代碼實(shí)例解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-06-06Spring為singleton?bean注入prototype?bean
這篇文章主要介紹了Spring為singleton?bean注入prototype?bean,文章圍繞主題展開(kāi)詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下2022-07-07