欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RabbitMQ進(jìn)階之消息可靠性詳解

 更新時(shí)間:2023年08月10日 10:53:23   作者:程序員阿紅  
這篇文章主要介紹了RabbitMQ進(jìn)階之消息可靠性詳解,abbitmq消息的投遞過程中,怎么確保消息能不丟失,這是一個(gè)很重要的問題,哪怕我們做了Rabbitmq持久化,也不能保證我們的業(yè)務(wù)消息不會(huì)被丟失,需要的朋友可以參考下

消息的可靠性

Rabbitmq消息的投遞過程中,怎么確保消息能不丟失,這是一個(gè)很重要的問題。哪怕我們做了Rabbitmq持久化,也不能保證我們的業(yè)務(wù)消息不會(huì)被丟失。

我們可以從消息的收發(fā)過程中來分析,消息首先要從生產(chǎn)者producer發(fā)送到broker,再?gòu)腷roker把消息發(fā)送給消費(fèi)者consumer。

image-20230321151738456

所以我們總的可以從發(fā)送方(生產(chǎn)者)確認(rèn)和接收方(消費(fèi)者)確認(rèn)來保證消息的可靠性。

image-20230321150208466

異常捕獲機(jī)制

先執(zhí)行業(yè)務(wù)操作,業(yè)務(wù)操作成功后執(zhí)行行消息發(fā)送,消息發(fā)送過程通過try catch 方式捕獲異常, 在異常處理理的代碼塊中執(zhí)行回滾業(yè)務(wù)操作或者執(zhí)行重發(fā)操作等。

這是一種最大努力確保的方式, 并無法保證100%絕對(duì)可靠,因?yàn)檫@里沒有異常并不代表消息就一定投遞成功。

image-20230321152637400

另外,可以通過spring.rabbitmq.template.retry.enabled=true 配置開啟發(fā)送端的重試。

AMQP/RabbitMQ的事務(wù)機(jī)制

沒有捕獲到異常并不能代表消息就一定投遞成功了。 一直到事務(wù)提交后都沒有異常,確實(shí)就說明消息是投遞成功了。

但是,這種方式在性能方面的開銷 比較大,一般也不推薦使用。

  • 事務(wù)實(shí)現(xiàn)
channel.txSelect(): 將當(dāng)前信道設(shè)置成事務(wù)模式
channel.txCommit(): 用于提交事務(wù)
channel.txRollback(): 用于回滾事務(wù)

image-20230321152859934

發(fā)送端確認(rèn)機(jī)制

RabbitMQ后來引入了一種輕量量級(jí)的方式,叫發(fā)送方確認(rèn)(publisher confirm)機(jī)制。生產(chǎn)者將信 道設(shè)置成confirm(確認(rèn))模式,一旦信道進(jìn)入confirm 模式,所有在該信道上?面發(fā)布的消息都會(huì)被指派 一個(gè)唯一的ID(從1 開始),一旦消息被投遞到所有匹配的隊(duì)列之后(如果消息和隊(duì)列是持久化的,那么 確認(rèn)消息會(huì)在消息持久化后發(fā)出),RabbitMQ 就會(huì)發(fā)送一個(gè)確認(rèn)(Basic.Ack)給生產(chǎn)者(包含消息的唯一 ID),這樣生產(chǎn)者就知道消息已經(jīng)正確送達(dá)了。

image-20230321153131995

RabbitMQ 回傳給生產(chǎn)者的確認(rèn)消息中的deliveryTag 字段包含了確認(rèn)消息的序號(hào),另外,通過設(shè)置channel.basicAck方法中的multiple參數(shù),表示到這個(gè)序號(hào)之前的所有消息是否都已經(jīng)得到了處理了。生產(chǎn)者投遞消息后并不需要一直阻塞著,可以繼續(xù)投遞下一條消息并通過回調(diào)方式處理理ACK響應(yīng)。

如果 RabbitMQ 因?yàn)樽陨韮?nèi)部錯(cuò)誤導(dǎo)致消息丟失等異常情況發(fā)生,就會(huì)響應(yīng)一條nack(Basic.Nack)命令,生產(chǎn)者應(yīng)用程序同樣可以在回調(diào)方法中處理理該 nack 命令。

