Spring Cloud Stream整合RocketMQ的搭建方法
Spring Cloud Stream整合RocketMQ
這里書接上回,默認(rèn)你已經(jīng)搭建好了RocketMQ主從異步集群,前面文章已經(jīng)介紹過搭建方法。
1、Spring Cloud Stream介紹
Spring Cloud Stream是一個框架,用于構(gòu)建與共享消息系統(tǒng)連接的高度可擴(kuò)展的事件驅(qū)動微服務(wù)。
官網(wǎng):https://spring.io/projects/spring-cloud-stream

該框架提供了一個靈活的編程模型,該模型基于已經(jīng)建立和熟悉的Spring習(xí)慣用法和最佳實踐,包括對持久pub/sub語義、消費者組和有狀態(tài)分區(qū)的支持。

Spring Cloud Stream的核心構(gòu)建塊是:
Destination Binders:負(fù)責(zé)提供與外部消息傳遞系統(tǒng)集成的組件。Destination Bindings:外部消息系統(tǒng)和最終用戶提供的應(yīng)用程序代碼(生產(chǎn)者/消費者)之間的橋梁。Message:生產(chǎn)者和消費者用來與目標(biāo)綁定器(以及通過外部消息系統(tǒng)的其他應(yīng)用程序)進(jìn)行通信的規(guī)范數(shù)據(jù)結(jié)構(gòu)。
2、生產(chǎn)者
2.1 引入依賴
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
<version>2.2.2.RELEASE</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.7.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>注意,RocketMQ官方維護(hù)的Spring-Cloud-Stream依賴中的rocketmq版本為4.4,需要排出后加入4.7.1的依。
2.2 編寫配置文件
spring:
application:
name: my-spring-cloud-rocketmq-producer
cloud:
stream:
bindings:
output:
destination: TopicTest
rocketmq:
binder:
name-server: 192.168.159.34:9876
server:
port: 80802.3 啟動類打上注解
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
@EnableBinding(Source.class)
@SpringBootApplication
public class MySpringCloudRocketmqProducerApplication {
public static void main(String[] args) {
SpringApplication.run(MySpringCloudRocketmqProducerApplication.class, args);
}
}其中,@EnableBinding(Source.class)指向配置文件的output參數(shù)。
2.4 編寫生產(chǎn)者程序
import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
@Component
public class MyProducer {
@Resource
private Source source;
public void sendMessage(String msg){
//封裝消息頭
Map<String,Object> headers=new HashMap<>();
headers.put(MessageConst.PROPERTY_TAGS,"TagA");
MessageHeaders messageHeaders=new MessageHeaders(headers);
//創(chuàng)建消息對象
Message<String> message = MessageBuilder.createMessage(msg, messageHeaders);
//發(fā)送消息
source.output().send(message);
}
}2.5 編寫單元測試發(fā)送消息
@SpringBootTest
class MySpringCloudRocketmqProducerApplicationTests {
@Autowired
private MyProducer producer;
@Test
void contextLoads() {
producer.sendMessage("hello,spring cloud stream message");
}
}3、消費者
3.1 引入依賴
與生產(chǎn)者相同。
3.2 編寫配置文件
spring:
application:
name: my-spring-cloud-rocketmq-consumer
cloud:
stream:
bindings:
# input消費者
input:
destination: TopicTest
group: spring-cloud-stream-consumer-group
# 配置RocketMQ
rocketmq:
binder:
name-server: 192.168.159.34:9876
server:
port: 80813.3 啟動類打上注解
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
@EnableBinding(Sink.class)
@SpringBootApplication
public class MySpringCloudRocketmqConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(MySpringCloudRocketmqConsumerApplication.class, args);
}
}其中@EnableBinding(Sink.class)指向配置文件的input參數(shù)。
3.4 編寫消費者程序
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;
@Component
public class MyConsumer {
@StreamListener(Sink.INPUT)
public void processMessage(String message){
System.out.println("收到的消息:"+message);
}
}先啟動消費者,使用單元測試發(fā)送消息。

到此這篇關(guān)于Spring Cloud Stream整合RocketMQ的文章就介紹到這了,更多相關(guān)Spring Cloud Stream整合RocketMQ內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringCloudStream原理和深入使用小結(jié)
- SpringCloud中的Stream服務(wù)間消息傳遞詳解
- 使用Spring?Cloud?Stream處理事件的示例詳解
- spring-cloud-stream的手動消息確認(rèn)問題
- SpringCloudStream中的消息分區(qū)數(shù)詳解
- 關(guān)于SpringCloudStream配置問題
- Spring Cloud Stream 高級特性使用詳解
- SpringCloud微服務(wù)開發(fā)基于RocketMQ實現(xiàn)分布式事務(wù)管理詳解
- SpringCloud+RocketMQ實現(xiàn)分布式事務(wù)的實踐
相關(guān)文章
spring boot項目application.properties文件存放及使用介紹
這篇文章主要介紹了spring boot項目application.properties文件存放及使用介紹,我們的application.properties文件中會有很多敏感信息,大家在使用過程中要多加小心2021-06-06
SpringBoot集成H2數(shù)據(jù)庫的實現(xiàn)示例
H2數(shù)據(jù)庫作為一個輕量級的內(nèi)存數(shù)據(jù)庫,非常適合開發(fā)階段作為嵌入式數(shù)據(jù)庫進(jìn)行單元測試和功能驗證,本文主要介紹了SpringBoot集成H2數(shù)據(jù)庫的實現(xiàn)示例,具有一定的參考的參考價值,感興趣的可以了解一下2024-07-07
Java基礎(chǔ)之詳解基本數(shù)據(jù)類型的使用
今天給大家?guī)淼氖顷P(guān)于Java基礎(chǔ)的相關(guān)知識,文章圍繞著基本數(shù)據(jù)類型的使用展開,文中有非常詳細(xì)的介紹及代碼示例,需要的朋友可以參考下2021-06-06
Dependency ‘XXX:‘ not found問題的三步解決
這篇文章主要介紹了Dependency ‘XXX:‘ not found問題的三步解決方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-01-01
springboot+redis過期事件監(jiān)聽實現(xiàn)過程解析
這篇文章主要介紹了springboot+redis過期事件監(jiān)聽實現(xiàn)過程解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-03-03

