欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

java中RabbitMQ高級(jí)應(yīng)用

 更新時(shí)間:2022年05月03日 09:01:08   作者:beordie  
本文主要介紹了java中RabbitMQ高級(jí)應(yīng)用,中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧

1、消息可靠性投遞

 在使用 RabbitMQ 的時(shí)候,生產(chǎn)者在進(jìn)行消息投遞的時(shí)候如果想知道消息是否成功的投遞到對(duì)應(yīng)的交換機(jī)和隊(duì)列中,有兩種方式可以用來(lái)控制消息投遞的可靠性模式 。

 由上圖的整個(gè)消息的投遞過(guò)程來(lái)看,生產(chǎn)者的消息進(jìn)入到中間件中會(huì)首先到達(dá)交換機(jī),然后再?gòu)慕粨Q機(jī)傳遞到隊(duì)列中去,也就是分為兩步走戰(zhàn)略。那么消息的丟失情況也就是會(huì)出現(xiàn)在這兩個(gè)階段中,RabbitMQ 貼心的為我們提供了針對(duì)于這兩個(gè)部分的可靠新傳遞模式:

  • confirm 模式。
  • return 模式。

 利用這兩個(gè)回調(diào)模式來(lái)確保消息的傳遞可靠。

 1.1、確認(rèn)模式

 消息從生產(chǎn)者到交換機(jī)之間傳遞會(huì)返回一個(gè) confirmCallback 的回調(diào)??梢灾苯釉?rabbitTemplate 實(shí)例中進(jìn)行確認(rèn)邏輯的設(shè)置。如果是使用 XML 配置的話需要在工廠配置開啟 publisher-confirms="true",YAML 的配置就直接 publisher-confirm-type: correlated,他默認(rèn)是 NONE ,需要手動(dòng)開啟。

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void producer() throws InterruptedException {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println();
                if (!b) {
                    //	消息重發(fā)之類的處理
                    System.out.println(s);
                } else {
                    System.out.println("交換機(jī)成功接收消息");
                }
            }
        });
        rabbitTemplate.convertAndSend("default_exchange", "default_queue",
                "hello world & beordie");
        TimeUnit.SECONDS.sleep(5);
    }
}

 上面的確認(rèn)是由一個(gè) confirm 的函數(shù)執(zhí)行的,里面攜帶了三個(gè)參數(shù),第一個(gè)是配置的相關(guān)信息,第二個(gè)表示交換機(jī)是否成功的接收到消息,第三個(gè)參數(shù)是指沒有成功接收消息的原因。

 1.2、退回模式

 從交換機(jī)到消息隊(duì)列投遞失敗會(huì)返回一個(gè) returnCallback 。在工廠配置中開啟回退模式 publisher-returns="true" ,設(shè)置交換機(jī)處理消息失敗的模式(默認(rèn) false 直接將消息進(jìn)行丟棄),添加退回處理的邏輯。

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void producer() throws InterruptedException {
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                //  重發(fā)邏輯處理
                System.out.println(message.getBody() + " 投遞消息隊(duì)列失敗");
            }
        });
        rabbitTemplate.convertAndSend("default_exchange", "default_queue",
                "hello world & beordie");
        TimeUnit.SECONDS.sleep(5);
    }
}

returnedMessage 中攜帶五個(gè)參數(shù)、分別指的是消息對(duì)象、錯(cuò)誤碼、錯(cuò)誤信息、交換機(jī)、路由鍵。

 1.3、確認(rèn)機(jī)制

 在消費(fèi)者抓取消息隊(duì)列中的數(shù)據(jù)取消費(fèi)之后會(huì)有一個(gè)確認(rèn)機(jī)制進(jìn)行消息的確認(rèn),防止因?yàn)樽ト∠⒅蟮珱]有消費(fèi)成功而導(dǎo)致的消息丟失。有三種確認(rèn)方式:

  • 自動(dòng)確認(rèn)acknowledge="none"

  • 手動(dòng)確認(rèn)acknowledge="manual"

  • 根據(jù)異常情況確認(rèn)acknowledge="auto"

 其中自動(dòng)確認(rèn)是指一旦消息被消費(fèi)者抓取就自動(dòng)默認(rèn)成功,并將消息從消息隊(duì)列中進(jìn)行移除,如果這個(gè)時(shí)候消費(fèi)端消費(fèi)出現(xiàn)問(wèn)題,那么也會(huì)是默認(rèn)消息消費(fèi)成功,但是實(shí)際上是沒有消費(fèi)成功的,也就是當(dāng)前的消息丟失了。默認(rèn)的情況就是自動(dòng)確認(rèn)機(jī)制。

 如果設(shè)置手動(dòng)確認(rèn)的方式,就需要在正常消費(fèi)消息之后進(jìn)行回調(diào)確認(rèn) channel.basicAck(),手動(dòng)簽收。如果業(yè)務(wù)處理過(guò)程中發(fā)生了異常則調(diào)用 channel.basicNack() 重新發(fā)送消息。

 首先需要在隊(duì)列綁定時(shí)進(jìn)行確認(rèn)機(jī)制的配置,設(shè)置為手動(dòng)簽收。

