欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RabbitMq消息防丟失功能實(shí)現(xiàn)方式講解

 更新時間:2023年01月20日 15:09:50   作者:_BugMan  
這篇文章主要介紹了RabbitMq消息防丟失功能實(shí)現(xiàn),RabbitMQ中,消息丟失可以簡單的分為兩種:客戶端丟失和服務(wù)端丟失。針對這兩種消息丟失,RabbitMQ都給出了相應(yīng)的解決方案

1.概述

1.1.數(shù)據(jù)丟失的原因

在消息中有三種可能性造成數(shù)據(jù)丟失:

  • 消費(fèi)者消費(fèi)消息失敗
  • 生產(chǎn)者生產(chǎn)消息失敗
  • MQ數(shù)據(jù)丟失

消費(fèi)者消費(fèi)消息失?。?/p>

RabbitMq存在應(yīng)答機(jī)制,默認(rèn)為自動應(yīng)答,MQ向消費(fèi)者推送一條消息,消費(fèi)者收到這條消息后會返回一個ack(應(yīng)答)給MQ,MQ收到應(yīng)答后會刪除這條消息。

自動應(yīng)答存在一個問題,就是消費(fèi)者收到消息后立馬就會給MQ返回ack,如果消費(fèi)者返回完ack但還沒來的及真正處理這條消息時,消費(fèi)者斷電宕機(jī)了,那么這條消息就丟失了。

這就是由于消費(fèi)者消費(fèi)消息失敗造成的數(shù)據(jù)丟失。

生產(chǎn)者生產(chǎn)數(shù)據(jù)失?。?/p>

生產(chǎn)者向MQ推送了一條消息,但是由于由于諸如網(wǎng)絡(luò)故障等原因mq并沒有收到該條消息,這樣就造成了這條消息的丟失。

MQ數(shù)據(jù)丟失:

MQ的數(shù)據(jù)是存在內(nèi)存中的,諸如斷電等原因可能會造成數(shù)據(jù)的丟失。

1.2.如何防止數(shù)據(jù)丟失

解決以上列舉的數(shù)據(jù)丟失問題的辦法有三種:

  • 手動應(yīng)答
  • 消息確認(rèn)機(jī)制
  • 持久化

手動應(yīng)答:

RabbitMQ默認(rèn)是自動應(yīng)答,消費(fèi)者收到消息后就會自動返回ack給MQ,可以將應(yīng)答模式改為手動應(yīng)答,在消費(fèi)者一側(cè)消息的消費(fèi)動作完成后手動來返回ack給MQ,用來解決“消費(fèi)者消費(fèi)消息失敗”問題。

消息確認(rèn)機(jī)制:

當(dāng)消息隊(duì)列收到消息后,告知生產(chǎn)者,讓生產(chǎn)者感知到自己生產(chǎn)的消息,消息隊(duì)列已經(jīng)接收到,用來解決“生產(chǎn)者生產(chǎn)消息失敗”問題。消息確認(rèn)機(jī)制有兩種實(shí)現(xiàn)方式:

  • AMQP事務(wù)
  • confirm

持久化:

消息隊(duì)列的消息持久化到磁盤上,用來解決“MQ數(shù)據(jù)丟失”問題。

2.手動應(yīng)答

手動應(yīng)答是通過設(shè)置channel來實(shí)現(xiàn)的,以下為一個完整代碼示例。

配置類:

@Configuration
public class config {
    @Bean
    public Queue queue(){
        return new Queue("queue_01",false);
    }
}

生產(chǎn)者:

@SpringBootTest(classes = Main.class)
public class Producer {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Test
    public void producerMsg(){
        rabbitTemplate.convertAndSend("queue_01","hello_world");
    }
}

消費(fèi)者:

