SpringBoot集成RocketMQ的使用示例
一、RocketMQ基本概念
消息模型(Message Model)
RocketMQ主要由Producer、Broker、Consumer三部分組成,其中Producer負責生產(chǎn)消息,Consumer負責消費消息,Broker負責存儲消息。Broker在實際部署過程中對應一臺服務器,每個Broker可以存儲多個Topic的消息,每個Topic的消息也可以分片存儲于不同的Broker。MessageQueue用于存儲消息的物理地址,每個Topic中的消息地址存儲于多個MessageQueue中。ConsumerGroup由多個Consumer實例構成。
1、在springBoot項目中添加Maven依賴
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.4</version> </dependency>
2、添加配置:
application.yml 文件中添加如下配置:
rocketmq: name-server: 192.168.152.165:9876 producer: group: my-group
SpringBoot 集成 RocketMQ代碼:
生產(chǎn)者: 消息發(fā)送的三種方式
package com.rocketmq.springbootrocketmq; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.test.context.junit4.SpringRunner; import java.util.concurrent.TimeUnit; @RunWith(SpringRunner.class) @SpringBootTest public class T { @Autowired private RocketMQTemplate rocketMQTemplate; //同步消息 @Test public void testRocketMQ() { Message msg = MessageBuilder.withPayload("boot發(fā)送同步消息").build(); rocketMQTemplate.send("helloTopicBoot", msg); System.out.println("success send"); } //異步消息 @Test public void sendASYCMsg() throws InterruptedException { Message message = MessageBuilder.withPayload("boot發(fā)送異步消息").build(); rocketMQTemplate.asyncSend("helloTopicBoot", message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("發(fā)送狀態(tài):"+sendResult.getSendStatus()); } @Override public void onException(Throwable throwable) { System.out.println("消息發(fā)送失敗"); } }); TimeUnit.SECONDS.sleep(5); } //一次性消息 @Test public void sendOneWayRocketMQ() { Message msg = MessageBuilder.withPayload("boot發(fā)送一次性消息").build(); rocketMQTemplate.sendOneWay("helloTopicBoot", msg); } }
消費者:
package com.example.springbooTRocketMQConsumer.listener; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import java.nio.charset.Charset; @Component @RocketMQMessageListener(consumerGroup = "htbConsumerGroup",topic = "helloTopicBoot") public class HelloTopicListener implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt messageExt) { System.out.println("success get:"+new String(messageExt.getBody(), Charset.defaultCharset())); } }
消息消費的兩種模式
集群模式:默認模式
廣播模式:
消費者:messageModel = MessageModel.BROADCASTING
package com.example.springbooTRocketMQConsumer.listener; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import java.nio.charset.Charset; @Component @RocketMQMessageListener(consumerGroup = "htbConsumerGroup",topic = "helloTopicBoot",messageModel = MessageModel.BROADCASTING) public class HelloTopicListener implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt messageExt) { System.out.println("success get:"+new String(messageExt.getBody(), Charset.defaultCharset())); } }
順序消息
生產(chǎn)者:
//順序消息 @Test public void sendOrderlyMsg(){ //設置隊列選擇器 rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> list, org.apache.rocketmq.common.message.Message message, Object o) { String orderIdStr = (String) o; long orderId = Long.parseLong(orderIdStr); int index = (int)orderId % list.size(); return list.get(index); } }); List<OrderStep> orderSteps = OrderUtil.buildOrders(); for (OrderStep orderStep : orderSteps) { Message msg = MessageBuilder.withPayload(orderStep.toString()).build(); rocketMQTemplate.sendOneWayOrderly("orderlyTopicBoot",msg,String.valueOf(orderStep.getOrderId())); } }
消費者:
package com.example.springbooTRocketMQConsumer.listener; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import java.nio.charset.Charset; @Component @RocketMQMessageListener(consumerGroup = "orderlyConsumerBoot",topic = "orderlyTopicBoot",consumeMode = ConsumeMode.ORDERLY) public class OrderlyTopicListener implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt messageExt) { System.out.println("當前線程:" + Thread.currentThread() + "隊列ID"+messageExt.getQueueId() + ",消息內容:" + new String(messageExt.getBody(),Charset.defaultCharset())); } }
延遲消息
生產(chǎn)者:
//延遲消息 @Test public void sendDelayRocketMQ() { Message msg = MessageBuilder.withPayload("boot發(fā)送延時消息,發(fā)送時間:"+new Date()).build(); rocketMQTemplate.syncSend("helloTopicBoot", msg,3000,3); }
消費者:
package com.example.springbooTRocketMQConsumer.listener; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import java.nio.charset.Charset; import java.util.Date; @Component @RocketMQMessageListener(consumerGroup = "htbConsumerGroup",topic = "helloTopicBoot") public class DelayTopicListener implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt messageExt) { System.out.println("success get:發(fā)送時間"+new Date()+new String(messageExt.getBody(), Charset.defaultCharset())); } }
消息Tag條件過濾
生成者
//Tag消息 @Test public void sendTagFilterRocketMQ() { Message msg1 = MessageBuilder.withPayload("消息A").build(); rocketMQTemplate.sendOneWay("tagFilterBoot:TagA", msg1); Message msg2 = MessageBuilder.withPayload("消息B").build(); rocketMQTemplate.sendOneWay("tagFilterBoot:TagB", msg2); Message msg3 = MessageBuilder.withPayload("消息C").build(); rocketMQTemplate.sendOneWay("tagFilterBoot:TagC", msg3); }
消費者:
package com.example.springbooTRocketMQConsumer.listener; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import java.nio.charset.Charset; import java.util.Date; @Component @RocketMQMessageListener(consumerGroup = "tagFilterGroupBoot",topic = "tagFilterBoot",selectorExpression = "TagA || TagC") public class TagFilterTopicListener implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt messageExt) { System.out.println("success get:發(fā)送時間"+new Date()+new String(messageExt.getBody(), Charset.defaultCharset())); } }
SQL92消息過濾
生產(chǎn)者:
//SQL92消息 @Test public void sendSQL92FilterRocketMQ() { Message msg1 = MessageBuilder.withPayload("小紅,年齡22,體重45").setHeader("age","22").setHeader("weight",45).build(); rocketMQTemplate.sendOneWay("SQL92FilterBoot", msg1); Message msg2 = MessageBuilder.withPayload("小明,年齡25,體重60").setHeader("age","25").setHeader("weight",60).build(); rocketMQTemplate.sendOneWay("SQL92FilterBoot", msg2); Message msg3 = MessageBuilder.withPayload("小藍,年齡40,體重70").setHeader("age","40").setHeader("weight",70).build(); rocketMQTemplate.sendOneWay("SQL92FilterBoot", msg3); }
消費者:
package com.example.springbooTRocketMQConsumer.listener; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.annotation.SelectorType; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import java.nio.charset.Charset; import java.util.Date; @Component @RocketMQMessageListener(consumerGroup = "SQL92FilterGroupBoot",topic = "SQL92FilterBoot",selectorType = SelectorType.SQL92,selectorExpression = "age > 23 and weight > 60") public class SQL92FilterTopicListener implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt messageExt) { System.out.println("success get:發(fā)送時間"+new Date()+new String(messageExt.getBody(), Charset.defaultCharset())); } }
到此這篇關于SpringBoot集成RocketMQ的使用示例的文章就介紹到這了,更多相關SpringBoot集成RocketMQ內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
springboot 接收List 入?yún)⒌膸追N方法
本文主要介紹了springboot 接收List 入?yún)⒌膸追N方法,本文主要介紹了7種方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2022-03-03SpringBoot?對接飛書多維表格事件回調監(jiān)聽流程分析
本文介紹了如何通過飛書事件訂閱機制和SpringBoot項目集成,對多維表數(shù)據(jù)的記錄變更進行對接的詳細流程,包括如何創(chuàng)建應用、配置參數(shù)、編寫訂閱代碼、訂閱文檔事件以及在SpringBoot工程中集成的步驟,感興趣的朋友跟隨小編一起看看吧2024-12-12gradle使用maven-publish發(fā)布jar包上傳到私有maven配置
這篇文章主要介紹了gradle使用maven-publish發(fā)布jar包上傳到私有maven的配置示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-03-03使用SpringBoot+OkHttp+fastjson實現(xiàn)Github的OAuth第三方登錄
這篇文章主要介紹了使用SpringBoot+OkHttp+fastjson實現(xiàn)Github的OAuth第三方登錄,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-02-02RabbitMQ?延遲隊列實現(xiàn)訂單支付結果異步階梯性通知(實例代碼)
這篇文章主要介紹了RabbitMQ?延遲隊列實現(xiàn)訂單支付結果異步階梯性通知,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-02-02Java中BigDecimal類與int、Integer使用總結
這篇文章主要給大家介紹了關于Java中BigDecimal類與int、Integer使用的相關資料,文中通過示例代碼介紹的非常詳細,對大家學習或者使用Java具有一定的參考學習價值,需要的朋友們下面來一起學習學習吧2019-07-07spring boot(三)之Spring Boot中Redis的使用
這篇文章主要介紹了spring boot(三)之Spring Boot中Redis的使用,非常不錯,具有參考借鑒價值,需要的朋友可以參考下2017-05-05