package confirm;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import util.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class PublisherConfirmsProducer {
    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        // 向RabbitMQ服務(wù)器發(fā)送AMQP命令,將當(dāng)前通道標(biāo)記為發(fā)送方確認(rèn)通道
        final AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
        channel.queueDeclare("queue.pc", true, false, false, null);
        channel.exchangeDeclare("ex.pc", "direct", true, false, null);
        channel.queueBind("queue.pc", "ex.pc", "key.pc");
        try {
        // 發(fā)送消息
        for (int i = 1 ; i < 10000 ; i++){
            channel.basicPublish("ex.pc", "key.pc", null, "hello world".getBytes());
        }
            // 同步的方式等待RabbitMQ的確認(rèn)消息
            channel.waitForConfirmsOrDie(5000);
            System.out.println("發(fā)送的消息已經(jīng)得到確認(rèn)");
        } catch (IOException ex) {
            System.out.println("消息被拒收");
        } catch (IllegalStateException ex) {
            System.out.println("發(fā)送消息的通道不是PublisherConfirms通道");
        } catch (TimeoutException ex) {
            System.out.println("等待消息確認(rèn)超時(shí)");
        }
        channel.close();
        connection.close();
    }
}

waitForConfirm方法有個(gè)重載的,可以自定義timeout超時(shí)時(shí)間,超時(shí)后會(huì)拋TimeoutException。類似的有幾個(gè)waitForConfirmsOrDie方法,Broker端在返回nack(Basic.Nack)之后該方法會(huì)拋出java.io.IOException。

