欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

java中RabbitMQ高級應(yīng)用

 更新時間:2022年05月03日 09:01:08   作者:beordie  
本文主要介紹了java中RabbitMQ高級應(yīng)用,中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

1、消息可靠性投遞

 在使用 RabbitMQ 的時候,生產(chǎn)者在進行消息投遞的時候如果想知道消息是否成功的投遞到對應(yīng)的交換機和隊列中,有兩種方式可以用來控制消息投遞的可靠性模式 。

 由上圖的整個消息的投遞過程來看,生產(chǎn)者的消息進入到中間件中會首先到達交換機,然后再從交換機傳遞到隊列中去,也就是分為兩步走戰(zhàn)略。那么消息的丟失情況也就是會出現(xiàn)在這兩個階段中,RabbitMQ 貼心的為我們提供了針對于這兩個部分的可靠新傳遞模式:

  • confirm 模式。
  • return 模式

 利用這兩個回調(diào)模式來確保消息的傳遞可靠。

 1.1、確認模式

 消息從生產(chǎn)者到交換機之間傳遞會返回一個 confirmCallback 的回調(diào)??梢灾苯釉?rabbitTemplate 實例中進行確認邏輯的設(shè)置。如果是使用 XML 配置的話需要在工廠配置開啟 publisher-confirms="true",YAML 的配置就直接 publisher-confirm-type: correlated,他默認是 NONE ,需要手動開啟。

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void producer() throws InterruptedException {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println();
                if (!b) {
                    //	消息重發(fā)之類的處理
                    System.out.println(s);
                } else {
                    System.out.println("交換機成功接收消息");
                }
            }
        });
        rabbitTemplate.convertAndSend("default_exchange", "default_queue",
                "hello world & beordie");
        TimeUnit.SECONDS.sleep(5);
    }
}

 上面的確認是由一個 confirm 的函數(shù)執(zhí)行的,里面攜帶了三個參數(shù),第一個是配置的相關(guān)信息,第二個表示交換機是否成功的接收到消息,第三個參數(shù)是指沒有成功接收消息的原因。

 1.2、退回模式

 從交換機到消息隊列投遞失敗會返回一個 returnCallback 。在工廠配置中開啟回退模式 publisher-returns="true" ,設(shè)置交換機處理消息失敗的模式(默認 false 直接將消息進行丟棄),添加退回處理的邏輯。

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void producer() throws InterruptedException {
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                //  重發(fā)邏輯處理
                System.out.println(message.getBody() + " 投遞消息隊列失敗");
            }
        });
        rabbitTemplate.convertAndSend("default_exchange", "default_queue",
                "hello world & beordie");
        TimeUnit.SECONDS.sleep(5);
    }
}

returnedMessage 中攜帶五個參數(shù)、分別指的是消息對象、錯誤碼、錯誤信息、交換機、路由鍵。

 1.3、確認機制

 在消費者抓取消息隊列中的數(shù)據(jù)取消費之后會有一個確認機制進行消息的確認,防止因為抓取消息之后但沒有消費成功而導(dǎo)致的消息丟失。有三種確認方式:

  • 自動確認acknowledge="none"

  • 手動確認acknowledge="manual"

  • 根據(jù)異常情況確認acknowledge="auto"

 其中自動確認是指一旦消息被消費者抓取就自動默認成功,并將消息從消息隊列中進行移除,如果這個時候消費端消費出現(xiàn)問題,那么也會是默認消息消費成功,但是實際上是沒有消費成功的,也就是當前的消息丟失了。默認的情況就是自動確認機制。

 如果設(shè)置手動確認的方式,就需要在正常消費消息之后進行回調(diào)確認 channel.basicAck(),手動簽收。如果業(yè)務(wù)處理過程中發(fā)生了異常則調(diào)用 channel.basicNack() 重新發(fā)送消息。

 首先需要在隊列綁定時進行確認機制的配置,設(shè)置為手動簽收。

<!-- 綁定隊列 -->
<rabbit:listener-container connection-factory="rabbitFactory" auto-declare="true" acknowledge="manual">
    <rabbit:listener ref="rabbirConsumer" queue-names="default_queue"/>
</rabbit:listener-container>

 生產(chǎn)者一端不用更改,只需要改變消費者的實現(xiàn)進行消息自動簽收就可以了,正常執(zhí)行業(yè)務(wù)則簽收消息,業(yè)務(wù)發(fā)生錯誤則選擇消息拒簽,消息重發(fā)或者丟棄。

