欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

解決rocketmq-spring-boot-starter導(dǎo)致的多消費者實例重復(fù)消費問題

 更新時間:2024年06月18日 08:42:08   作者:lipfff  
這篇文章主要介紹了解決rocketmq-spring-boot-starter導(dǎo)致的多消費者實例重復(fù)消費問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教

問題描述

在使用rocketMQ集群模式消費的時候(通過springboot整合的依賴操作)

發(fā)現(xiàn)在一臺服務(wù)器啟動多個消費者實例后,消息進(jìn)行了重復(fù)消費,但是在不同的服務(wù)器部署消費者實例后,消息能正常消費。

背景知識

rocketMQ的消費模式有兩種:

  • 1.負(fù)載均衡模式 消費者采用負(fù)載均衡方式消費消息,多個消費者(服務(wù)啟動多個,本地多個springboot加載類啟動)共同消費隊列消息,每個消費者處理的消息不同
  • 2.廣播模式 消費者采用廣播的方式消費消息,每個消費者消費的消息都是相同的

本地代碼

依賴版本

    <properties>
        <rocketmq-spring-boot-starter-version>2.0.3</rocketmq-spring-boot-starter-version>
    </properties>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>${rocketmq-spring-boot-starter-version}</version>
        </dependency>

消費者:

package cn.lpf.mq;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * @auth lipf
 * @date 2021/1/16 16:27
 */
@Slf4j
@Component
//,messageModel = MessageModel.CLUSTERING
//,consumeMode = ConsumeMode.ORDERLY
@RocketMQMessageListener(consumerGroup =  "huanlv-group", topic = "miniapp_service_topic")
public class HotelServiceConsumer implements RocketMQListener<String> {

    private AtomicInteger count=new AtomicInteger(0);
    @Override
    public void onMessage(String message) {
        log.info("HotelServiceConsumer-->Receive message:{}",message);
        long start = System.currentTimeMillis();
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long end = System.currentTimeMillis();
        log.info("HotelServiceConsumer-->handler message:{} ms,{} s",(end-start),(end-start)/1000);
    }
}

消息積壓并重復(fù)消息

觀察console客戶端,如下:

原因分析

通過使用rocket客戶端消費是沒問題的:

/**
 * 消息的接受者
 */
public class BaseConsumer {

    public static void main(String[] args) throws Exception {
        //1.創(chuàng)建消費者Consumer,制定消費者組名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("huanlv-group");
        //2.指定Nameserver地址
        consumer.setNamesrvAddr("172.16.100.11:9876");
        //3.訂閱主題Topic和Tag
        consumer.subscribe("miniapp_service_topic", "*");

        //設(shè)定消費模式:負(fù)載均衡|廣播模式
        consumer.setMessageModel(MessageModel.CLUSTERING);
//        consumer.setMessageModel(MessageModel.BROADCASTING);

        //4.設(shè)置回調(diào)函數(shù),處理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            //接受消息內(nèi)容
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.啟動消費者consumer
        consumer.start();
    }
}

所以這個時候懷疑是springboot整合rockemq的版本有問題,把版本有2.0.3換成了2.2.0問題解決掉,客戶端注入ClientId規(guī)則發(fā)現(xiàn)變化,消息正常消費:

<properties>
        <rocketmq-spring-boot-starter-version>2.2.0</rocketmq-spring-boot-starter-version>
         </properties>

總結(jié)

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • SpringBoot前后端分離跨域問題:狀態(tài)碼403拒絕訪問解決辦法

    SpringBoot前后端分離跨域問題:狀態(tài)碼403拒絕訪問解決辦法

    這篇文章主要給大家介紹了關(guān)于SpringBoot前后端分離跨域問題:狀態(tài)碼403拒絕訪問的解決辦法,403是被服務(wù)器拒絕了,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2024-01-01
  • IDEA2020.1個性化設(shè)置的實現(xiàn)

    IDEA2020.1個性化設(shè)置的實現(xiàn)

    這篇文章主要介紹了IDEA2020.1個性化設(shè)置的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-08-08
  • Java遞歸簡單實現(xiàn)n的階乘

    Java遞歸簡單實現(xiàn)n的階乘

    這篇文章主要介紹了Java遞歸簡單實現(xiàn)n的階乘,遞歸(recursion)就是子程序(或函數(shù))直接調(diào)用自己或通過一系列調(diào)用語句間接調(diào)用自己,是一種描述問題和解決問題的基本方法,下面我們舉一個小小的例子詳情了解一下,需要的朋友可以參考下
    2021-12-12
  • Java編程實現(xiàn)服務(wù)器端支持?jǐn)帱c續(xù)傳的方法(可支持快車、迅雷)

    Java編程實現(xiàn)服務(wù)器端支持?jǐn)帱c續(xù)傳的方法(可支持快車、迅雷)

    這篇文章主要介紹了Java編程實現(xiàn)服務(wù)器端支持?jǐn)帱c續(xù)傳的方法,涉及Java文件傳輸?shù)南嚓P(guān)技巧,具有一定參考借鑒價值,需要的朋友可以參考下
    2015-11-11
  • 動態(tài)代理模擬實現(xiàn)aop的示例

    動態(tài)代理模擬實現(xiàn)aop的示例

    下面小編就為大家?guī)硪黄獎討B(tài)代理模擬實現(xiàn)aop的示例。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧,希望對大家有所幫助
    2017-11-11
  • SpringBoot使用@PostConstruct注解導(dǎo)入配置方式

    SpringBoot使用@PostConstruct注解導(dǎo)入配置方式

    這篇文章主要介紹了SpringBoot使用@PostConstruct注解導(dǎo)入配置方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • java通過方向鍵控制小球移動的小游戲

    java通過方向鍵控制小球移動的小游戲

    這篇文章主要為大家詳細(xì)介紹了java通過方向鍵控制小球移動的小游戲,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-10-10
  • java中使用Filter控制用戶登錄權(quán)限具體實例

    java中使用Filter控制用戶登錄權(quán)限具體實例

    java中使用Filter控制用戶登錄權(quán)限具體實例,需要的朋友可以參考一下
    2013-06-06
  • SpringBoot創(chuàng)建并簡單使用的實現(xiàn)

    SpringBoot創(chuàng)建并簡單使用的實現(xiàn)

    這篇文章主要介紹了SpringBoot創(chuàng)建并簡單使用的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-10-10
  • Spring JPA自定義查詢結(jié)果的接收方式

    Spring JPA自定義查詢結(jié)果的接收方式

    這篇文章主要介紹了Spring JPA自定義查詢結(jié)果的接收方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-01-01

最新評論