<!-- 綁定隊(duì)列 -->
<rabbit:listener-container connection-factory="rabbitFactory" auto-declare="true" acknowledge="manual">
    <rabbit:listener ref="rabbirConsumer" queue-names="default_queue"/>
</rabbit:listener-container>

 生產(chǎn)者一端不用更改,只需要改變消費(fèi)者的實(shí)現(xiàn)進(jìn)行消息自動(dòng)簽收就可以了,正常執(zhí)行業(yè)務(wù)則簽收消息,業(yè)務(wù)發(fā)生錯(cuò)誤則選擇消息拒簽,消息重發(fā)或者丟棄。

public class ConsumerAck implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        //  消息唯一ID
        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            String msg = new String(message.getBody(), "utf-8");
            channel.basicAck(tag, true);
            System.out.println("接收消息: " + msg);
        } catch (Exception e) {
            System.out.println("接收消息異常");
            channel.basicNack(tag, true, true);
            e.printStackTrace();
        }
    }
}

 里面涉及三個(gè)簡(jiǎn)單的簽收函數(shù),一是正確簽收的 basicAck ,二是單條拒簽的 basicReject ,三是批量拒簽的 basicNack 。

  • basicAck 第一個(gè)參數(shù)表示消息在通道中的唯一ID,只針對(duì)當(dāng)前的 Channel;第二個(gè)參數(shù)表示是否批量同意,如果是 false 的話只會(huì)同意簽收當(dāng)前ID的一條消息,將其從消息隊(duì)列中進(jìn)行刪除,而如果是 true 的話將會(huì)把此ID之前的消息一起給同意簽收了。
  • basicReject 第一個(gè)參數(shù)依舊表示消息的唯一ID,第二個(gè)參數(shù)表示是否重新回隊(duì)發(fā)送,false 表示直接丟棄該條消息或者有死信隊(duì)列可以接收, true 則表示重新回隊(duì)進(jìn)行消息發(fā)送,所有操作只針對(duì)當(dāng)前的消息。
  • basicNack 比第二個(gè)多了一個(gè)參數(shù),也就是處于中間位置的布爾值,表示是否批量進(jìn)行。

2、消費(fèi)端限流

 在用戶請(qǐng)求和DB服務(wù)處理之間增加消息中間件的隔離,使得突發(fā)流量全部讓消息隊(duì)列來(lái)抗,降低服務(wù)端被沖垮的可能性。讓所有的請(qǐng)求都往隊(duì)列中存,消費(fèi)端只需要?jiǎng)蛩俚娜〕鱿⑦M(jìn)行消費(fèi),這樣就能保證運(yùn)行效率,也不會(huì)因?yàn)楹笈_(tái)的阻塞而導(dǎo)致客戶端得不到正常的響應(yīng)(當(dāng)然指的是一些不需要同步回顯的任務(wù))。

 只需要在消費(fèi)者綁定消息隊(duì)列時(shí)指定取出消息的速率即可,需要使用手動(dòng)簽收的方式,每進(jìn)行一次的簽收才會(huì)從隊(duì)列中再取出下一條數(shù)據(jù)。

<!-- 綁定隊(duì)列 -->
<rabbit:listener-container connection-factory="rabbitFactory" auto-declare="true"
                           acknowledge="manual" prefetch="1">
    <rabbit:listener ref="rabbirConsumer" queue-names="default_queue"/>
</rabbit:listener-container>