需要根據(jù)異常類型來做區(qū)別處理理, TimeoutException超時(shí)是屬于第三狀態(tài)(無法確定成功還是失?。?,而返回Basic.Nack拋出IOException這種是明確的失敗。上面的代碼主要只是演示confirm機(jī)制,實(shí)際上還是同步阻塞模式的,性能并不不是太好。

實(shí)際上,我們也可以通過“批處理理”的方式來改善整體的性能(即批量量發(fā)送消息后僅調(diào)用一次 waitForConfirms方法)。正常情況下這種批量處理的方式效率會(huì)高很多,但是如果發(fā)生了超時(shí)或者nack(失?。┖竽蔷托枰苛恐匕l(fā)消息或者通知上游業(yè)務(wù)批量回滾(因?yàn)槲覀冎恢肋@個(gè)批次中有消息沒投遞成功,而并不知道具體是那條消息投遞失敗了,所以很難針對(duì)性處理),如此看來,批量重發(fā)消息肯定會(huì)造成部分消息重復(fù)。

另外,我們可以通過異步回調(diào)的方式來處理Broker的響應(yīng)。addConfirmListener 方法可以添加ConfirmListener 這個(gè)回調(diào)接口,這個(gè) ConfirmListener 接口包含兩個(gè)方法:handleAck 和handleNack,分別用來處理 RabbitMQ 回傳的 Basic.Ack 和 Basic.Nack。

package confirm;
/**
 * 創(chuàng)建者: 魏紅
 * 創(chuàng)建時(shí)間: 2023-02-28
 * 描述:
 */
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import util.ConnectionUtil;
public class PublisherConfirmsProducer2 {
    public static void main(String[] args) throws Exception {
        //獲取連接
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        // 向RabbitMQ服務(wù)器發(fā)送AMQP命令,將當(dāng)前通道標(biāo)記為發(fā)送方確認(rèn)通道
        final AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
        channel.queueDeclare("queue.pc", true, false, false, null);
        channel.exchangeDeclare("ex.pc", "direct", true, false, null);
        channel.queueBind("queue.pc", "ex.pc", "key.pc");
        String message = "hello-";
        // 批處理的大小
        int batchSize = 10;
        // 用于對(duì)需要等待確認(rèn)消息的計(jì)數(shù)
        int outstrandingConfirms = 0;
        for (int i = 0; i < 10000; i++) {
            channel.basicPublish("ex.pc", "key.pc", null, (message + i).getBytes());
            outstrandingConfirms++;
            if (outstrandingConfirms == batchSize) {
                // 此時(shí)已經(jīng)有一個(gè)批次的消息需要同步等待broker的確認(rèn)消息
                // 同步等待
                channel.waitForConfirmsOrDie(5000);
                System.out.println("消息已經(jīng)被確認(rèn)了");
                outstrandingConfirms = 0;
            }
        }
        if (outstrandingConfirms > 0) {
            channel.waitForConfirmsOrDie(5000);
            System.out.println("剩余消息已經(jīng)被確認(rèn)了");
        }
        channel.close();
        connection.close();
    }
}

還可以使用異步方法:

package confirm;
/**
 * 創(chuàng)建者: 魏紅
 * 創(chuàng)建時(shí)間: 2023-02-28
 * 描述:
 */
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import javax.management.loading.MLet;
import java.io.IOException;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
public class PublisherConfirmsProducer3 {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        // 向RabbitMQ服務(wù)器發(fā)送AMQP命令,將當(dāng)前通道標(biāo)記為發(fā)送方確認(rèn)通道
        final AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
        channel.queueDeclare("queue.pc", true, false, false, null);
        channel.exchangeDeclare("ex.pc", "direct", true, false, null);
        channel.queueBind("queue.pc", "ex.pc", "key.pc");
//        ConfirmCallback clearOutstandingConfirms = new ConfirmCallback() {
//            @Override
//            public void handle(long deliveryTag, boolean multiple) throws IOException {
//                if (multiple) {
//                    System.out.println("編號(hào)小于等于 " + deliveryTag + " 的消息都已經(jīng)被確認(rèn)了");
//                } else {
//                    System.out.println("編號(hào)為:" + deliveryTag + " 的消息被確認(rèn)");
//                }
//            }
//        };
        ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
        ConfirmCallback clearOutstandingConfirms = (deliveryTag, multiple) -> {
            if (multiple) {
                System.out.println("編號(hào)小于等于 " + deliveryTag + " 的消息都已經(jīng)被確認(rèn)了");
                final ConcurrentNavigableMap<Long, String> headMap
                        = outstandingConfirms.headMap(deliveryTag, true);
                // 清空outstandingConfirms中已經(jīng)被確認(rèn)的消息信息
                headMap.clear();
            } else {
                // 移除已經(jīng)被確認(rèn)的消息
                outstandingConfirms.remove(deliveryTag);
                System.out.println("編號(hào)為:" + deliveryTag + " 的消息被確認(rèn)");
            }
        };
        ConfirmCallback confirmCallback = (deliveryTag, multiple) -> {
            if (multiple) {
                // 將沒有確認(rèn)的消息記錄到一個(gè)集合中
                // 此處省略實(shí)現(xiàn)
                System.out.println("消息編號(hào)小于等于:" + deliveryTag + " 的消息 不確認(rèn)");
            } else {
                System.out.println("編號(hào)為:" + deliveryTag + " 的消息不確認(rèn)");
            }
        };
        // 設(shè)置channel的監(jiān)聽器,處理確認(rèn)的消息和不確認(rèn)的消息
        channel.addConfirmListener(clearOutstandingConfirms, confirmCallback);
        String message = "hello-";
        for (int i = 0; i < 500000; i++) {
            // 獲取下一條即將發(fā)送的消息的消息ID
            final long nextPublishSeqNo = channel.getNextPublishSeqNo();
            channel.basicPublish("ex.pc", "key.pc", null, (message + i).getBytes());
            System.out.println("編號(hào)為:" + nextPublishSeqNo + " 的消息已經(jīng)發(fā)送成功,尚未確認(rèn)");
            outstandingConfirms.put(nextPublishSeqNo, (message + i));
        }
        // 等待消息被確認(rèn)
        Thread.sleep(10000);
        channel.close();
        connection.close();
    }
}

持久化存儲(chǔ)機(jī)制

