欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

詳解SpringBoot整合RabbitMQ如何實(shí)現(xiàn)消息確認(rèn)

 更新時(shí)間:2022年05月26日 08:38:53   作者:IT利刃出鞘  
這篇文章主要介紹了SpringBoot整合RabbitMQ是如何實(shí)現(xiàn)消息確認(rèn)的,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下

簡介

本文介紹SpringBoot整合RabbitMQ如何進(jìn)行消息的確認(rèn)。

生產(chǎn)者消息確認(rèn)

介紹

發(fā)送消息確認(rèn):用來確認(rèn)消息從 producer發(fā)送到 broker 然后broker 的 exchange 到 queue過程中,消息是否成功投遞。

如果消息和隊(duì)列是可持久化的,那么確認(rèn)消息會將消息寫入磁盤之后發(fā)出;如果是鏡像隊(duì)列,所有鏡像接受成功后發(fā)確認(rèn)消息。

流程

  • 如果消息沒有到達(dá)exchange,則confirm回調(diào),ack=false
  • 如果消息到達(dá)exchange,則confirm回調(diào),ack=true
  • exchange到queue成功,則不回調(diào)return
  • exchange到queue失敗,則回調(diào)return(需設(shè)置mandatory=true,否則不會回調(diào),這樣消息就丟了)

配置

application.yml

# 發(fā)送者開啟 confirm 確認(rèn)機(jī)制
spring.rabbitmq.publisher-confirms=true
# 發(fā)送者開啟 return 確認(rèn)機(jī)制
spring.rabbitmq.publisher-returns=true

ConfirmCallback

ConfirmCallback:消息只要被 RabbitMQ broker 接收到就會觸發(fā)confirm方法。

@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            log.error("confirm==>發(fā)送到broker失敗\r\n" + 
                            "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
                            correlationData, ack, cause);
        } else {
            log.info("confirm==>發(fā)送到broker成功\r\n" + 
                            "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
                            correlationData, ack, cause);
        }
    }
}

correlationData:對象內(nèi)部有id (消息的唯一性)和Message。

(若ack為false,則Message不為null,可將Message數(shù)據(jù) 重新投遞;若ack是true,則correlationData為null)

ack:消息投遞到exchange 的狀態(tài),true表示成功。

cause:表示投遞失敗的原因。 (若ack為false,則cause不為null;若ack是true,則cause為null)

給每一條信息添加一個dataId,放在CorrelationData,這樣在RabbitConfirmCallback返回失敗時(shí)可以知道哪個消息失敗。

public void send(String dataId, String exchangeName, String rountingKey, String message){
    CorrelationData correlationData = new CorrelationData();
    correlationData.setId(dataId);
 
    rabbitTemplate.convertAndSend(exchangeName, rountingKey, message, correlationData);
}
 
public String receive(String queueName){
    return String.valueOf(rabbitTemplate.receiveAndConvert(queueName));
}

2.1版本開始,CorrelationData對象具有ListenableFuture,可用于獲取結(jié)果,而不是在rabbitTemplate上使用ConfirmCallback。

CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());

ReturnCallback

ReturnCallback:如果消息未能投遞到目標(biāo) queue 里將觸發(fā)returnedMessage方法。

若向 queue 投遞消息未成功,可記錄下當(dāng)前消息的詳細(xì)投遞數(shù)據(jù),方便后續(xù)做重發(fā)或者補(bǔ)償?shù)炔僮鳌?/p>

注意:需要rabbitTemplate.setMandatory(true);

當(dāng)mandatory設(shè)置為true時(shí),若exchange根據(jù)自身類型和消息routingKey無法找到一個合適的queue存儲消息,那么broker會調(diào)用basic.return方法將消息返還給生產(chǎn)者。當(dāng)mandatory設(shè)置為false時(shí),出現(xiàn)上述情況broker會直接將消息丟棄。

代碼:

@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        log.info("returnedMessage==> \r\n" + "message={}\r\n" + "replyCode={}\r\n" +
                        "replyText={}\r\n" + "exchange={}\r\n" + "routingKey={}",
                        message, replyCode, replyText, exchange, routingKey);
    }
}