3、消息過(guò)期時(shí)間

 消息隊(duì)列提供了存儲(chǔ)在隊(duì)列中消息的過(guò)期時(shí)間,分為兩個(gè)方向的實(shí)現(xiàn),一個(gè)是針對(duì)于整個(gè)隊(duì)列中的所有消息,也就是隊(duì)列的過(guò)期時(shí)間,另一個(gè)是針對(duì)當(dāng)前消息的過(guò)期時(shí)間,也就是針對(duì)于單條消息單獨(dú)設(shè)置。

 隊(duì)列的過(guò)期時(shí)間設(shè)置很簡(jiǎn)單,只需要在創(chuàng)建隊(duì)列時(shí)進(jìn)行過(guò)期時(shí)間的指定即可,也可以通過(guò)控制臺(tái)直接創(chuàng)建指定過(guò)期時(shí)間。一旦隊(duì)列過(guò)期時(shí)間到了,隊(duì)列中還未被消費(fèi)的消息都將過(guò)期,進(jìn)行隊(duì)列的過(guò)期處理。

<rabbit:queue id="default_queue" name="default_queue" auto-declare="true">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
    </rabbit:queue-arguments>
</rabbit:queue>

 單條消息的過(guò)期時(shí)間需要在發(fā)送的時(shí)候進(jìn)行單獨(dú)的指定,發(fā)送的時(shí)候指定配置的額外信息,配置的編寫由配置類完成。

 如果一條消息的過(guò)期時(shí)間到了,但是他此時(shí)處于隊(duì)列的中間,那么他將不會(huì)被處理,只有當(dāng)之后處理到時(shí)候才會(huì)進(jìn)行判斷是否過(guò)期。

MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
    @Override
    public Message postProcessMessage(Message message) throws
        AmqpException {
        //	設(shè)置 message 的過(guò)期時(shí)間
        message.getMessageProperties().setExpiration("5000");
        //	返回該消息
        return message;
    }
};
rabbitTemplate.convertAndSend("exchange", "route", "msg", messagePostProcessor);

 如果說(shuō)同時(shí)設(shè)置了消息的過(guò)期時(shí)間和隊(duì)列的過(guò)期時(shí)間,那么最終的過(guò)期時(shí)間由最短的時(shí)間進(jìn)行決定,也就是說(shuō)如果當(dāng)前消息的過(guò)期時(shí)間沒到,但是整個(gè)隊(duì)列的過(guò)期時(shí)間到了,那么隊(duì)列中的所有消息也自然就過(guò)期了,執(zhí)行過(guò)期的處理策略。

4、死信隊(duì)列

 4.1、死信概念

死信隊(duì)列指的是死信交換機(jī),當(dāng)一條消息成為死信之后可以重新發(fā)送到另一個(gè)交換機(jī)進(jìn)行處理,而進(jìn)行處理的這個(gè)交換機(jī)就叫做死信交換機(jī)。

  • 消息成為死信消息有幾種情況

    隊(duì)列的消息長(zhǎng)度達(dá)到限制

    消費(fèi)者拒接消息的時(shí)候不把消息重新放入隊(duì)列中

    隊(duì)列存在消息過(guò)期設(shè)置,消息超時(shí)未被消費(fèi)

    消息存在過(guò)期時(shí)間,在投遞給消費(fèi)者時(shí)發(fā)現(xiàn)過(guò)期

 在創(chuàng)建隊(duì)列時(shí)可以在配置中指定相關(guān)的信息,例如死信交換機(jī)、隊(duì)列長(zhǎng)度等等,之后的一系列工作就不由程序員進(jìn)行操作了,MQ 會(huì)自己完成配置過(guò)的事件響應(yīng)。

<rabbit:queue id="default_queue" name="default_queue" auto-declare="true">
    <rabbit:queue-arguments>
        <!-- 死信交換機(jī) -->
        <entry key="x-dead-letter-exchange" value-type="dlx_exchane"/>
        <!-- 路由 -->
        <entry key="x-dead-letter-routing-key" value-type="dlx_routing"/>
        <!-- 隊(duì)列過(guò)期時(shí)間 -->
        <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
        <!-- 隊(duì)列長(zhǎng)度 -->
        <entry key="x-max-length" value-type="java.lang.Integer" value="10"/>
    </rabbit:queue-arguments>