public class ConsumerAck implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        //  消息唯一ID
        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            String msg = new String(message.getBody(), "utf-8");
            channel.basicAck(tag, true);
            System.out.println("接收消息: " + msg);
        } catch (Exception e) {
            System.out.println("接收消息異常");
            channel.basicNack(tag, true, true);
            e.printStackTrace();
        }
    }
}

 里面涉及三個簡單的簽收函數(shù),一是正確簽收的 basicAck ,二是單條拒簽的 basicReject ,三是批量拒簽的 basicNack

  • basicAck 第一個參數(shù)表示消息在通道中的唯一ID,只針對當前的 Channel;第二個參數(shù)表示是否批量同意,如果是 false 的話只會同意簽收當前ID的一條消息,將其從消息隊列中進行刪除,而如果是 true 的話將會把此ID之前的消息一起給同意簽收了。
  • basicReject 第一個參數(shù)依舊表示消息的唯一ID,第二個參數(shù)表示是否重新回隊發(fā)送,false 表示直接丟棄該條消息或者有死信隊列可以接收, true 則表示重新回隊進行消息發(fā)送,所有操作只針對當前的消息。
  • basicNack 比第二個多了一個參數(shù),也就是處于中間位置的布爾值,表示是否批量進行。

2、消費端限流

 在用戶請求和DB服務(wù)處理之間增加消息中間件的隔離,使得突發(fā)流量全部讓消息隊列來抗,降低服務(wù)端被沖垮的可能性。讓所有的請求都往隊列中存,消費端只需要勻速的取出消息進行消費,這樣就能保證運行效率,也不會因為后臺的阻塞而導(dǎo)致客戶端得不到正常的響應(yīng)(當然指的是一些不需要同步回顯的任務(wù))。

 只需要在消費者綁定消息隊列時指定取出消息的速率即可,需要使用手動簽收的方式,每進行一次的簽收才會從隊列中再取出下一條數(shù)據(jù)。

<!-- 綁定隊列 -->
<rabbit:listener-container connection-factory="rabbitFactory" auto-declare="true"
                           acknowledge="manual" prefetch="1">
    <rabbit:listener ref="rabbirConsumer" queue-names="default_queue"/>
</rabbit:listener-container>

3、消息過期時間

 消息隊列提供了存儲在隊列中消息的過期時間,分為兩個方向的實現(xiàn),一個是針對于整個隊列中的所有消息,也就是隊列的過期時間,另一個是針對當前消息的過期時間,也就是針對于單條消息單獨設(shè)置。

 隊列的過期時間設(shè)置很簡單,只需要在創(chuàng)建隊列時進行過期時間的指定即可,也可以通過控制臺直接創(chuàng)建指定過期時間。一旦隊列過期時間到了,隊列中還未被消費的消息都將過期,進行隊列的過期處理。

<rabbit:queue id="default_queue" name="default_queue" auto-declare="true">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
    </rabbit:queue-arguments>
</rabbit:queue>

 單條消息的過期時間需要在發(fā)送的時候進行單獨的指定,發(fā)送的時候指定配置的額外信息,配置的編寫由配置類完成。

 如果一條消息的過期時間到了,但是他此時處于隊列的中間,那么他將不會被處理,只有當之后處理到時候才會進行判斷是否過期。

MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
    @Override
    public Message postProcessMessage(Message message) throws
        AmqpException {
        //	設(shè)置 message 的過期時間
        message.getMessageProperties().setExpiration("5000");
        //	返回該消息
        return message;
    }
};
rabbitTemplate.convertAndSend("exchange", "route", "msg", messagePostProcessor);

 如果說同時設(shè)置了消息的過期時間和隊列的過期時間,那么最終的過期時間由最短的時間進行決定,也就是說如果當前消息的過期時間沒到,但是整個隊列的過期時間到了,那么隊列中的所有消息也自然就過期了,執(zhí)行過期的處理策略。

4、死信隊列

 4.1、死信概念

死信隊列指的是死信交換機,當一條消息成為死信之后可以重新發(fā)送到另一個交換機進行處理,而進行處理的這個交換機就叫做死信交換機。

  • 消息成為死信消息有幾種情況

    隊列的消息長度達到限制

    消費者拒接消息的時候不把消息重新放入隊列中

    隊列存在消息過期設(shè)置,消息超時未被消費

    消息存在過期時間,在投遞給消費者時發(fā)現(xiàn)過期

 在創(chuàng)建隊列時可以在配置中指定相關(guān)的信息,例如死信交換機、隊列長度等等,之后的一系列工作就不由程序員進行操作了,MQ 會自己完成配置過的事件響應(yīng)。

