RabbitMq消息防丟失功能實(shí)現(xiàn)方式講解
1.概述
1.1.數(shù)據(jù)丟失的原因
在消息中有三種可能性造成數(shù)據(jù)丟失:
- 消費(fèi)者消費(fèi)消息失敗
- 生產(chǎn)者生產(chǎn)消息失敗
- MQ數(shù)據(jù)丟失
消費(fèi)者消費(fèi)消息失?。?/p>
RabbitMq存在應(yīng)答機(jī)制,默認(rèn)為自動(dòng)應(yīng)答,MQ向消費(fèi)者推送一條消息,消費(fèi)者收到這條消息后會(huì)返回一個(gè)ack(應(yīng)答)給MQ,MQ收到應(yīng)答后會(huì)刪除這條消息。
自動(dòng)應(yīng)答存在一個(gè)問題,就是消費(fèi)者收到消息后立馬就會(huì)給MQ返回ack,如果消費(fèi)者返回完ack但還沒來的及真正處理這條消息時(shí),消費(fèi)者斷電宕機(jī)了,那么這條消息就丟失了。
這就是由于消費(fèi)者消費(fèi)消息失敗造成的數(shù)據(jù)丟失。
生產(chǎn)者生產(chǎn)數(shù)據(jù)失?。?/p>
生產(chǎn)者向MQ推送了一條消息,但是由于由于諸如網(wǎng)絡(luò)故障等原因mq并沒有收到該條消息,這樣就造成了這條消息的丟失。
MQ數(shù)據(jù)丟失:
MQ的數(shù)據(jù)是存在內(nèi)存中的,諸如斷電等原因可能會(huì)造成數(shù)據(jù)的丟失。
1.2.如何防止數(shù)據(jù)丟失
解決以上列舉的數(shù)據(jù)丟失問題的辦法有三種:
- 手動(dòng)應(yīng)答
- 消息確認(rèn)機(jī)制
- 持久化
手動(dòng)應(yīng)答:
RabbitMQ默認(rèn)是自動(dòng)應(yīng)答,消費(fèi)者收到消息后就會(huì)自動(dòng)返回ack給MQ,可以將應(yīng)答模式改為手動(dòng)應(yīng)答,在消費(fèi)者一側(cè)消息的消費(fèi)動(dòng)作完成后手動(dòng)來返回ack給MQ,用來解決“消費(fèi)者消費(fèi)消息失敗”問題。
消息確認(rèn)機(jī)制:
當(dāng)消息隊(duì)列收到消息后,告知生產(chǎn)者,讓生產(chǎn)者感知到自己生產(chǎn)的消息,消息隊(duì)列已經(jīng)接收到,用來解決“生產(chǎn)者生產(chǎn)消息失敗”問題。消息確認(rèn)機(jī)制有兩種實(shí)現(xiàn)方式:
- AMQP事務(wù)
- confirm
持久化:
消息隊(duì)列的消息持久化到磁盤上,用來解決“MQ數(shù)據(jù)丟失”問題。
2.手動(dòng)應(yīng)答
手動(dòng)應(yīng)答是通過設(shè)置channel來實(shí)現(xiàn)的,以下為一個(gè)完整代碼示例。
配置類:
@Configuration
public class config {
@Bean
public Queue queue(){
return new Queue("queue_01",false);
}
}生產(chǎn)者:
@SpringBootTest(classes = Main.class)
public class Producer {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void producerMsg(){
rabbitTemplate.convertAndSend("queue_01","hello_world");
}
}消費(fèi)者:
@Component
@Slf4j
public class Consumer {
@RabbitListener(queues = {"queue_01"})
public void consumerMsg(String msg, Message message,Channel channel){
try {
log.info("消費(fèi)者消費(fèi)消息: "+msg);
/**
* 沒有異常就確認(rèn)消息
* basicAck(long deliveryTag, boolean multiple)
* deliveryTag:當(dāng)前消息在隊(duì)列中的的索引;
* multiple:為true的話就是批量確認(rèn)
*/
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) {
/**
* 有異常就拒收消息
* basicNack(long deliveryTag, boolean multiple, boolean requeue)
* requeue:true為將消息重返當(dāng)前消息隊(duì)列,重新發(fā)送給消費(fèi)者;
* false將消息丟棄
*/
try {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} catch (Exception ex) {
log.error(ex.getMessage());
}
}
}
}3.消息確認(rèn)機(jī)制
AQMP事務(wù)、confirm其實(shí)都是基于channel的。
3.1.AMQP事務(wù)
AMQP事務(wù)和數(shù)據(jù)庫事務(wù)類似,定義一組對(duì)MQ的操作,統(tǒng)一提交,成功則全部一起執(zhí)行,失敗則全部回滾。AMQP事務(wù)在spring boot中的使用很簡(jiǎn)單,和數(shù)據(jù)庫的事務(wù)一樣,一個(gè)注解就可以搞定。
@GetMapping("/direct/wx/transactional")
@Transactional(rollbackFor = Exception.class)
public String sendDirectMessageTransactional() {
rabbitTemplate.convertAndSend("direct_exchange", "wx","hello world!");
log.info("開啟事務(wù)消息機(jī)制");
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
return "ok";
}3.2.confirm
confirm是基于channel的,一旦channel進(jìn)入confirm模式,所有在該channel上發(fā)布的消息都會(huì)被指派一個(gè)唯一的ID(從1開始),消息被投遞道匹配隊(duì)列后broker會(huì)發(fā)送一個(gè)確認(rèn)消息給生產(chǎn)者。如果消息和隊(duì)列是可持久化的(durable為true),那么確認(rèn)消息會(huì)在消息被寫入磁盤后發(fā)出。
confirm最大的好處在于異步,生產(chǎn)者在等待上一條消息的確認(rèn)消息的時(shí)候可以繼續(xù)往下發(fā)送。
confirm在spring boot中的使用很簡(jiǎn)單,在配置文件中開啟即可,并且支持自定義回調(diào)函數(shù):
配置文件:
spring.rabbitmq.publisher-confirms: true
spring.rabbitmq.publisher-returns: true
生產(chǎn)者:
@Slf4j
@Component
public class RabbitmqService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange,String routingKey,Object msg) {
// 設(shè)置交換機(jī)處理失敗消息的模式 true 表示消息由交換機(jī) 到達(dá)不了隊(duì)列時(shí),會(huì)將消息重新返回給生產(chǎn)者
// 如果不設(shè)置這個(gè)指令,則交換機(jī)向隊(duì)列推送消息失敗后,不會(huì)觸發(fā) setReturnCallback
rabbitTemplate.setMandatory(true);
//消息消費(fèi)者確認(rèn)收到消息后,手動(dòng)ack回執(zhí)
rabbitTemplate.setConfirmCallback(this);
// 暫時(shí)關(guān)閉 return 配置
//rabbitTemplate.setReturnCallback(this);
//發(fā)送消息
rabbitTemplate.convertAndSend(exchange,routingKey,msg);
}
/**
* 交換機(jī)并未將數(shù)據(jù)丟入指定的隊(duì)列中時(shí),觸發(fā)
* channel.basicPublish(exchange_name,next.getKey(), true, properties,next.getValue().getBytes());
* 參數(shù)三:true 表示如果消息無法正常投遞,則return給生產(chǎn)者 ;false 表示直接丟棄
* @param message 消息對(duì)象
* @param replyCode 錯(cuò)誤碼
* @param replyText 錯(cuò)誤信息
* @param exchange 交換機(jī)
* @param routingKey 路由鍵
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("---- returnedMessage ----replyCode="+replyCode+" replyText="+replyText+" ");
}
/**
* 消息生產(chǎn)者發(fā)送消息至交換機(jī)時(shí)觸發(fā),用于判斷交換機(jī)是否成功收到消息
* @param correlationData 相關(guān)配置信息
* @param ack exchange 交換機(jī),判斷交換機(jī)是否成功收到消息 true 表示交換機(jī)收到
* @param cause 失敗原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("---- confirm ----ack="+ack+" cause="+String.valueOf(cause));
log.info("correlationData -->"+correlationData.toString());
if(ack){
// 交換機(jī)接收到
log.info("---- confirm ----ack==true cause="+cause);
}else{
// 沒有接收到
log.info("---- confirm ----ack==false cause="+cause);
}
}
}到此這篇關(guān)于RabbitMq消息防丟失功能實(shí)現(xiàn)方式講解的文章就介紹到這了,更多相關(guān)RabbitMq消息防丟失內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java基于elasticsearch實(shí)現(xiàn)集群管理
這篇文章主要介紹了java基于elasticsearch實(shí)現(xiàn)集群管理,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-02-02
SpringCloud Gateway 利用 Mysql 實(shí)現(xiàn)動(dòng)態(tài)路由的方法
這篇文章主要介紹了SpringCloud Gateway 利用 Mysql 實(shí)現(xiàn)動(dòng)態(tài)路由的方法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-02-02
解決mybatis-plus新增數(shù)據(jù)自增ID變無序問題
這篇文章主要介紹了解決mybatis-plus新增數(shù)據(jù)自增ID變無序問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。2023-07-07
Java之InputStreamReader類的實(shí)現(xiàn)
這篇文章主要介紹了Java之InputStreamReader類的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11
BCryptPasswordEncoder加密與MD5加密的區(qū)別及說明
這篇文章主要介紹了BCryptPasswordEncoder加密與MD5加密的區(qū)別及說明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-08-08
Java 無符號(hào)右移與右移運(yùn)算符的使用介紹
這篇文章主要介紹了Java 無符號(hào)右移與右移運(yùn)算符的使用介紹,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-06-06