</rabbit:queue>

 4.2、延遲隊(duì)列

 延遲隊(duì)列指的是消息在進(jìn)入隊(duì)列后不會(huì)立即被消費(fèi),只有到達(dá)指定時(shí)間之后才會(huì)被消費(fèi),也就是需要有一個(gè)時(shí)間的判斷條件。

 消息隊(duì)列實(shí)際上是沒有提供對(duì)延遲隊(duì)列的實(shí)現(xiàn)的,但是可以通過(guò) TTL + 死信隊(duì)列 的方式完成,設(shè)置一個(gè)隊(duì)列,不被任何的消費(fèi)者所消費(fèi),所有的消息進(jìn)入都會(huì)被保存在里面,設(shè)置隊(duì)列的過(guò)期時(shí)間,一旦隊(duì)列過(guò)期將所有的消息過(guò)渡到綁定的死信隊(duì)列中。

 再由具體的消費(fèi)者來(lái)消費(fèi)死信隊(duì)列中的消息,這樣就實(shí)現(xiàn)了延遲隊(duì)列的功能。

 例如實(shí)現(xiàn)一個(gè)下單超時(shí)支付取消訂單的功能:

到此這篇關(guān)于java中RabbitMQ高級(jí)應(yīng)用的文章就介紹到這了,更多相關(guān)java RabbitMQ內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • spring使用RedisTemplate的操作類訪問(wèn)Redis

    spring使用RedisTemplate的操作類訪問(wèn)Redis

    本篇文章主要介紹了spring使用RedisTemplate的操作類訪問(wèn)Redis,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-05-05
  • Java?Rabbitmq中四種集群架構(gòu)的區(qū)別詳解

    Java?Rabbitmq中四種集群架構(gòu)的區(qū)別詳解

    這篇文章主要為大家詳細(xì)介紹了Java?Rabbitmq中四種集群架構(gòu)的區(qū)別,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來(lái)幫助
    2022-02-02
  • java 如何將多種字符串格式 解析為Date格式

    java 如何將多種字符串格式 解析為Date格式

    這篇文章主要介紹了java 如何將多種字符串格式 解析為Date格式的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • java遍歷http請(qǐng)求request的所有參數(shù)實(shí)現(xiàn)方法

    java遍歷http請(qǐng)求request的所有參數(shù)實(shí)現(xiàn)方法

    下面小編就為大家?guī)?lái)一篇java遍歷http請(qǐng)求request的所有參數(shù)實(shí)現(xiàn)方法。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2016-09-09
  • java TreeUtil菜單遞歸工具類

    java TreeUtil菜單遞歸工具類

    這篇文章主要為大家詳細(xì)介紹了java TreeUtil菜單遞歸工具類,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2020-08-08
  • Java中的System.arraycopy()淺復(fù)制方法詳解

    Java中的System.arraycopy()淺復(fù)制方法詳解

    這篇文章主要介紹了Java中的System.arraycopy()淺復(fù)制方法詳解,Java數(shù)組的復(fù)制操作可以分為深度復(fù)制和淺度復(fù)制,簡(jiǎn)單來(lái)說(shuō)深度復(fù)制,可以將對(duì)象的值和對(duì)象的內(nèi)容復(fù)制;淺復(fù)制是指對(duì)對(duì)象引用的復(fù)制,需要的朋友可以參考下
    2023-11-11
  • Java注解中@Component和@Bean的區(qū)別

    Java注解中@Component和@Bean的區(qū)別

    這篇文章主要介紹了@Component和@Bean的區(qū)別,在這給大家簡(jiǎn)單介紹下作用對(duì)象不同:@Component 注解作用于類,而 @Bean 注解作用于方法,具體實(shí)例代碼參考下本文
    2024-03-03
  • 啟動(dòng)Springboot項(xiàng)目時(shí)找不到Mapper的問(wèn)題及解決

    啟動(dòng)Springboot項(xiàng)目時(shí)找不到Mapper的問(wèn)題及解決

    這篇文章主要介紹了啟動(dòng)Springboot項(xiàng)目時(shí)找不到Mapper的問(wèn)題及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-11-11
  • shiro整合springboot前后端分離

    shiro整合springboot前后端分離

    這篇文章主要介紹了shiro整合springboot前后端分離,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2019-12-12
  • java使用任務(wù)架構(gòu)執(zhí)行任務(wù)調(diào)度示例

    java使用任務(wù)架構(gòu)執(zhí)行任務(wù)調(diào)度示例

    在Java 5.0之前啟動(dòng)一個(gè)任務(wù)是通過(guò)調(diào)用Thread類的start()方法來(lái)實(shí)現(xiàn)的,5.0里提供了一個(gè)新的任務(wù)執(zhí)行架構(gòu)使你可以輕松地調(diào)度和控制任務(wù)的執(zhí)行,并且可以建立一個(gè)類似數(shù)據(jù)庫(kù)連接池的線程池來(lái)執(zhí)行任務(wù),下面看一個(gè)示例
    2014-01-01

最新評(píng)論