欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RabbitMQ實現消費端限流的步驟

 更新時間:2024年03月08日 11:36:26   作者:思靜語  
消費者端限流的主要目的是控制消費者每次從 RabbitMQ 中獲取的消息數量,從而實現消息處理的流量控制,這篇文章主要介紹了RabbitMQ如何實現消費端限流,需要的朋友可以參考下

概述

在 RabbitMQ 中,可以通過消費者端限流(Consumer Prefetch)來控制消費端處理消息的速度,以避免消費端處理能力不足或處理過慢而導致消息堆積。消費者端限流的主要目的是控制消費者每次從 RabbitMQ 中獲取的消息數量,從而實現消息處理的流量控制。
RabbitMQ 提供了一種 QOS(服務質量保證)功能,即在非自動確認消息的前提下,如果一定數目的消息還未被消費確認,則不進行新消息的消費。
RabbitMQ 為我們提供了三種機制
● 對內存和磁盤使用量設置閾值
● 于credit flow 的流控機制
● QoS保證機制
channel.basicQos()
channel.basicQos(int prefetchSize,int prefetchCount,boolean global)
一定要注意的是,如果做限流,那么no_ask是要設置為false,也就是手工簽收而不是自動簽收的情況下才可以做限流。
參數:
prefetchSize:消息的大小
prefetchCount:會告訴RabbitMQ不要同時給一個消費者推送多于N個消息,即一旦有N個消息還沒有ack,則該consumer將block掉,直到有消息ack
global:是否將上面設置應用于channel,簡單點說,就是上面限制是channel級別的還是consumer級別
注意,prefetchSize和golobal參數還沒有實現。
Channel的詳細介紹:
ConnectionFactory、Connection、Channel都是RabbitMQ對外提供的API中最基本的對象。
Connection是RabbitMQ的socket鏈接,它封裝了socket協(xié)議相關部分邏輯。
ConnectionFactory如名稱,是客戶端與broker的tcp連接工廠,負責根據uri創(chuàng)建Connection。
Channel是我們與RabbitMQ打交道的最重要的一個接口,我們大部分的業(yè)務操作是在Channel這個接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、發(fā)布消息等。如果每一次訪問RabbitMQ都建立一個Connection,在消息量大的時候建立TCP Connection的開銷將是巨大的,效率也較低。Channel是在connection內部建立的邏輯連接,如果應用程序支持多線程,通常每個thread創(chuàng)建單獨的channel進行通訊,AMQP method包含了channel id幫助客戶端和message broker識別channel,所以channel之間是完全隔離的。Channel作為輕量級的Connection極大減少了操作系統(tǒng)建立TCP connection的開銷。
注意,rabbitmq提供了服務質量保障功能,即在非自動確認消息的前提下,如果一定數目的消息未被確認,不進行消費新的消息。也就是說,我們要使用非自動ack

@Configuration
public class DirectRabbitConfig {
    public static final String DEAD_LETTER_EXCHANGE = "dead.latter.exchange";
    public static final String DEAD_LETTER_QUEUE = "dead.latter.queue";
    public static final String DEAD_LETTER_KEY = "dead.latter.key";
    //隊列 起名:TestDirectQueue
    @Bean
    public Queue TestDirectQueue() {
        //實例化隊列時各個參數的含義如下:
        //name:隊列名稱
        // durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效
        // exclusive:默認也是false,只能被當前創(chuàng)建的連接使用,而且當連接關閉后隊列即被刪除。此參考優(yōu)先級高于durable
        // autoDelete:默認是false,是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。
        //一般設置一下隊列的持久化就好,其余兩個就是默認false(消息和交換機也可以持久化,但是消息持久化的前提是需要和隊列,交換機持久化一起使用)
        //為業(yè)務隊列綁定一個死信交換機,當業(yè)務隊列里的消息過期了就被轉發(fā)到死信交換機,再由死信交換機發(fā)給死信隊列處理
//        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    這里聲明當前隊列綁定的死信交換機
//        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//       x-dead-letter-routing-key  這里聲明當前隊列的死信路由key
//        args.put("x-dead-letter-routing-key", DEAD_LETTER_KEY);
        //5000毫秒
//        args.put("x-message-ttl", 5000);
//        return new Queue("TestDirectQueue",false,false,false,args);
        return new Queue("TestDirectQueue",false,false,false);
    }
    //Direct交換機 起名:TestDirectExchange
    @Bean
    DirectExchange TestDirectExchange() {
        return new DirectExchange("TestDirectExchange",true,false);
    }
    //綁定  將隊列和交換機綁定, 并設置用于匹配鍵:TestDirectRouting
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
    }
    /**
     * ========================死信隊列==================================
     */
    // 聲明死信Exchange
    @Bean
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }
    // 聲明死信隊列A
    @Bean
    public Queue deadLetterQueueA(){
        return new Queue(DEAD_LETTER_QUEUE);
    }
    @Bean
    public Binding deadLetterBindingA(){
        return BindingBuilder.bind(deadLetterQueueA()).to(deadLetterExchange()).with(DEAD_LETTER_KEY);
    }
}
@Component
@RabbitListener(queues = "TestDirectQueue")//監(jiān)聽的隊列名稱 TestDirectQueue
public class DirectReceiver {
     Integer index = 0;
    @RabbitHandler
    public void process(Channel channel,String msg) throws Exception {
        ++index;
        channel.basicQos(0, 1, false);
        //設置非自動ack
        DefaultConsumer consumer = new DefaultConsumer(channel);
        channel.basicConsume("TestDirectQueue", false,consumer);
        //假設業(yè)務處理需要3秒,那么當消費者接受到消息的時候,只處理一條,且要處理3秒,那么在服務器堆積的多條信息就不會瘋狂涌入
        Thread.sleep(3000);
        System.out.println(" DirectReceiver消費者收到消息  : " + msg + ",第" +index+ "條"+"======"+ new Date().toString());
    }
}