@Component
@Slf4j
public class Consumer {
    @RabbitListener(queues = {"queue_01"})
    public void consumerMsg(String msg, Message message,Channel channel){
        try {
            log.info("消費(fèi)者消費(fèi)消息: "+msg);
            /**
             * 沒有異常就確認(rèn)消息
             * basicAck(long deliveryTag, boolean multiple)
             * deliveryTag:當(dāng)前消息在隊(duì)列中的的索引;
             * multiple:為true的話就是批量確認(rèn)
             */
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        } catch (Exception e) {
            /**
             * 有異常就拒收消息
             * basicNack(long deliveryTag, boolean multiple, boolean requeue)
             * requeue:true為將消息重返當(dāng)前消息隊(duì)列,重新發(fā)送給消費(fèi)者;
             *         false將消息丟棄
             */
            try {
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            } catch (Exception ex) {
                log.error(ex.getMessage());
            }
        }
    }
}

3.消息確認(rèn)機(jī)制

AQMP事務(wù)、confirm其實(shí)都是基于channel的。

3.1.AMQP事務(wù)

AMQP事務(wù)和數(shù)據(jù)庫事務(wù)類似,定義一組對MQ的操作,統(tǒng)一提交,成功則全部一起執(zhí)行,失敗則全部回滾。AMQP事務(wù)在spring boot中的使用很簡單,和數(shù)據(jù)庫的事務(wù)一樣,一個注解就可以搞定。

@GetMapping("/direct/wx/transactional")
@Transactional(rollbackFor = Exception.class)
public String sendDirectMessageTransactional() {
  rabbitTemplate.convertAndSend("direct_exchange", "wx","hello world!");
  log.info("開啟事務(wù)消息機(jī)制");
    try {
           Thread.sleep(5000);
       } catch (Exception e) {
            e.printStackTrace();
       }
      return "ok";
}

3.2.confirm

confirm是基于channel的,一旦channel進(jìn)入confirm模式,所有在該channel上發(fā)布的消息都會被指派一個唯一的ID(從1開始),消息被投遞道匹配隊(duì)列后broker會發(fā)送一個確認(rèn)消息給生產(chǎn)者。如果消息和隊(duì)列是可持久化的(durable為true),那么確認(rèn)消息會在消息被寫入磁盤后發(fā)出。

confirm最大的好處在于異步,生產(chǎn)者在等待上一條消息的確認(rèn)消息的時候可以繼續(xù)往下發(fā)送。

confirm在spring boot中的使用很簡單,在配置文件中開啟即可,并且支持自定義回調(diào)函數(shù):

配置文件:

spring.rabbitmq.publisher-confirms: true

spring.rabbitmq.publisher-returns: true

生產(chǎn)者:

@Slf4j
@Component
public class RabbitmqService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void sendMessage(String exchange,String routingKey,Object msg) {
        // 設(shè)置交換機(jī)處理失敗消息的模式     true 表示消息由交換機(jī) 到達(dá)不了隊(duì)列時,會將消息重新返回給生產(chǎn)者
        // 如果不設(shè)置這個指令,則交換機(jī)向隊(duì)列推送消息失敗后,不會觸發(fā) setReturnCallback
        rabbitTemplate.setMandatory(true);
        //消息消費(fèi)者確認(rèn)收到消息后,手動ack回執(zhí)
        rabbitTemplate.setConfirmCallback(this);
        // 暫時關(guān)閉 return 配置
        //rabbitTemplate.setReturnCallback(this);
        //發(fā)送消息
        rabbitTemplate.convertAndSend(exchange,routingKey,msg);
    }
    /**
     * 交換機(jī)并未將數(shù)據(jù)丟入指定的隊(duì)列中時,觸發(fā)
     *  channel.basicPublish(exchange_name,next.getKey(), true, properties,next.getValue().getBytes());
     *  參數(shù)三:true  表示如果消息無法正常投遞,則return給生產(chǎn)者 ;false 表示直接丟棄
     * @param message   消息對象
     * @param replyCode 錯誤碼
     * @param replyText 錯誤信息
     * @param exchange 交換機(jī)
     * @param routingKey 路由鍵
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("---- returnedMessage ----replyCode="+replyCode+" replyText="+replyText+" ");
    }
    /**
     * 消息生產(chǎn)者發(fā)送消息至交換機(jī)時觸發(fā),用于判斷交換機(jī)是否成功收到消息
     * @param correlationData  相關(guān)配置信息
     * @param ack exchange 交換機(jī),判斷交換機(jī)是否成功收到消息    true 表示交換機(jī)收到
     * @param cause  失敗原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("---- confirm ----ack="+ack+"  cause="+String.valueOf(cause));
        log.info("correlationData -->"+correlationData.toString());
        if(ack){
            // 交換機(jī)接收到
            log.info("---- confirm ----ack==true  cause="+cause);
        }else{
            // 沒有接收到
            log.info("---- confirm ----ack==false  cause="+cause);
        }
    }
}

到此這篇關(guān)于RabbitMq消息防丟失功能實(shí)現(xiàn)方式講解的文章就介紹到這了,更多相關(guān)RabbitMq消息防丟失內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • SpringBoot源碼剖析之屬性文件加載原理

    SpringBoot源碼剖析之屬性文件加載原理

    這篇文章主要給大家介紹了關(guān)于SpringBoot源碼剖析之屬性文件加載原理的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2022-02-02
  • Java基于elasticsearch實(shí)現(xiàn)集群管理

    Java基于elasticsearch實(shí)現(xiàn)集群管理

    這篇文章主要介紹了java基于elasticsearch實(shí)現(xiàn)集群管理,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-02-02
  • SpringCloud Gateway 利用 Mysql 實(shí)現(xiàn)動態(tài)路由的方法

    SpringCloud Gateway 利用 Mysql 實(shí)現(xiàn)動態(tài)路由的方法

    這篇文章主要介紹了SpringCloud Gateway 利用 Mysql 實(shí)現(xiàn)動態(tài)路由的方法,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-02-02
  • Java知識點(diǎn)歸納總結(jié)

    Java知識點(diǎn)歸納總結(jié)

    本篇文章對Java的一些知識點(diǎn)進(jìn)行了歸納總結(jié)分析。需要的朋友參考下
    2013-05-05
  • 解決mybatis-plus新增數(shù)據(jù)自增ID變無序問題

    解決mybatis-plus新增數(shù)據(jù)自增ID變無序問題

    這篇文章主要介紹了解決mybatis-plus新增數(shù)據(jù)自增ID變無序問題,具有很好的參考價(jià)值,希望對大家有所幫助。
    2023-07-07
  • Java中BEAN與EJB的區(qū)別淺析

    Java中BEAN與EJB的區(qū)別淺析

    這篇文章主要介紹了Java中BEAN與EJB的區(qū)別淺析,本文總結(jié)了它們之間的不同之處,需要的朋友可以參考下
    2015-03-03
  • MyBatis的緩存解析

    MyBatis的緩存解析

    這篇文章主要介紹了MyBatis的緩存解析,一級緩存是SqlSession級別的,通過同一個SqlSession查詢的數(shù)據(jù)會緩存,下次查詢相同的數(shù)據(jù)就會從緩存中直接獲取,不會從數(shù)據(jù)重新訪問,前提必須是同一個SqlSession對象,并且查詢的數(shù)據(jù)相同,需要的朋友可以參考下
    2023-09-09
  • Java之InputStreamReader類的實(shí)現(xiàn)

    Java之InputStreamReader類的實(shí)現(xiàn)

    這篇文章主要介紹了Java之InputStreamReader類的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-11-11
  • BCryptPasswordEncoder加密與MD5加密的區(qū)別及說明

    BCryptPasswordEncoder加密與MD5加密的區(qū)別及說明

    這篇文章主要介紹了BCryptPasswordEncoder加密與MD5加密的區(qū)別及說明,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-08-08
  • Java 無符號右移與右移運(yùn)算符的使用介紹

    Java 無符號右移與右移運(yùn)算符的使用介紹

    這篇文章主要介紹了Java 無符號右移與右移運(yùn)算符的使用介紹,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-06-06

最新評論