聊聊RabbitMQ發(fā)布確認(rèn)高級(jí)問題
1、發(fā)布確認(rèn)高級(jí)
1. 存在的問題
再生產(chǎn)環(huán)境中由于一些不明原因?qū)е?code>rabbitmq重啟,在RabbitMQ
重啟期間生產(chǎn)者消息投遞失敗,會(huì)導(dǎo)致消息丟失。
1.1、發(fā)布確認(rèn)SpringBoot版本
1.1.1、確認(rèn)機(jī)制方案
當(dāng)消息不能正常被接收的時(shí)候,我們需要將消息存放在緩存中。
1.1.2、代碼架構(gòu)圖
1.1.3、配置文件
spring.rabbitmq.host=192.168.123.129 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=123 spring.rabbitmq.publisher-confirm-type=correlated
NONE
:禁用發(fā)布確認(rèn)模式,是默認(rèn)值。CORRELATED
:發(fā)布消息成功到交換機(jī)會(huì)觸發(fā)回調(diào)方方法。CORRELATED
:就是發(fā)布一個(gè)就確認(rèn)一個(gè)。
1.1.4、配置類
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ConfirmConfig { public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange"; public static final String CONFIRM_QUEUE_NAME = "confirm_queue"; public static final String CONFIRM_ROUTING_KEY = "key1"; @Bean("confirmExchange") public DirectExchange confirmExchange(){ return new DirectExchange(CONFIRM_EXCHANGE_NAME); } @Bean("confirmQueue") public Queue confirmQueue(){ return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } @Bean public Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange confirmExchange, @Qualifier("confirmQueue") Queue confirmQueue){ return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY); } }
1.1.5、回調(diào)接口
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; /** * 回調(diào)接口 */ @Component @Slf4j public class MyCallBack implements RabbitTemplate.ConfirmCallback { @Autowired RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(this); } /** * 交換機(jī)接受失敗后進(jìn)行回調(diào) * 1. 保存消息的ID及相關(guān)消息 * 2. 是否接收成功 * 3. 接受失敗的原因 * @param correlationData * @param b * @param s */ @Override public void confirm(CorrelationData correlationData, boolean b, String s) { String id = correlationData != null ? correlationData.getId() : ""; if(b == true){ log.info("交換機(jī)已經(jīng)收到id為:{}的消息",id); }else{ log.info("交換機(jī)還未收到id為:{}消息,由于原因:{}",id,s); } } }
1.1.6、生產(chǎn)者
import com.xiao.springbootrabbitmq.utils.MyCallBack; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.PostConstruct; @RestController @RequestMapping("/confirm") @Slf4j public class Producer { public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange"; @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/sendMessage/{message}") public void sendMessage(@PathVariable String message){ CorrelationData correlationData1 = new CorrelationData("1"); String routingKey1 = "key1"; rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey1,message + routingKey1,correlationData1); CorrelationData correlationData2 = new CorrelationData("2"); String routingKey2 = "key2"; rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey2,message + routingKey2,correlationData2); log.info("發(fā)送得內(nèi)容是:{}",message); } }
1.1.7、消費(fèi)者
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @Slf4j public class ConfirmConsumer { public static final String CONFIRM_QUEUE_NAME = "confirm_queue"; @RabbitListener(queues = CONFIRM_QUEUE_NAME) public void receiveMessage(Message message){ String msg = new String(message.getBody()); log.info("接收到隊(duì)列" + CONFIRM_QUEUE_NAME + "消息:{}",msg); } }
1.1.8、測(cè)試結(jié)果
1. 第一種情況
ID
為1
的消息正常送達(dá),ID
為2
的消息由于RoutingKey
的錯(cuò)誤,導(dǎo)致不能正常被消費(fèi),但是交換機(jī)還是正常收到了消息,所以此時(shí)由于交換機(jī)正常接收之后的原因丟失的消息不能正常被接收。
2. 第二種情況
我們?cè)偕弦环N情況下修改了ID
為1
的消息的交換機(jī)的名稱,所以此時(shí)回調(diào)函數(shù)會(huì)進(jìn)行回答由于什么原因?qū)е陆粨Q機(jī)無法接收成功消息。
1.2、回退消息
1.2.1、Mandatory參數(shù)
- 在僅開啟了生產(chǎn)者確認(rèn)機(jī)制的情況下,交換機(jī)接收到消息后,會(huì)直接給消息生產(chǎn)者發(fā)送確認(rèn)消息,如果發(fā)現(xiàn)該消息不可路由(就是消息被交換機(jī)成功接收后,無法到達(dá)隊(duì)列),那么消息會(huì)直接被丟棄,此時(shí)生產(chǎn)者是不知道消息被丟棄這個(gè)事件的。
- 通過設(shè)置該參數(shù)可以在消息傳遞過程中不可達(dá)目的地時(shí)將消息返回給生產(chǎn)者。
1.2.2、配置文件
spring.rabbitmq.publisher-returns=true
需要在配置文件種開啟返回回調(diào)
1.2.3、生產(chǎn)者代碼
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.PostConstruct; @RestController @RequestMapping("/confirm") @Slf4j public class Producer { public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange"; @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/sendMessage/{message}") public void sendMessage(@PathVariable String message){ CorrelationData correlationData1 = new CorrelationData("1"); String routingKey1 = "key1"; rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey1,message + routingKey1,correlationData1); log.info("發(fā)送得內(nèi)容是:{}",message + routingKey1); CorrelationData correlationData2 = new CorrelationData("2"); String routingKey2 = "key2"; rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey2,message + routingKey2,correlationData2); log.info("發(fā)送得內(nèi)容是:{}",message + routingKey2); } }
1.2.4、回調(diào)接口代碼
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; /** * 回調(diào)接口 */ @Component @Slf4j public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback { @Autowired RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnsCallback(this); } /** * 交換機(jī)接受失敗后進(jìn)行回調(diào) * 1. 保存消息的ID及相關(guān)消息 * 2. 是否接收成功 * 3. 接受失敗的原因 * @param correlationData * @param b * @param s */ @Override public void confirm(CorrelationData correlationData, boolean b, String s) { String id = correlationData != null ? correlationData.getId() : ""; if(b == true){ log.info("交換機(jī)已經(jīng)收到id為:{}的消息",id); }else{ log.info("交換機(jī)還未收到id為:{}消息,由于原因:{}",id,s); } } @Override public void returnedMessage(ReturnedMessage returnedMessage) { Message message = returnedMessage.getMessage(); String exchange = returnedMessage.getExchange(); String routingKey = returnedMessage.getRoutingKey(); String replyText = returnedMessage.getReplyText(); log.error("消息{},被交換機(jī){}退回,回退原因:{},路由Key:{}",new String(message.getBody()),exchange,replyText,routingKey); } }
1.2.5、測(cè)試結(jié)果
其他類的代碼與上一小節(jié)案例相同
ID
為2
的消息由于RoutingKey
不可路由,但是還是被回調(diào)函數(shù)處理了。
1.3、備份交換機(jī)
1.3.1、代碼架構(gòu)圖
這里我們新增了備份交換機(jī)、備份隊(duì)列、報(bào)警隊(duì)列。它們綁定關(guān)系如圖所示。如果確認(rèn)交換機(jī)成功接收的消息無法路由到相應(yīng)的隊(duì)列,就會(huì)被確認(rèn)交換機(jī)發(fā)送給備份交換機(jī)。
1.3.2、配置類代碼
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ConfirmConfig { public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange"; public static final String CONFIRM_QUEUE_NAME = "confirm_queue"; public static final String BACKUP_EXCHANGE_NAME = "backup_exchange"; public static final String BACKUP_QUEUE_NAME = "backup_queue"; public static final String WARNING_QUEUE_NAME = "warning_queue"; public static final String CONFIRM_ROUTING_KEY = "key1"; @Bean("confirmExchange") public DirectExchange confirmExchange(){ return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true) .withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME).build(); } @Bean("confirmQueue") public Queue confirmQueue(){ return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } @Bean("backupExchange") public FanoutExchange backupExchange(){ return new FanoutExchange(BACKUP_EXCHANGE_NAME); } @Bean("backupQueue") public Queue backupQueue(){ return QueueBuilder.durable(BACKUP_QUEUE_NAME).build(); } @Bean("warningQueue") public Queue warningQueue(){ return QueueBuilder.durable(WARNING_QUEUE_NAME).build(); } @Bean public Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange confirmExchange, @Qualifier("confirmQueue") Queue confirmQueue){ return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY); } @Bean public Binding queueBindingExchange1(@Qualifier("backupExchange") FanoutExchange backupExchange, @Qualifier("backupQueue") Queue backupQueue){ return BindingBuilder.bind(backupQueue).to(backupExchange); } @Bean public Binding queueBindingExchange2(@Qualifier("backupExchange") FanoutExchange backupExchange, @Qualifier("warningQueue") Queue warningQueue){ return BindingBuilder.bind(warningQueue).to(backupExchange); } }
1.3.3、消費(fèi)者代碼
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @Slf4j public class WarningConsumer { public static final String WARNING_QUEUE_NAME = "warning_queue"; @RabbitListener(queues = WARNING_QUEUE_NAME) public void receiveMessage(Message message){ String msg = new String(message.getBody()); log.info("報(bào)警發(fā)現(xiàn)不可路由的消息內(nèi)容為:{}",msg); } }
1.3.4、測(cè)試結(jié)果
mandatory參數(shù)與備份交換機(jī)可以一起使用的時(shí)候,如果兩者同時(shí)開啟,備份交換機(jī)優(yōu)先級(jí)高。
到此這篇關(guān)于RabbitMQ發(fā)布確認(rèn)高級(jí)的文章就介紹到這了,更多相關(guān)RabbitMQ發(fā)布確認(rèn)高級(jí)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java數(shù)據(jù)庫(kù)開發(fā)之JDBC基礎(chǔ)使用方法及實(shí)例詳解
這篇文章主要介紹了java數(shù)據(jù)庫(kù)開發(fā)之JDBC基礎(chǔ)知識(shí)詳解,需要的朋友可以參考下2020-02-02java算法題解LeetCode35復(fù)雜鏈表的復(fù)制實(shí)例
這篇文章主要為大家介紹了java算法題解LeetCode35復(fù)雜鏈表的復(fù)制實(shí)例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-01-01java實(shí)現(xiàn)文件打包壓縮輸出到瀏覽器下載
這篇文章主要介紹了java實(shí)現(xiàn)文件打包壓縮輸出到瀏覽器下載,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11Maven 多模塊父子工程的實(shí)現(xiàn)(含Spring Boot示例)
這篇文章主要介紹了Maven 多模塊父子工程的實(shí)現(xiàn)(含Spring Boot示例),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04SpringBoot升級(jí)3.2報(bào)錯(cuò)Invalid value type for
這篇文章給大家介紹了SpringBoot升級(jí)3.2報(bào)錯(cuò)Invalid value type for attribute ‘factoryBeanObjectType‘: java.lang.String的解決方案,文中有詳細(xì)的原因分析,需要的朋友可以參考下2023-12-12解析SpringBoot項(xiàng)目開發(fā)之Gzip壓縮過程
這篇文章主要介紹了SpringBoot項(xiàng)目開發(fā)之Gzip壓縮過程,本文給大家分享幾種Gzip壓縮方式,通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-07-07java實(shí)現(xiàn)大數(shù)加法(BigDecimal)的實(shí)例代碼
之前寫過用vector、string實(shí)現(xiàn)大數(shù)加法,現(xiàn)在用java的BigDecimal類,代碼簡(jiǎn)單很多。但是在online-judge上,java的代碼運(yùn)行時(shí)間和內(nèi)存大得多2013-10-10