RabbitMQ 中實現消費端限流的步驟

1.設置消費者端的預取值(Prefetch Count):
在創(chuàng)建消費者時,可以通過設置 basicQos(prefetchCount) 方法來指定消費者端的預取值,即每次從 RabbitMQ 中預取的消息數量。

2.確保消費者端開啟手動應答模式:
在設置預取值之前,確保消費者端已經開啟了手動應答模式(manual ack mode),這樣消費者可以自主控制何時應答消息。

3.消費者端處理消息時進行手動應答:
當消費者端接收到消息后,在處理完消息之后,需要顯式地發(fā)送應答(ack)給 RabbitMQ,表示該消息已經被消費。這樣,消費者才能繼續(xù)接收下一批消息。
下面是一個簡單的 Java 示例代碼,演示了如何在 RabbitMQ 中實現消費端限流:

import com.rabbitmq.client.*;
public class Consumer {
    private final static String QUEUE_NAME = "queue_name";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        int prefetchCount = 5; // 設置預取值為 5
        channel.basicQos(prefetchCount);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Received: " + message);
            // 模擬消息處理
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // 手動應答消息
        };
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
}

在上述代碼中,我們首先創(chuàng)建了連接和信道,并聲明了一個名為 “queue_name” 的隊列。然后,通過 channel.basicQos(prefetchCount) 方法設置了消費者端的預取值為 5。接著,我們定義了一個 DeliverCallback 回調函數,在其中處理消息并手動應答。最后,通過 channel.basicConsume() 方法啟動消費者端。
通過設置預取值和手動應答,消費者端可以控制自身處理消息的速度,有效地實現消費端的限流。希望這個示例能幫助您理解如何在 RabbitMQ 中實現消費端限流!

到此這篇關于RabbitMQ如何實現消費端限流的文章就介紹到這了,更多相關RabbitMQ消費端限流內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • java中流的使用

    java中流的使用

    本文主要介紹了java中流的使用以及分類。具有一定的參考價值,下面跟著小編一起來看下吧
    2017-01-01
  • Java?中?hashCode()?與?equals()?的關系(面試)

    Java?中?hashCode()?與?equals()?的關系(面試)

    這篇文章主要介紹了Java中hashCode()與equals()的關系,ava中hashCode()和equals()的關系是面試中的??键c,文章對hashCode與equals的關系做出詳解,需要的小伙伴可以參考一下
    2022-09-09
  • springboot使用RedisRepository操作數據的實現

    springboot使用RedisRepository操作數據的實現

    本文主要介紹了springboot使用RedisRepository操作數據的實現,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2022-05-05
  • 使用@CachePut?更新數據庫和更新緩存

    使用@CachePut?更新數據庫和更新緩存

    這篇文章主要介紹了使用@CachePut?更新數據庫和更新緩存方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • Intellij?IDEA如何修改配置文件位置

    Intellij?IDEA如何修改配置文件位置

    這篇文章主要介紹了Intellij?IDEA--修改配置文件位置,文章末尾給大家介紹了Intellij?IDEA--宏的用法記錄操作過程,對此文感興趣的朋友跟隨小編一起看看吧
    2022-08-08
  • 深入了解MyBatis參數

    深入了解MyBatis參數

    今天小編就為大家分享一篇關于深入了解MyBatis參數,小編覺得內容挺不錯的,現在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧
    2018-12-12
  • Java基于redis實現分布式鎖

    Java基于redis實現分布式鎖

    系統(tǒng)的不斷擴大,分布式鎖是最基本的保障。今天我們介紹通過redis實現分布式鎖,感興趣的朋友可以參考下
    2021-05-05
  • SpringBoot整合sharding-jdbc?實現分庫分表操作的示例代碼

    SpringBoot整合sharding-jdbc?實現分庫分表操作的示例代碼

    在Spring?Boot中使用ShardingSphere的Sharding-JDBC來實現數據庫的分庫分表是一個常見的需求,下面就拉具體介紹一下實現步驟,需要的朋友們下面隨著小編來一起學習學習吧
    2025-04-04
  • 實現quartz定時器及quartz定時器原理介紹

    實現quartz定時器及quartz定時器原理介紹

    Quartz是一個大名鼎鼎的Java版開源定時調度器,功能強悍,使用方便,下面我們看看如何使用它
    2013-12-12
  • JpaRepository如何實現增刪改查并進行單元測試

    JpaRepository如何實現增刪改查并進行單元測試

    這篇文章主要介紹了JpaRepository如何實現增刪改查并進行單元測試,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-11-11

最新評論