SpringBoot中連接多個(gè)RabbitMQ的方法詳解
1. 前 言
在 SpringBoot 中整合單個(gè) RabbitMQ 使用,是很簡(jiǎn)單的,只需要引入依賴(lài),然后在配置里面配置好 MQ 的連接地址、賬號(hào)、密碼等信息,然后使用即可。
但如果 MQ 的連接地址是多個(gè),那這種連接方式就不奏效了。
前段時(shí)間,我開(kāi)發(fā)的一個(gè)項(xiàng)目就遇到了這樣的問(wèn)題。那個(gè)項(xiàng)目,好幾個(gè)關(guān)聯(lián)方,每個(gè)關(guān)聯(lián)方用的 MQ 的地址都不相同,也就意味著我這邊要連接幾個(gè) RabbbitMQ 地址。SpringBoot 連接多個(gè) RabbitMQ,怎么搞?
使用默認(rèn)的連接方式是行不通的,我已經(jīng)試過(guò),而要實(shí)現(xiàn) SpringBoot 連接多個(gè) RabbitMQ,只能自定義重寫(xiě)一些東西,分別配置才可以,下面一起來(lái)走一下試試。
2. 重 寫(xiě)
首先要明確的是,下面的兩個(gè)類(lèi)是需要重寫(xiě)的:
- RabbitTemplate:往隊(duì)列里面丟消息時(shí),需要用到
- RabbitAdmin:聲明隊(duì)列、聲明交換機(jī)、綁定隊(duì)列和交換機(jī)用到
這里,我定義兩個(gè)關(guān)聯(lián)方,一個(gè)是 one,一個(gè)是 two,分別重寫(xiě)與它們的連接工廠。
2.1 重寫(xiě)與關(guān)聯(lián)方one的連接工廠
package com.yuhuofei.mq.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; /** * @author yuhuofei * @version 1.0 * @description 重寫(xiě)與關(guān)聯(lián)方one的連接工廠 * @date 2022/10/3 16:57 */ @Slf4j @Configuration public class OneMQConfig { @Value("${one.spring.rabbitmq.host}") private String host; @Value("${one.spring.rabbitmq.port}") private int port; @Value("${one.spring.rabbitmq.username}") private String username; @Value("${one.spring.rabbitmq.password}") private String password; @Value("${one.spring.rabbitmq.virtual-host}") private String virtualHost; /** * 定義與one的連接工廠 */ @Bean(name = "oneConnectionFactory") @Primary public ConnectionFactory oneConnectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); return connectionFactory; } @Bean(name = "oneRabbitTemplate") @Primary public RabbitTemplate oneRabbitTemplate(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory) { RabbitTemplate oneRabbitTemplate = new RabbitTemplate(connectionFactory); oneRabbitTemplate.setMandatory(true); oneRabbitTemplate.setConnectionFactory(connectionFactory); oneRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * 確認(rèn)消息送到交換機(jī)(Exchange)回調(diào) * @param correlationData * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("確認(rèn)消息送到交換機(jī)(Exchange)結(jié)果:"); log.info("相關(guān)數(shù)據(jù):{}", correlationData); boolean ret = false; if (ack) { log.info("消息發(fā)送到交換機(jī)成功, 消息 = {}", correlationData.getId()); //下面可自定義業(yè)務(wù)邏輯處理,如入庫(kù)保存信息等 } else { log.error("消息發(fā)送到交換機(jī)失敗! 消息: {}}; 錯(cuò)誤原因:cause: {}", correlationData.getId(), cause); //下面可自定義業(yè)務(wù)邏輯處理,如入庫(kù)保存信息等 } } }); oneRabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * 只要消息沒(méi)有投遞給指定的隊(duì)列 就觸發(fā)這個(gè)失敗回調(diào) * @param message 投遞失敗的消息詳細(xì)信息 * @param replyCode 回復(fù)的狀態(tài)碼 * @param replyText 回復(fù)的文本內(nèi)容 * @param exchange 當(dāng)時(shí)這個(gè)消息發(fā)給那個(gè)交換機(jī) * @param routingKey 當(dāng)時(shí)這個(gè)消息用那個(gè)路由鍵 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { //獲取消息id String messageId = message.getMessageProperties().getMessageId(); // 內(nèi)容 String result = null; try { result = new String(message.getBody(), "UTF-8"); } catch (Exception e) { log.error("消息發(fā)送失敗{}", e); } log.error("消息發(fā)送失敗, 消息ID = {}; 消息內(nèi)容 = {}", messageId, result); //下面可自定義業(yè)務(wù)邏輯處理,如入庫(kù)保存信息等 } }); return oneRabbitTemplate; } @Bean(name = "oneFactory") @Primary public SimpleRabbitListenerContainerFactory oneFactory(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory, SimpleRabbitListenerContainerFactoryConfigurer configurer) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); configurer.configure(factory, connectionFactory); return factory; } @Bean(name = "oneRabbitAdmin") @Primary public RabbitAdmin oneRabbitAdmin(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } }
2.2 重寫(xiě)與關(guān)聯(lián)方two的連接工廠
package com.yuhuofei.mq.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author yuhuofei * @version 1.0 * @description 重寫(xiě)與關(guān)聯(lián)方two的連接工廠 * @date 2022/10/3 17:52 */ @Slf4j @Configuration public class TwoMQConfig { @Value("${two.spring.rabbitmq.host}") private String host; @Value("${two.spring.rabbitmq.port}") private int port; @Value("${two.spring.rabbitmq.username}") private String username; @Value("${two.spring.rabbitmq.password}") private String password; @Value("${two.spring.rabbitmq.virtualHost}") private String virtualHost; /** * 定義與two的連接工廠 */ @Bean(name = "twoConnectionFactory") public ConnectionFactory twoConnectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); return connectionFactory; } @Bean(name = "twoRabbitTemplate") public RabbitTemplate twoRabbitTemplate(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory) { RabbitTemplate twoRabbitTemplate = new RabbitTemplate(connectionFactory); twoRabbitTemplate.setMandatory(true); twoRabbitTemplate.setConnectionFactory(connectionFactory); twoRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * 確認(rèn)消息送到交換機(jī)(Exchange)回調(diào) * @param correlationData * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("確認(rèn)消息送到交換機(jī)(Exchange)結(jié)果:"); log.info("相關(guān)數(shù)據(jù):{}", correlationData); boolean ret = false; if (ack) { log.info("消息發(fā)送到交換機(jī)成功, 消息 = {}", correlationData.getId()); //下面可自定義業(yè)務(wù)邏輯處理,如入庫(kù)保存信息等 } else { log.error("消息發(fā)送到交換機(jī)失敗! 消息: {}}; 錯(cuò)誤原因:cause: {}", correlationData.getId(), cause); //下面可自定義業(yè)務(wù)邏輯處理,如入庫(kù)保存信息等 } } }); twoRabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * 只要消息沒(méi)有投遞給指定的隊(duì)列 就觸發(fā)這個(gè)失敗回調(diào) * @param message 投遞失敗的消息詳細(xì)信息 * @param replyCode 回復(fù)的狀態(tài)碼 * @param replyText 回復(fù)的文本內(nèi)容 * @param exchange 當(dāng)時(shí)這個(gè)消息發(fā)給那個(gè)交換機(jī) * @param routingKey 當(dāng)時(shí)這個(gè)消息用那個(gè)路由鍵 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { //獲取消息id String messageId = message.getMessageProperties().getMessageId(); // 內(nèi)容 String result = null; try { result = new String(message.getBody(), "UTF-8"); } catch (Exception e) { log.error("消息發(fā)送失敗{}", e); } log.error("消息發(fā)送失敗, 消息ID = {}; 消息內(nèi)容 = {}", messageId, result); //下面可自定義業(yè)務(wù)邏輯處理,如入庫(kù)保存信息等 } }); return twoRabbitTemplate; } @Bean(name = "twoFactory") public SimpleRabbitListenerContainerFactory twoFactory(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory, SimpleRabbitListenerContainerFactoryConfigurer configurer) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); configurer.configure(factory, connectionFactory); return factory; } @Bean(name = "twoRabbitAdmin") public RabbitAdmin twoRabbitAdmin(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } }
2.3 創(chuàng)建隊(duì)列及交換機(jī)并綁定
package com.yuhuofei.mq.config; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; import javax.annotation.Resource; /** * @author yuhuofei * @version 1.0 * @description 創(chuàng)建隊(duì)列、交換機(jī)并綁定 * @date 2022/10/3 18:15 */ public class QueueConfig { @Resource(name = "oneRabbitAdmin") private RabbitAdmin oneRabbitAdmin; @Resource(name = "twoRabbitAdmin") private RabbitAdmin twoRabbitAdmin; @Value("${one.out.queue}") private String oneOutQueue; @Value("${one.out.queue}") private String oneRoutingKey; @Value("${two.output.queue}") private String twoOutQueue; @Value("${two.output.queue}") private String twoRoutingKey; @Value("${one.topic.exchange.name}") private String oneTopicExchange; @Value("${two.topic.exchange.name}") private String twoTopicExchange; @PostConstruct public void oneRabbitInit() { //聲明交換機(jī) oneRabbitAdmin.declareExchange(new TopicExchange(oneTopicExchange, true, false)); //聲明隊(duì)列 oneRabbitAdmin.declareQueue(new Queue(oneOutQueue, true, false, false)); //綁定隊(duì)列及交換機(jī) oneRabbitAdmin.declareBinding(BindingBuilder.bind(new Queue(oneOutQueue, true, false, false)) .to(new TopicExchange(oneTopicExchange, true, false)) .with(oneRoutingKey)); } @PostConstruct public void twoRabbitInit() { //聲明交換機(jī) twoRabbitAdmin.declareExchange(new TopicExchange(twoTopicExchange, true, false)); //聲明隊(duì)列 twoRabbitAdmin.declareQueue(new Queue(twoOutQueue, true)); //綁定隊(duì)列及交換機(jī) twoRabbitAdmin.declareBinding(BindingBuilder.bind(new Queue(twoOutQueue, true, false, false)) .to(new TopicExchange(twoTopicExchange, true, false)) .with(twoRoutingKey)); } }
2.4 配置信息
這里的配置信息,需要與各自的關(guān)聯(lián)方約定好再配置
# 與關(guān)聯(lián)方one的MQ配置 one.spring.rabbitmq.host=one.mq.com one.spring.rabbitmq.port=5672 one.spring.rabbitmq.username=xxxxx one.spring.rabbitmq.password=xxxxx one.spring.rabbitmq.virtual-host=/xxxxx one.out.queue=xxxaa.ssssd.cffs.xxxx one.topic.exchange.name=oneTopExchange # 與關(guān)聯(lián)方two的MQ配置 two.spring.rabbitmq.host=two.mq.com two.spring.rabbitmq.port=5672 two.spring.rabbitmq.username=aaaaaaa two.spring.rabbitmq.password=aaaaaaa two.spring.rabbitmq.virtualHost=/aaaaaaa two.out.queue=ddddd.sssss.hhhhh.eeee two.topic.exchange.name=twoTopExchange
2.5 注意點(diǎn)
在連接多個(gè) MQ 的情況下,需要在某個(gè)連接加上 @Primary 注解(見(jiàn) 2.1 中的代碼),表示主連接,默認(rèn)使用這個(gè)連接,如果不加,服務(wù)會(huì)起不來(lái)
3. 使 用
3.1 作為消費(fèi)者
由于在前面的 2.3 中,聲明了隊(duì)列及交換機(jī),并進(jìn)行了綁定,那么作為消費(fèi)者,監(jiān)聽(tīng)相應(yīng)的隊(duì)列,獲取關(guān)聯(lián)方發(fā)送的消息進(jìn)行處理即可。這里用監(jiān)聽(tīng)關(guān)聯(lián)方 one 的出隊(duì)列做展示,two 的類(lèi)似。
需要注意的地方是,在監(jiān)聽(tīng)隊(duì)列時(shí),需要指定 ContainerFactory。
package com.yuhuofei.mq.service; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import java.nio.charset.StandardCharsets; /** * @author yuhuofei * @version 1.0 * @description 監(jiān)聽(tīng)關(guān)聯(lián)方one的消息 * @date 2022/10/3 18:38 */ @Slf4j @Service public class OneReceive { @RabbitListener(queues = "${one.out.queue}", containerFactory = "oneFactory") public void listenOne(Message message, Channel channel) { //獲取MQ返回的數(shù)據(jù) channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); String data = new String(message.getBody(), StandardCharsets.UTF_8); log.info("MQ返回的數(shù)據(jù):{}", data); //下面進(jìn)行業(yè)務(wù)邏輯處理 } }
3.2 作為生產(chǎn)者
使用之前重寫(xiě)的 RabbitTemplate ,向各個(gè)關(guān)聯(lián)方指定的隊(duì)列發(fā)送消息。
package com.yuhuofei.mq.service; import com.google.gson.JsonObject; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.stereotype.Service; import javax.annotation.Resource; /** * @author yuhuofei * @version 1.0 * @description 向關(guān)聯(lián)方的隊(duì)列發(fā)送消息 * @date 2022/10/3 18:47 */ @Slf4j @Service public class SendMessage { @Resource(name = "oneRabbitTemplate") private RabbitTemplate oneRabbitTemplate; @Resource(name = "twoRabbitTemplate") private RabbitTemplate twoRabbitTemplate; public void sendToOneMessage(String messageId, OneMessageConverter message) { String exchange = message.getExchange(); String routingKey = message.getRoutingKey(); JsonObject data = message.getData(); MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("application/json"); Message info = new Message(data.toString().getBytes(), messageProperties); info.getMessageProperties().setMessageId(messageId); oneRabbitTemplate.convertAndSend(exchange, routingKey, info, new CorrelationData(messageId)); } public void sendToTwoMessage(String messageId, TwoMessageConverter message) { String exchange = message.getExchange(); String routingKey = message.getRoutingKey(); JsonObject data = message.getData(); MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("application/json"); Message info = new Message(data.toString().getBytes(), messageProperties); info.getMessageProperties().setMessageId(messageId); twoRabbitTemplate.convertAndSend(exchange, routingKey, info, new CorrelationData(messageId)); } }
到此這篇關(guān)于SpringBoot中連接多個(gè)RabbitMQ的方法詳解的文章就介紹到這了,更多相關(guān)SpringBoot多個(gè)RabbitMQ內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- springboot整合rabbitmq的示例代碼
- springboot集成rabbitMQ之對(duì)象傳輸?shù)姆椒?/a>
- springboot實(shí)現(xiàn)rabbitmq的隊(duì)列初始化和綁定
- SpringBoot+RabbitMq具體使用的幾種姿勢(shì)
- SpringBoot使用RabbitMQ延時(shí)隊(duì)列(小白必備)
- springboot + rabbitmq 如何實(shí)現(xiàn)消息確認(rèn)機(jī)制(踩坑經(jīng)驗(yàn))
- SpringBoot中RabbitMQ集群的搭建詳解
- SpringBoot實(shí)現(xiàn)RabbitMQ監(jiān)聽(tīng)消息的四種方式
- SpringBoot整合RabbitMQ之路由模式的實(shí)現(xiàn)
相關(guān)文章
Java 實(shí)現(xiàn)常見(jiàn)的非對(duì)稱(chēng)加密算法
這篇文章主要介紹了Java 實(shí)現(xiàn)常見(jiàn)的非對(duì)稱(chēng)加密算法,幫助大家更好的理解和使用Java,感興趣的朋友可以了解下2020-11-11SpringBoot?快速實(shí)現(xiàn)?api?接口加解密功能
在項(xiàng)目中,為了保證數(shù)據(jù)的安全,我們常常會(huì)對(duì)傳遞的數(shù)據(jù)進(jìn)行加密,Spring?Boot接口加密,可以對(duì)返回值、參數(shù)值通過(guò)注解的方式自動(dòng)加解密,這篇文章主要介紹了SpringBoot?快速實(shí)現(xiàn)?api?接口加解密功能,感興趣的朋友一起看看吧2023-10-10springBoot集成redis(jedis)的實(shí)現(xiàn)示例
Redis是我們Java開(kāi)發(fā)中,使用頻次非常高的一個(gè)nosql數(shù)據(jù)庫(kù),本文主要介紹了springBoot集成redis(jedis)的實(shí)現(xiàn)示例,具有一定的參考價(jià)值,感興趣的可以了解一下2023-09-09Spring攔截器實(shí)現(xiàn)鑒權(quán)的示例代碼
本文主要介紹了Spring攔截器實(shí)現(xiàn)鑒權(quán)的示例代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-07-07mybatis中${}和#{}的區(qū)別以及底層原理分析
這篇文章主要介紹了mybatis中${}和#{}的區(qū)別以及底層原理,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05基于SpringBoot集成測(cè)試遠(yuǎn)程連接Redis服務(wù)的教程詳解
這篇文章主要介紹了基于SpringBoot集成測(cè)試遠(yuǎn)程連接的Redis服務(wù)的相關(guān)知識(shí),本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-03-03