message(消息體)、replyCode(響應(yīng)code)、replyText(響應(yīng)內(nèi)容)、exchange(交換機(jī))、routingKey(隊(duì)列)。 

注冊ConfirmCallback和ReturnCallback

整合后的寫法

package com.example.config;
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
import javax.annotation.PostConstruct;
 
@Slf4j
@Configuration
public class RabbitCallbackConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
        return rabbitTemplate;
    }
 
    // 下邊這樣寫也可以    
    // @Autowired
    // private RabbitTemplate rabbitTemplate;
    // @PostConstruct
    // public void init() {
    //     rabbitTemplate.setMandatory(true);
    //     rabbitTemplate.setReturnCallback(this);
    //     rabbitTemplate.setConfirmCallback(this);
    // }
 
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            log.error("confirm==>發(fā)送到broker失敗\r\n" +
                            "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
                            correlationData, ack, cause);
        } else {
            log.info("confirm==>發(fā)送到broker成功\r\n" +
                            "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
                            correlationData, ack, cause);
        }
    }
 
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        log.info("returnedMessage==> \r\n" + "message={}\r\n" + "replyCode={}\r\n" +
                        "replyText={}\r\n" + "exchange={}\r\n" + "routingKey={}",
                        message, replyCode, replyText, exchange, routingKey);
    }
}

消費(fèi)者消息確認(rèn)

介紹

確認(rèn)方式簡介詳述
auto(默認(rèn))根據(jù)消息消費(fèi)的情況,智能判定若消費(fèi)者拋出異常,則mq不會收到確認(rèn)消息,mq會一直此消息發(fā)出去
若消費(fèi)者沒有拋出異常,則mq會收到確認(rèn)消息,mq不會再次將此消息發(fā)出去。
若消費(fèi)者在消費(fèi)時(shí)所在服務(wù)掛了,mq不會再次將此消息發(fā)出去。
nonemq發(fā)出消息后直接確認(rèn)消息 
manual消費(fèi)端手動確認(rèn)消息消費(fèi)者調(diào)用 ack、nack、reject 幾種方法進(jìn)行確認(rèn),可以在業(yè)務(wù)失敗后進(jìn)行一些操作,如果消息未被 ACK 則消息還會存在于MQ,mq會一直將此消息發(fā)出去。
如果某個服務(wù)忘記 ACK 了,則 RabbitMQ 不會再發(fā)送數(shù)據(jù)給它,因?yàn)?RabbitMQ 認(rèn)為該服務(wù)的處理能力有限。

只要消息沒有被消費(fèi)者確認(rèn)(包括沒有自動確認(rèn)),會導(dǎo)致消息一直被失敗消費(fèi),死循環(huán)導(dǎo)致消耗大量資源。正確的處理方式是:發(fā)生異常,將消息記錄到db,再通過補(bǔ)償機(jī)制來補(bǔ)償消息,或者記錄消息的重復(fù)次數(shù),進(jìn)行重試,超過幾次后再放到db中。

消息確認(rèn)三種方式配置方法

spring.rabbitmq.listener.simple.acknowledge-mode=manual

spring.rabbitmq.listener.direct.acknowledge-mode=manual

手動確認(rèn)三種方式

basicAck,basicNack,basicReject

basicAck

含義

表示成功確認(rèn),使用此回執(zhí)方法后,消息會被RabbitMQ broker 刪除。

函數(shù)原型

void basicAck(long deliveryTag, boolean multiple) 

deliveryTag

  • 消息投遞序號
  • 每次消費(fèi)消息或者消息重新投遞后,deliveryTag都會增加。手動消息確認(rèn)模式下,我們可以對指定deliveryTag的消息進(jìn)行ack、nack、reject等操作。

multiple

  • 是否批量確認(rèn)
  • 值為 true 則會一次性 ack所有小于當(dāng)前消息 deliveryTag 的消息。

示例: 假設(shè)我先發(fā)送三條消息deliveryTag分別是5、6、7,可它們都沒有被確認(rèn),當(dāng)我發(fā)第四條消息此時(shí)deliveryTag為8,multiple設(shè)置為 true,會將5、6、7、8的消息全部進(jìn)行確認(rèn)。