持久化是提高RabbitMQ可靠性的基礎(chǔ),否則當(dāng)RabbitMQ遇到異常時(shí)(如:重啟、斷電、停機(jī)等)數(shù)據(jù)將會(huì)丟失。主要從以下幾個(gè)方面來保障消息的持久性:

  • Exchange的持久化。通過定義時(shí)設(shè)置durable 參數(shù)為ture來保證Exchange相關(guān)的元數(shù)據(jù)不不丟失。
  • Queue的持久化。也是通過定義時(shí)設(shè)置durable 參數(shù)為ture來保證Queue相關(guān)的元數(shù)據(jù)不不丟失。
  • 消息的持久化。通過將消息的投遞模式 (BasicProperties 中的 deliveryMode 屬性)設(shè)置為 2即可實(shí)現(xiàn)消息的持久化,保證消息自身不丟失。

image-20230321153556625

接收端確認(rèn)機(jī)制

如何保證消息被消費(fèi)者成功消費(fèi)?

前面我們講了生產(chǎn)者發(fā)送確認(rèn)機(jī)制和消息的持久化存儲(chǔ)機(jī)制,然而這依然無法完全保證整個(gè)過程的 可靠性,因?yàn)槿绻⒈幌M(fèi)過程中業(yè)務(wù)處理失敗了但是消息卻已經(jīng)出列了(被標(biāo)記為已消費(fèi)了),我 們又沒有任何重試,那結(jié)果跟消息丟失沒什么分別。

RabbitMQ在消費(fèi)端會(huì)有Ack機(jī)制,即消費(fèi)端消費(fèi)消息后需要發(fā)送Ack確認(rèn)報(bào)文給Broker端,告知自 己是否已消費(fèi)完成,否則可能會(huì)一直重發(fā)消息直到消息過期(AUTO模式)。這也是我們之前一直在講的“最終一致性”、“可恢復(fù)性” 的基礎(chǔ)。

一般而言,我們有如下處理手段:

  • 采用NONE模式,消費(fèi)的過程中自行捕獲異常,引發(fā)異常后直接記錄日志并落到異常恢復(fù)表,再通過后臺(tái)定時(shí)任務(wù)掃描異?;謴?fù)表嘗試做重試動(dòng)作。如果業(yè)務(wù)不自行處理則有丟失數(shù)據(jù)的風(fēng)險(xiǎn)
  • 采用AUTO(自動(dòng)Ack)模式,不主動(dòng)捕獲異常,當(dāng)消費(fèi)過程中出現(xiàn)異常時(shí)會(huì)將消息放回Queue中,然后消息會(huì)被重新分配到其他消費(fèi)者節(jié)點(diǎn)(如果沒有則還是選擇當(dāng)前節(jié)點(diǎn))重新被消費(fèi),默認(rèn)會(huì)一直重發(fā)消息并直到消費(fèi)完成返回Ack或者一直到過期
  • 采用MANUAL(手動(dòng)Ack)模式,消費(fèi)者自行控制流程并手動(dòng)調(diào)用channel相關(guān)的方法返回Ack
package workmode;
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
/**
* NONE模式,則只要收到消息后就立即確認(rèn)(消息出列,標(biāo)記已消費(fèi)),有丟失數(shù)據(jù)的風(fēng)險(xiǎn)
* AUTO模式,看情況確認(rèn),如果此時(shí)消費(fèi)者拋出異常則消息會(huì)返回到隊(duì)列中
* MANUAL模式,需要顯式的調(diào)用當(dāng)前channel的basicAck方法
 */
public class Recer2 {
    public static void main(String[] args) throws  Exception {
        // 1.獲得連接
        Connection connection = ConnectionUtil.getConnection();
        // 2.獲得通道(信道)
        final Channel channel = connection.createChannel();
        channel.queueDeclare("work_queue",false,false,false,null);
        // 3.從信道中獲得消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override //交付處理(收件人信息,包裹上的快遞標(biāo)簽,協(xié)議的配置,消息)
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
//                System.out.println("【顧客2】吃掉 " + s+" ! 總共吃【"+i+++"】串!");
                System.out.println("【消費(fèi)者2】得到 " + s);
                // 模擬網(wǎng)絡(luò)延遲
                try{
                    Thread.sleep(400);
                }catch (Exception e){
                }
                // 手動(dòng)確認(rèn)(收件人信息,是否同時(shí)確認(rèn)多個(gè)消息)
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        // 4.監(jiān)聽隊(duì)列 false:手動(dòng)消息確認(rèn)
        channel.basicConsume("work_queue", false,consumer);
    }
}