<rabbit:queue id="default_queue" name="default_queue" auto-declare="true">
    <rabbit:queue-arguments>
        <!-- 死信交換機 -->
        <entry key="x-dead-letter-exchange" value-type="dlx_exchane"/>
        <!-- 路由 -->
        <entry key="x-dead-letter-routing-key" value-type="dlx_routing"/>
        <!-- 隊列過期時間 -->
        <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
        <!-- 隊列長度 -->
        <entry key="x-max-length" value-type="java.lang.Integer" value="10"/>
    </rabbit:queue-arguments>
</rabbit:queue>

 4.2、延遲隊列

 延遲隊列指的是消息在進入隊列后不會立即被消費,只有到達指定時間之后才會被消費,也就是需要有一個時間的判斷條件。

 消息隊列實際上是沒有提供對延遲隊列的實現(xiàn)的,但是可以通過 TTL + 死信隊列 的方式完成,設(shè)置一個隊列,不被任何的消費者所消費,所有的消息進入都會被保存在里面,設(shè)置隊列的過期時間,一旦隊列過期將所有的消息過渡到綁定的死信隊列中。

 再由具體的消費者來消費死信隊列中的消息,這樣就實現(xiàn)了延遲隊列的功能。

 例如實現(xiàn)一個下單超時支付取消訂單的功能:

到此這篇關(guān)于java中RabbitMQ高級應(yīng)用的文章就介紹到這了,更多相關(guān)java RabbitMQ內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • spring使用RedisTemplate的操作類訪問Redis

    spring使用RedisTemplate的操作類訪問Redis

    本篇文章主要介紹了spring使用RedisTemplate的操作類訪問Redis,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-05-05
  • Java?Rabbitmq中四種集群架構(gòu)的區(qū)別詳解

    Java?Rabbitmq中四種集群架構(gòu)的區(qū)別詳解

    這篇文章主要為大家詳細介紹了Java?Rabbitmq中四種集群架構(gòu)的區(qū)別,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助
    2022-02-02
  • java 如何將多種字符串格式 解析為Date格式

    java 如何將多種字符串格式 解析為Date格式

    這篇文章主要介紹了java 如何將多種字符串格式 解析為Date格式的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • java遍歷http請求request的所有參數(shù)實現(xiàn)方法

    java遍歷http請求request的所有參數(shù)實現(xiàn)方法

    下面小編就為大家?guī)硪黄猨ava遍歷http請求request的所有參數(shù)實現(xiàn)方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2016-09-09
  • java TreeUtil菜單遞歸工具類

    java TreeUtil菜單遞歸工具類

    這篇文章主要為大家詳細介紹了java TreeUtil菜單遞歸工具類,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2020-08-08
  • Java中的System.arraycopy()淺復(fù)制方法詳解

    Java中的System.arraycopy()淺復(fù)制方法詳解

    這篇文章主要介紹了Java中的System.arraycopy()淺復(fù)制方法詳解,Java數(shù)組的復(fù)制操作可以分為深度復(fù)制和淺度復(fù)制,簡單來說深度復(fù)制,可以將對象的值和對象的內(nèi)容復(fù)制;淺復(fù)制是指對對象引用的復(fù)制,需要的朋友可以參考下
    2023-11-11
  • Java注解中@Component和@Bean的區(qū)別

    Java注解中@Component和@Bean的區(qū)別

    這篇文章主要介紹了@Component和@Bean的區(qū)別,在這給大家簡單介紹下作用對象不同:@Component 注解作用于類,而 @Bean 注解作用于方法,具體實例代碼參考下本文
    2024-03-03
  • 啟動Springboot項目時找不到Mapper的問題及解決

    啟動Springboot項目時找不到Mapper的問題及解決

    這篇文章主要介紹了啟動Springboot項目時找不到Mapper的問題及解決方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-11-11
  • shiro整合springboot前后端分離

    shiro整合springboot前后端分離

    這篇文章主要介紹了shiro整合springboot前后端分離,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2019-12-12
  • java使用任務(wù)架構(gòu)執(zhí)行任務(wù)調(diào)度示例

    java使用任務(wù)架構(gòu)執(zhí)行任務(wù)調(diào)度示例

    在Java 5.0之前啟動一個任務(wù)是通過調(diào)用Thread類的start()方法來實現(xiàn)的,5.0里提供了一個新的任務(wù)執(zhí)行架構(gòu)使你可以輕松地調(diào)度和控制任務(wù)的執(zhí)行,并且可以建立一個類似數(shù)據(jù)庫連接池的線程池來執(zhí)行任務(wù),下面看一個示例
    2014-01-01

最新評論