解決rocketmq-spring-boot-starter導(dǎo)致的多消費者實例重復(fù)消費問題
問題描述
在使用rocketMQ集群模式消費的時候(通過springboot整合的依賴操作)
發(fā)現(xiàn)在一臺服務(wù)器啟動多個消費者實例后,消息進行了重復(fù)消費,但是在不同的服務(wù)器部署消費者實例后,消息能正常消費。
背景知識
rocketMQ的消費模式有兩種:
- 1.負載均衡模式 消費者采用負載均衡方式消費消息,多個消費者(服務(wù)啟動多個,本地多個springboot加載類啟動)共同消費隊列消息,每個消費者處理的消息不同
- 2.廣播模式 消費者采用廣播的方式消費消息,每個消費者消費的消息都是相同的
本地代碼
依賴版本
<properties> <rocketmq-spring-boot-starter-version>2.0.3</rocketmq-spring-boot-starter-version> </properties>
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>${rocketmq-spring-boot-starter-version}</version> </dependency>
消費者:
package cn.lpf.mq; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import java.util.concurrent.atomic.AtomicInteger; /** * @auth lipf * @date 2021/1/16 16:27 */ @Slf4j @Component //,messageModel = MessageModel.CLUSTERING //,consumeMode = ConsumeMode.ORDERLY @RocketMQMessageListener(consumerGroup = "huanlv-group", topic = "miniapp_service_topic") public class HotelServiceConsumer implements RocketMQListener<String> { private AtomicInteger count=new AtomicInteger(0); @Override public void onMessage(String message) { log.info("HotelServiceConsumer-->Receive message:{}",message); long start = System.currentTimeMillis(); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } long end = System.currentTimeMillis(); log.info("HotelServiceConsumer-->handler message:{} ms,{} s",(end-start),(end-start)/1000); } }
消息積壓并重復(fù)消息
觀察console客戶端,如下:
原因分析
通過使用rocket客戶端消費是沒問題的:
/** * 消息的接受者 */ public class BaseConsumer { public static void main(String[] args) throws Exception { //1.創(chuàng)建消費者Consumer,制定消費者組名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("huanlv-group"); //2.指定Nameserver地址 consumer.setNamesrvAddr("172.16.100.11:9876"); //3.訂閱主題Topic和Tag consumer.subscribe("miniapp_service_topic", "*"); //設(shè)定消費模式:負載均衡|廣播模式 consumer.setMessageModel(MessageModel.CLUSTERING); // consumer.setMessageModel(MessageModel.BROADCASTING); //4.設(shè)置回調(diào)函數(shù),處理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { //接受消息內(nèi)容 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5.啟動消費者consumer consumer.start(); } }
所以這個時候懷疑是springboot整合rockemq的版本有問題,把版本有2.0.3換成了2.2.0問題解決掉,客戶端注入ClientId規(guī)則發(fā)現(xiàn)變化,消息正常消費:
<properties> <rocketmq-spring-boot-starter-version>2.2.0</rocketmq-spring-boot-starter-version> </properties>
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot前后端分離跨域問題:狀態(tài)碼403拒絕訪問解決辦法
這篇文章主要給大家介紹了關(guān)于SpringBoot前后端分離跨域問題:狀態(tài)碼403拒絕訪問的解決辦法,403是被服務(wù)器拒絕了,文中通過代碼介紹的非常詳細,需要的朋友可以參考下2024-01-01Java編程實現(xiàn)服務(wù)器端支持斷點續(xù)傳的方法(可支持快車、迅雷)
這篇文章主要介紹了Java編程實現(xiàn)服務(wù)器端支持斷點續(xù)傳的方法,涉及Java文件傳輸?shù)南嚓P(guān)技巧,具有一定參考借鑒價值,需要的朋友可以參考下2015-11-11SpringBoot使用@PostConstruct注解導(dǎo)入配置方式
這篇文章主要介紹了SpringBoot使用@PostConstruct注解導(dǎo)入配置方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-11-11java中使用Filter控制用戶登錄權(quán)限具體實例
java中使用Filter控制用戶登錄權(quán)限具體實例,需要的朋友可以參考一下2013-06-06SpringBoot創(chuàng)建并簡單使用的實現(xiàn)
這篇文章主要介紹了SpringBoot創(chuàng)建并簡單使用的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-10-10