本小節(jié)的內(nèi)容總結(jié)起來就如圖所示,本質(zhì)上就是“請(qǐng)求/應(yīng)答”確認(rèn)模式

image-20230321154156414

到此這篇關(guān)于RabbitMQ進(jìn)階之消息可靠性詳解的文章就介紹到這了,更多相關(guān)RabbitMQ消息可靠性內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Kotlin教程之基本數(shù)據(jù)類型

    Kotlin教程之基本數(shù)據(jù)類型

    這篇文章主要介紹了Kotlin教程之基本數(shù)據(jù)類型的學(xué)習(xí)的相關(guān)資料,需要的朋友可以參考下
    2017-05-05
  • java 數(shù)據(jù)類型有哪些取值范圍多少

    java 數(shù)據(jù)類型有哪些取值范圍多少

    這篇文章主要介紹了java 數(shù)據(jù)類型有哪些取值范圍多少的相關(guān)資料,網(wǎng)上關(guān)于java 數(shù)據(jù)類型的資料有很多,不夠全面,這里就整理下,需要的朋友可以參考下
    2017-01-01
  • Java Set接口及常用實(shí)現(xiàn)類總結(jié)

    Java Set接口及常用實(shí)現(xiàn)類總結(jié)

    Collection的另一個(gè)子接口就是Set,他并沒有我們List常用,并且自身也沒有一些額外的方法,全是繼承自Collection中的,因此我們還是簡(jiǎn)單總結(jié)一下,包括他的常用實(shí)現(xiàn)類HashSet、LinkedHashSet、TreeSet的總結(jié)
    2023-01-01
  • Spring:如何使用枚舉參數(shù)

    Spring:如何使用枚舉參數(shù)

    這篇文章主要介紹了springboot枚舉類型傳遞的步驟,幫助大家更好的理解和學(xué)習(xí)使用springboot,感興趣的朋友可以了解下,希望能給你帶來幫助
    2021-08-08
  • 一次排查@CacheEvict注解失效的經(jīng)歷及解決

    一次排查@CacheEvict注解失效的經(jīng)歷及解決

    這篇文章主要介紹了一次排查@CacheEvict注解失效的經(jīng)歷及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • Maven多模塊及version修改的實(shí)現(xiàn)方法

    Maven多模塊及version修改的實(shí)現(xiàn)方法

    這篇文章主要介紹了Maven多模塊及version修改的實(shí)現(xiàn)方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2019-06-06
  • Spring Boot 2 實(shí)戰(zhàn):自定義啟動(dòng)運(yùn)行邏輯實(shí)例詳解

    Spring Boot 2 實(shí)戰(zhàn):自定義啟動(dòng)運(yùn)行邏輯實(shí)例詳解

    這篇文章主要介紹了Spring Boot 2 實(shí)戰(zhàn):自定義啟動(dòng)運(yùn)行邏輯,結(jié)合實(shí)例形式詳細(xì)分析了Spring Boot 2自定義啟動(dòng)運(yùn)行邏輯詳細(xì)操作技巧與注意事項(xiàng),需要的朋友可以參考下
    2020-05-05
  • 輕松掌握java責(zé)任鏈模式

    輕松掌握java責(zé)任鏈模式

    這篇文章主要幫助大家輕松掌握java責(zé)任鏈模式,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2016-09-09
  • java多線程之wait(),notify(),notifyAll()的詳解分析

    java多線程之wait(),notify(),notifyAll()的詳解分析

    本篇文章是對(duì)java多線程 wait(),notify(),notifyAll()進(jìn)行了詳細(xì)的分析介紹,需要的朋友參考下
    2013-06-06
  • JAVA集成Freemarker生成靜態(tài)html過程解析

    JAVA集成Freemarker生成靜態(tài)html過程解析

    這篇文章主要介紹了JAVA集成Freemarker生成靜態(tài)html過程解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-06-06

最新評(píng)論