實(shí)例

@RabbitHandler
public void process(String content, Channel channel, Message message){
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

basicNack

含義

表示失敗確認(rèn),一般在消費(fèi)消息業(yè)務(wù)異常時(shí)用到此方法,可以將消息重新投遞入隊(duì)列。

函數(shù)原型

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

  • deliveryTag:表示消息投遞序號。
  • multiple:是否批量確認(rèn)。
  • requeue:值為 true 消息將重新入隊(duì)列。

basicReject

含義

拒絕消息,與basicNack區(qū)別在于不能進(jìn)行批量操作,其他用法很相似。

函數(shù)原型

void basicReject(long deliveryTag, boolean requeue)

  • deliveryTag:表示消息投遞序號。
  • requeue:值為 true 消息將重新入隊(duì)列。

以上就是詳解SpringBoot整合RabbitMQ如何實(shí)現(xiàn)消息確認(rèn)的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot RabbitMQ消息確認(rèn)的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • SpringBoot開發(fā)技巧之使用AOP記錄日志示例解析

    SpringBoot開發(fā)技巧之使用AOP記錄日志示例解析

    這篇文章主要為大家介紹了SpringBoot開發(fā)技巧之如何利用AOP記錄日志的示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步
    2021-10-10
  • java實(shí)用型-高并發(fā)下RestTemplate的正確使用說明

    java實(shí)用型-高并發(fā)下RestTemplate的正確使用說明

    這篇文章主要介紹了java實(shí)用型-高并發(fā)下RestTemplate的正確使用說明,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-10-10
  • springboot整合mybatis實(shí)現(xiàn)簡單的一對多級聯(lián)查詢功能

    springboot整合mybatis實(shí)現(xiàn)簡單的一對多級聯(lián)查詢功能

    這篇文章主要介紹了springboot整合mybatis實(shí)現(xiàn)簡單的一對多級聯(lián)查詢功能,分步驟通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-08-08
  • Java批量操作文件系統(tǒng)的實(shí)現(xiàn)示例

    Java批量操作文件系統(tǒng)的實(shí)現(xiàn)示例

    文件上傳和下載是java web中常見的操作,本文主要介紹了Java批量操作文件系統(tǒng)的實(shí)現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2024-03-03
  • 用java實(shí)現(xiàn)掃雷游戲

    用java實(shí)現(xiàn)掃雷游戲

    這篇文章主要為大家詳細(xì)介紹了用java實(shí)現(xiàn)掃雷游戲,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2022-06-06
  • Java List按照某字段去重的使用示例

    Java List按照某字段去重的使用示例

    在Java開發(fā)中,我們經(jīng)常會面臨對List中對象屬性去重的需求,本文主要介紹了Java List按照某字段去重的使用示例,具有一定的參考價(jià)值,感興趣的可以了解一下
    2023-12-12
  • mybatis中的if?test判斷入?yún)⒌闹祮栴}

    mybatis中的if?test判斷入?yún)⒌闹祮栴}

    這篇文章主要介紹了mybatis中的if?test判斷入?yún)⒌闹祮栴},具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-06-06
  • 從dubbo源碼分析qos-server端口沖突問題及解決

    從dubbo源碼分析qos-server端口沖突問題及解決

    這篇文章主要介紹了從dubbo源碼分析qos-server端口沖突問題及解決方案,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-02-02
  • SpringBoot服務(wù)訪問路徑動態(tài)處理方式

    SpringBoot服務(wù)訪問路徑動態(tài)處理方式

    這篇文章主要介紹了SpringBoot服務(wù)訪問路徑動態(tài)處理方式,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-12-12
  • Java中獲取當(dāng)前路徑的幾種方法總結(jié)

    Java中獲取當(dāng)前路徑的幾種方法總結(jié)

    這篇文章主要介紹了Java中獲取當(dāng)前路徑的幾種方法總結(jié)的相關(guān)資料,需要的朋友可以參考下
    2017-02-02

最新評論