Usage examples for integrating RocketMQ with Spring Boot
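I. RocketMQ basic concepts
Message Model
RocketMQ consists mainly of three parts: Producer, Broker, and Consumer. The Producer produces messages, the Consumer consumes them, and the Broker stores them. In a real deployment a Broker corresponds to one server. Each Broker can store messages for multiple Topics, and the messages of a single Topic can be sharded across different Brokers. A MessageQueue stores the physical addresses of messages, and the message addresses of each Topic are spread over multiple MessageQueues. A ConsumerGroup is made up of multiple Consumer instances.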
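The relationship between Topics, Brokers, and MessageQueues can be sketched as a small in-memory model. This is only an illustration: the class `TopicLayoutSketch`, the type `QueueRef`, the method `buildQueues`, and the broker names `broker-a` and `broker-b` are all hypothetical and are not part of RocketMQ. It shows one Topic whose queues are spread over two Brokers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only: none of these types come from RocketMQ.
class TopicLayoutSketch {

    // One queue of a topic, identified by the broker that hosts it and a queue id.
    static final class QueueRef {
        final String topic;
        final String brokerName;
        final int queueId;

        QueueRef(String topic, String brokerName, int queueId) {
            this.topic = topic;
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

        @Override
        public String toString() {
            return topic + "@" + brokerName + "#" + queueId;
        }
    }

    // Creates queuesPerBroker queues for the topic on every broker in the list.
    static List<QueueRef> buildQueues(String topic, List<String> brokers, int queuesPerBroker) {
        List<QueueRef> queues = new ArrayList<>();
        for (String broker : brokers) {
            for (int id = 0; id < queuesPerBroker; id++) {
                queues.add(new QueueRef(topic, broker, id));
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        List<String> brokers = Arrays.asList("broker-a", "broker-b");
        for (QueueRef queue : buildQueues("helloTopicBoot", brokers, 4)) {
            System.out.println(queue);
        }
    }
}
```

With two brokers and four queues each, the topic ends up with eight queues, which is the sharding described above.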
1. Add the Maven dependency to the Spring Boot project
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.4</version>
</dependency>
2. Add the configuration:
Add the following to the application.yml file:
rocketmq:
  name-server: 192.168.152.165:9876
  producer:
    group: my-group
Spring Boot and RocketMQ integration code:
Producer: three ways to send a message
package com.rocketmq.springbootrocketmq;

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.TimeUnit;

@RunWith(SpringRunner.class)
@SpringBootTest
public class T {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    // Synchronous message
    @Test
    public void testRocketMQ() {
        Message<String> msg = MessageBuilder.withPayload("boot sends a synchronous message").build();
        rocketMQTemplate.send("helloTopicBoot", msg);
        System.out.println("success send");
    }

    // Asynchronous message
    @Test
    public void sendASYCMsg() throws InterruptedException {
        Message<String> message = MessageBuilder.withPayload("boot sends an asynchronous message").build();
        rocketMQTemplate.asyncSend("helloTopicBoot", message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("Send status: " + sendResult.getSendStatus());
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("Message send failed");
            }
        });
        // Keep the test alive long enough for the callback to run
        TimeUnit.SECONDS.sleep(5);
    }

    // One-way message
    @Test
    public void sendOneWayRocketMQ() {
        Message<String> msg = MessageBuilder.withPayload("boot sends a one-way message").build();
        rocketMQTemplate.sendOneWay("helloTopicBoot", msg);
    }
}
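The asynchronous test above sleeps for five seconds so that the JVM stays alive until the callback runs. One possible alternative is to wait on a `CountDownLatch` that the callback releases. The sketch below shows the idea in plain Java only: the `Callback` interface and `fakeAsyncSend` are hypothetical stand-ins, not RocketMQ's `SendCallback` or `asyncSend`, and the "send" is simulated on an executor thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration only: no RocketMQ types are used here.
class AsyncWaitSketch {

    // Stand-in for a send callback with a success and a failure branch.
    interface Callback {
        void onSuccess(String status);
        void onException(Throwable error);
    }

    // Simulates an asynchronous send: the callback is invoked on another thread.
    static void fakeAsyncSend(ExecutorService pool, String payload, Callback callback) {
        pool.submit(() -> callback.onSuccess("SEND_OK:" + payload));
    }

    // Sends one message and blocks until the callback fires or the timeout elapses.
    static String sendAndAwait(String payload, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> result = new AtomicReference<>("TIMEOUT");
        try {
            fakeAsyncSend(pool, payload, new Callback() {
                @Override
                public void onSuccess(String status) {
                    result.set(status);
                    done.countDown();
                }

                @Override
                public void onException(Throwable error) {
                    result.set("FAILED");
                    done.countDown();
                }
            });
            // Returns as soon as the callback releases the latch
            done.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return result.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "INTERRUPTED";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndAwait("hello", 5000));
    }
}
```

Compared with a fixed sleep, the latch returns as soon as the callback has fired and still gives up after the timeout.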
Consumer:
package com.example.springbooTRocketMQConsumer.listener;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;

@Component
@RocketMQMessageListener(consumerGroup = "htbConsumerGroup", topic = "helloTopicBoot")
public class HelloTopicListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        // Decode with an explicit charset instead of the platform default
        System.out.println("success get: " + new String(messageExt.getBody(), StandardCharsets.UTF_8));
    }
}
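The listener above turns the raw message body back into a string. That only round-trips when the consumer decodes with the charset the sender used. The sketch below assumes the sender encoded the payload as UTF-8 (an assumption about the producer side) and uses hypothetical helper names `encode` and `decode`.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only: shows why the decoding charset matters.
class BodyDecodingSketch {

    // Assumed producer side: the payload is turned into UTF-8 bytes.
    static byte[] encode(String payload) {
        return payload.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: decode with the same, explicitly named charset.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "\u6d88\u606f"; // two CJK characters
        byte[] body = encode(original);
        System.out.println("same charset round-trips: " + original.equals(decode(body)));
        String wrong = new String(body, StandardCharsets.ISO_8859_1);
        System.out.println("wrong charset round-trips: " + original.equals(wrong));
    }
}
```

Naming the charset explicitly avoids depending on the platform default, which can differ between the machine that sends and the machine that consumes.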
Two message consumption modes
Clustering mode: the default mode
Broadcasting mode:
Consumer: messageModel = MessageModel.BROADCASTING
package com.example.springbooTRocketMQConsumer.listener;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;

@Component
@RocketMQMessageListener(consumerGroup = "htbConsumerGroup", topic = "helloTopicBoot", messageModel = MessageModel.BROADCASTING)
public class HelloTopicListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println("success get: " + new String(messageExt.getBody(), StandardCharsets.UTF_8));
    }
}
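The difference between the two modes can be illustrated with a small simulation. In clustering mode each message is handled by one consumer of the group, while in broadcasting mode every consumer of the group receives every message. The sketch below is plain Java with hypothetical names (`ConsumeModeSketch`, `clustering`, `broadcasting`), and the round-robin split is a simplification, since RocketMQ balances load by allocating queues to consumers rather than individual messages.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: plain Java, no RocketMQ types involved.
class ConsumeModeSketch {

    // Creates an empty inbox for every consumer, preserving order.
    private static Map<String, List<String>> emptyInboxes(List<String> consumers) {
        Map<String, List<String>> inboxes = new LinkedHashMap<>();
        for (String consumer : consumers) {
            inboxes.put(consumer, new ArrayList<>());
        }
        return inboxes;
    }

    // Clustering: each message lands in exactly one consumer's inbox.
    static Map<String, List<String>> clustering(List<String> consumers, List<String> messages) {
        Map<String, List<String>> inboxes = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            String consumer = consumers.get(i % consumers.size());
            inboxes.get(consumer).add(messages.get(i));
        }
        return inboxes;
    }

    // Broadcasting: every consumer's inbox receives every message.
    static Map<String, List<String>> broadcasting(List<String> consumers, List<String> messages) {
        Map<String, List<String>> inboxes = emptyInboxes(consumers);
        for (String consumer : consumers) {
            inboxes.get(consumer).addAll(messages);
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("consumer-1", "consumer-2");
        List<String> messages = Arrays.asList("m1", "m2", "m3");
        System.out.println("clustering:   " + clustering(consumers, messages));
        System.out.println("broadcasting: " + broadcasting(consumers, messages));
    }
}
```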
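Orderly messages
Producer:
// Orderly message
// In addition to the imports of the test class above, this method needs
// java.util.List, org.apache.rocketmq.client.producer.MessageQueueSelector
// and org.apache.rocketmq.common.message.MessageQueue.
// OrderStep and OrderUtil are helper classes that this article does not show.
@Test
public void sendOrderlyMsg() {
    // Set the queue selector
    rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> list, org.apache.rocketmq.common.message.Message message, Object o) {
            String orderIdStr = (String) o;
            long orderId = Long.parseLong(orderIdStr);
            // Take the remainder as a long first, then narrow it to int
            int index = (int) (orderId % list.size());
            return list.get(index);
        }
    });
    List<OrderStep> orderSteps = OrderUtil.buildOrders();
    for (OrderStep orderStep : orderSteps) {
        Message<String> msg = MessageBuilder.withPayload(orderStep.toString()).build();
        // The order id is the hash key, so all steps of one order go to the same queue
        rocketMQTemplate.sendOneWayOrderly("orderlyTopicBoot", msg, String.valueOf(orderStep.getOrderId()));
    }
}
Consumer: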
package com.example.springbooTRocketMQConsumer.listener;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;

@Component
@RocketMQMessageListener(consumerGroup = "orderlyConsumerBoot", topic = "orderlyTopicBoot", consumeMode = ConsumeMode.ORDERLY)
public class OrderlyTopicListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println("Current thread: " + Thread.currentThread() + ", queue ID: " + messageExt.getQueueId() + ", message content: " + new String(messageExt.getBody(), StandardCharsets.UTF_8));
    }
}
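The queue selector in this section maps an order id to a queue index with a modulo, so every step of one order lands in the same queue. A minimal plain-Java sketch of that mapping follows, with hypothetical names (`QueueIndexSketch`, `selectIndex`, `truncatingIndex`). It also shows a pitfall: in an expression of the form `(int) orderId % n` the cast binds before the remainder, so a large id is truncated first and the index can turn out negative.

```java
// Hypothetical illustration only: the index arithmetic behind a queue selector.
class QueueIndexSketch {

    // Computes the remainder on the long value; floorMod keeps it in [0, queueCount).
    static int selectIndex(long orderId, int queueCount) {
        return (int) Math.floorMod(orderId, (long) queueCount);
    }

    // Pitfall variant: the cast truncates the long before the remainder is taken.
    static int truncatingIndex(long orderId, int queueCount) {
        return (int) orderId % queueCount;
    }

    public static void main(String[] args) {
        long smallId = 1039L;
        long largeId = 3_000_000_001L; // does not fit into an int
        System.out.println("small id -> " + selectIndex(smallId, 4));
        System.out.println("large id -> " + selectIndex(largeId, 4));
        System.out.println("large id, cast first -> " + truncatingIndex(largeId, 4));
    }
}
```

A negative index would make `list.get(index)` throw, which is why the remainder is better taken before narrowing to `int`.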
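Delayed messages
Producer:
// Delayed message (this method also needs import java.util.Date)
@Test
public void sendDelayRocketMQ() {
    Message<String> msg = MessageBuilder.withPayload("boot sends a delayed message, sent at: " + new Date()).build();
    // 3000 is the send timeout in milliseconds; 3 is the delay level
    // (10 seconds with the broker's default delay levels)
    rocketMQTemplate.syncSend("helloTopicBoot", msg, 3000, 3);
}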
Consumer:
package com.example.springbooTRocketMQConsumer.listener;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.Date;

@Component
@RocketMQMessageListener(consumerGroup = "htbConsumerGroup", topic = "helloTopicBoot")
public class DelayTopicListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        // new Date() here is the time of receipt; the send time is part of the payload
        System.out.println("success get, received at " + new Date() + ": " + new String(messageExt.getBody(), StandardCharsets.UTF_8));
    }
}
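The last argument of the `syncSend` call in the producer is a delay level, not a duration. The sketch below translates a level into seconds, assuming the broker keeps its default `messageDelayLevel` setting of `1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h`. A broker configured differently would give different values, and the names `DelayLevelSketch` and `delaySeconds` are hypothetical.

```java
// Hypothetical illustration only: maps a delay level to seconds.
class DelayLevelSketch {

    // Assumption: the broker uses the default messageDelayLevel setting.
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Levels are 1-based: level 1 is the first entry of the list.
    static long delaySeconds(int level) {
        String[] levels = DEFAULT_LEVELS.split(" ");
        if (level < 1 || level > levels.length) {
            throw new IllegalArgumentException("delay level out of range: " + level);
        }
        String entry = levels[level - 1];
        long amount = Long.parseLong(entry.substring(0, entry.length() - 1));
        char unit = entry.charAt(entry.length() - 1);
        switch (unit) {
            case 's':
                return amount;
            case 'm':
                return amount * 60;
            case 'h':
                return amount * 3600;
            default:
                throw new IllegalArgumentException("unknown unit: " + unit);
        }
    }

    public static void main(String[] args) {
        System.out.println("level 3 -> " + delaySeconds(3) + " s");
    }
}
```

Under that assumption, level 3 corresponds to a 10 second delay.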
Filtering messages by Tag
Producer:
// Tag messages: the destination has the form "topic:tag"
@Test
public void sendTagFilterRocketMQ() {
    Message<String> msg1 = MessageBuilder.withPayload("Message A").build();
    rocketMQTemplate.sendOneWay("tagFilterBoot:TagA", msg1);
    Message<String> msg2 = MessageBuilder.withPayload("Message B").build();
    rocketMQTemplate.sendOneWay("tagFilterBoot:TagB", msg2);
    Message<String> msg3 = MessageBuilder.withPayload("Message C").build();
    rocketMQTemplate.sendOneWay("tagFilterBoot:TagC", msg3);
}
Consumer:
package com.example.springbooTRocketMQConsumer.listener;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.Date;

@Component
@RocketMQMessageListener(consumerGroup = "tagFilterGroupBoot", topic = "tagFilterBoot", selectorExpression = "TagA || TagC")
public class TagFilterTopicListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        // Only messages tagged TagA or TagC reach this listener
        System.out.println("success get, received at " + new Date() + ": " + new String(messageExt.getBody(), StandardCharsets.UTF_8));
    }
}
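Two conventions are at work in this section: the producer writes the destination as `topic:tag`, and the consumer lists the accepted tags separated by `||`. The sketch below mimics both in plain Java to show which of the three messages would pass. It is an approximation with hypothetical names (`TagFilterSketch`, `splitDestination`, `matches`), not the matching code RocketMQ itself runs.

```java
// Hypothetical illustration only: approximates tag-based filtering.
class TagFilterSketch {

    // Splits "topic:tag" into {topic, tag}; the tag is empty when none is given.
    static String[] splitDestination(String destination) {
        String[] parts = destination.split(":", 2);
        String tag = parts.length > 1 ? parts[1] : "";
        return new String[] {parts[0], tag};
    }

    // Returns true when the tag is one of the "||"-separated alternatives,
    // or when the expression is empty or "*" (accept everything).
    static boolean matches(String expression, String tag) {
        if (expression == null || expression.trim().isEmpty() || "*".equals(expression.trim())) {
            return true;
        }
        for (String candidate : expression.split("\\|\\|")) {
            if (candidate.trim().equals(tag)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String expression = "TagA || TagC";
        String[] destinations = {"tagFilterBoot:TagA", "tagFilterBoot:TagB", "tagFilterBoot:TagC"};
        for (String destination : destinations) {
            String tag = splitDestination(destination)[1];
            System.out.println(destination + " accepted: " + matches(expression, tag));
        }
    }
}
```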
SQL92 message filtering
Producer:
// SQL92 messages: the headers are what the consumer's SQL92 expression filters on
@Test
public void sendSQL92FilterRocketMQ() {
    Message<String> msg1 = MessageBuilder.withPayload("Xiao Hong, age 22, weight 45").setHeader("age", "22").setHeader("weight", 45).build();
    rocketMQTemplate.sendOneWay("SQL92FilterBoot", msg1);
    Message<String> msg2 = MessageBuilder.withPayload("Xiao Ming, age 25, weight 60").setHeader("age", "25").setHeader("weight", 60).build();
    rocketMQTemplate.sendOneWay("SQL92FilterBoot", msg2);
    Message<String> msg3 = MessageBuilder.withPayload("Xiao Lan, age 40, weight 70").setHeader("age", "40").setHeader("weight", 70).build();
    rocketMQTemplate.sendOneWay("SQL92FilterBoot", msg3);
}
Consumer:
package com.example.springbooTRocketMQConsumer.listener;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.Date;

// Note: SQL92 filtering requires enablePropertyFilter=true in the broker configuration
@Component
@RocketMQMessageListener(consumerGroup = "SQL92FilterGroupBoot", topic = "SQL92FilterBoot", selectorType = SelectorType.SQL92, selectorExpression = "age > 23 and weight > 60")
public class SQL92FilterTopicListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println("success get, received at " + new Date() + ": " + new String(messageExt.getBody(), StandardCharsets.UTF_8));
    }
}
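To see which of the three sample messages the expression `age > 23 and weight > 60` lets through, the condition can be written out by hand. The sketch below hard-codes that one condition in plain Java and is not a SQL92 parser. Message user properties are carried as strings, so the values are parsed into numbers before comparing. The names `Sql92FilterSketch`, `props`, and `passes` are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only: hand-written equivalent of one filter expression.
class Sql92FilterSketch {

    // Builds the string properties that a message would carry.
    static Map<String, String> props(String age, String weight) {
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("age", age);
        properties.put("weight", weight);
        return properties;
    }

    // Mirrors "age > 23 and weight > 60" on parsed numeric values.
    static boolean passes(Map<String, String> properties) {
        long age = Long.parseLong(properties.get("age"));
        long weight = Long.parseLong(properties.get("weight"));
        return age > 23 && weight > 60;
    }

    public static void main(String[] args) {
        System.out.println("age 22, weight 45: " + passes(props("22", "45")));
        System.out.println("age 25, weight 60: " + passes(props("25", "60")));
        System.out.println("age 40, weight 70: " + passes(props("40", "70")));
    }
}
```

Only the third message satisfies both conditions. The second fails because its weight is exactly 60 and the comparison is strict.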