Springboot?整合?RocketMQ?收發(fā)消息的配置過程
Springboot 整合 RocketMQ 收發(fā)消息
創(chuàng)建springboot項目
pom.xml添加rocketmq-spring-boot-starter
依賴。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version> </dependency>
yml 配置
application.yml
rocketmq: name-server: 192.168.64.141:9876
application-demo1.yml
使用 demo1 profile 指定生產(chǎn)者組組名
rocketmq: producer: group: producer-demo1
application-demo2.yml
使用 demo2 profile 指定生產(chǎn)者組組名
rocketmq: producer: group: producer-demo2
測試
demo 1
- 發(fā)送普通消息
- 發(fā)送 Spring 的通用 Message 對象
- 發(fā)送異步消息
- 發(fā)送順序消息
生產(chǎn)者
package cn.tedu.demo2.m1; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; @Component public class Producer { @Autowired private RocketMQTemplate t ; public void send(){ //發(fā)送同步消息 t.convertAndSend("Topic1:TagA", "Hello world! "); //發(fā)送spring的Message Message<String> message = MessageBuilder.withPayload("Hello Spring message! ").build(); t.send("Topic1:TagA",message); //發(fā)送異步消息 t.asyncSend("Topic1:TagA", "hello world asyn", new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("發(fā)送成功"); } @Override public void onException(Throwable throwable) { System.out.println("發(fā)送失敗"); } }); //發(fā)送順序消息 t.syncSendOrderly("Topic1", "98456237,創(chuàng)建", "98456237"); t.syncSendOrderly("Topic1", "98456237,支付", "98456237"); t.syncSendOrderly("Topic1", "98456237,完成", "98456237"); } }
消費者
package cn.tedu.demo2.m1; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; @Component @RocketMQMessageListener(topic = "Topic1",consumerGroup = "consumer-demo1") public class Consumer implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("收到"+s); } }
主類
package cn.tedu.demo2.m1; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class Main { public static void main(String[] args) { SpringApplication.run(Main.class, args); } }
測試類
需要放在 test 文件夾
激活 demo1 profile @ActiveProfiles("demo1")
package cn.tedu.demo2.m1; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ActiveProfiles; @SpringBootTest @ActiveProfiles("demo1") public class Test1 { @Autowired private Producer producer; @Test public void test1(){ producer.send(); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } }
demo 2
發(fā)送事務(wù)消息
生產(chǎn)者
package cn.tedu.demo2.m2; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; @Component public class Producer { @Autowired private RocketMQTemplate t; public void send(){ Message<String> message = MessageBuilder.withPayload("Hello world").build(); //一旦發(fā)送消息,則執(zhí)行監(jiān)聽器 t.sendMessageInTransaction("Topic2",message,null); } @RocketMQTransactionListener class Lis implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { System.out.println("執(zhí)行本地事務(wù)"); return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { System.out.println("執(zhí)行事務(wù)回查"); return RocketMQLocalTransactionState.COMMIT; } } }
消費者
package cn.tedu.demo2.m2; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; @Component @RocketMQMessageListener(topic = "Topic2",consumerGroup = "consumer-demo2") public class Consumer implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("收到"+s); } }
主類
package cn.tedu.demo2.m2; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class Main { public static void main(String[] args) { SpringApplication.run(Main.class, args); } }
測試類
package cn.tedu.demo2.m2; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ActiveProfiles; @SpringBootTest @ActiveProfiles("demo2") public class Test2 { @Autowired private Producer producer; @Test public void test1(){ producer.send(); //為了能夠收到消費者消費的數(shù)據(jù),這里通過休眠模擬等待時間 try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } } }
到此這篇關(guān)于Springboot 整合 RocketMQ 收發(fā)消息的文章就介紹到這了,更多相關(guān)Springboot 整合 RocketMQ 收發(fā)消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解使用Mybatis-plus + velocity模板生成自定義的代碼
這篇文章主要介紹了詳解使用Mybatis-plus + velocity模板生成自定義的代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03解決IDEA 2020.1版本 maven Test命令出現(xiàn)導(dǎo)包錯誤的問題
這篇文章主要介紹了IDEA 2020.1版本 maven Test命令出現(xiàn)導(dǎo)包錯誤的問題及解決方法,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-08-08java向mysql插入數(shù)據(jù)亂碼問題的解決方法
這篇文章主要為大家詳細(xì)介紹了java向mysql插入數(shù)據(jù)亂碼問題的解決方法,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-09-09Spring boot調(diào)用Oracle存儲過程的兩種方式及完整代碼
這篇文章主要給大家介紹了關(guān)于Spring boot調(diào)用Oracle存儲過程的兩種方式及完整代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2020-08-08