Spring Cloud Stream簡(jiǎn)單用法
Spring Cloud Stream對(duì)Spring Cloud體系中的Mq進(jìn)⾏了很好的上層抽象,可以讓我們與具體消息中間件解耦合,屏蔽掉了底層具體MQ消息中間件的細(xì)節(jié)差異,就像Hibernate屏蔽掉了具體數(shù)據(jù)庫(kù)(Mysql/Oracle⼀樣)。如此⼀來(lái),我們學(xué)習(xí)、開(kāi)發(fā)、維護(hù)MQ都會(huì)變得輕松。⽬前Spring Cloud Stream原生⽀持RabbitMQ和Kafka,阿里在這個(gè)基礎(chǔ)上提供了RocketMQ的支持
簡(jiǎn)單使用Spring Cloud Stream 構(gòu)建基于RocketMQ的生產(chǎn)者和消費(fèi)者
生產(chǎn)者
pom文件中加入依賴
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
</dependencies>
配置文件中增加關(guān)于Spring Cloud Stream binder和bindings的配置
spring:
application:
name: zhao-cloud-stream-producer
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
output:
producer:
group: test
sync: true
bindings:
output:
destination: stream-test-topic
content-type: text/plain # 內(nèi)容格式。這里使用 JSON
其中destination代表生產(chǎn)的數(shù)據(jù)發(fā)送到的topic 然后定義一個(gè)channel用于數(shù)據(jù)發(fā)送
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface TestChannel {
@Output("output")
MessageChannel output();
}
最后構(gòu)造數(shù)據(jù)發(fā)送的接口
@Controller
public class SendMessageController {
@Resource
private TestChannel testChannel;
@ResponseBody
@RequestMapping(value = "send", method = RequestMethod.GET)
public String sendMessage() {
String messageId = UUID.randomUUID().toString();
Message<String> message = MessageBuilder
.withPayload("this is a test:" + messageId)
.setHeader(MessageConst.PROPERTY_TAGS, "test")
.build();
try {
testChannel.output().send(message);
return messageId + "發(fā)送成功";
} catch (Exception e) {
return messageId + "發(fā)送失敗,原因:" + e.getMessage();
}
}
}
消費(fèi)者
消費(fèi)者的pom引入與生產(chǎn)者相同,在此不再贅述,配置時(shí)需要將stream的output修改為input并修改對(duì)應(yīng)屬性
spring:
application:
name: zhao-cloud-stream-consumer
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
input:
consumer:
tags: test
bindings:
input:
destination: stream-test-topic
content-type: text/plain # 內(nèi)容格式。這里使用 JSON
group: test
另外關(guān)于channel的構(gòu)造也要做同樣的修改
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface TestChannel {
@Input("input")
SubscribableChannel input();
}
最后我在啟動(dòng)類中對(duì)收到的消息進(jìn)行了監(jiān)聽(tīng)
@StreamListener("input")
public void receiveInput(@Payload Message message) throws ValidationException {
System.out.println("input1 receive: " + message.getPayload() + ", foo header: " + message.getHeaders().get("foo"));
}
測(cè)試結(jié)果


Stream其他特性
消息發(fā)送失敗的處理
消息發(fā)送失敗后悔發(fā)送到默認(rèn)的一個(gè)“topic.errors"的channel中(topic是配置的destination)。要配置消息發(fā)送失敗的處理,需要將錯(cuò)誤消息的channel打開(kāi) 消費(fèi)者配置如下
spring:
application:
name: zhao-cloud-stream-producer
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
output:
producer:
group: test
sync: true
bindings:
output:
destination: stream-test-topic
content-type: text/plain # 內(nèi)容格式。這里使用 JSON
producer:
errorChannelEnabled: true
在啟動(dòng)類中配置錯(cuò)誤消息的Channel信息
@Bean("stream-test-topic.errors")
MessageChannel testoutPutErrorChannel(){
return new PublishSubscribeChannel();
}
新建異常處理service
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
@Service
public class ErrorProducerService {
@ServiceActivator(inputChannel = "stream-test-topic.errors")
public void receiveProducerError(Message message){
System.out.println("receive error msg :"+message);
}
}
當(dāng)發(fā)生異常時(shí),由于測(cè)試類中已經(jīng)將異常捕獲,處理發(fā)送異常主要是在這里進(jìn)行。模擬,應(yīng)用與rocketMq斷開(kāi)的場(chǎng)景??梢?jiàn)

消費(fèi)者錯(cuò)誤處理
首先增加配置為
spring:
application:
name: zhao-cloud-stream-producer
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
output:
producer:
group: test
sync: true
bindings:
output:
destination: stream-test-topic
content-type: text/plain # 內(nèi)容格式。這里使用 JSON
producer:
errorChannelEnabled: true
增加相應(yīng)的模擬異常的操作
@StreamListener("input")
public void receiveInput(@Payload Message message) throws ValidationException {
//System.out.println("input1 receive: " + message.getPayload() + ", foo header: " + message.getHeaders().get("foo"));
throw new RuntimeException("oops");
}
@ServiceActivator(inputChannel = "stream-test-topic.test.errors")
public void receiveConsumeError(Message message){
System.out.println("receive error msg"+message.getPayload());
}

代碼地址https://github.com/zhendiao/deme-code/tree/main/zp
到此這篇關(guān)于Spring Cloud Stream簡(jiǎn)單用法的文章就介紹到這了,更多相關(guān)Spring Cloud Stream使用內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
鄰接表無(wú)向圖的Java語(yǔ)言實(shí)現(xiàn)完整源碼
這篇文章主要介紹了鄰接表無(wú)向圖的Java語(yǔ)言實(shí)現(xiàn)完整源碼,具有一定借鑒價(jià)值,需要的朋友可以參考下。2017-12-12
SpringBoot3解決跨域請(qǐng)求的方案小結(jié)
解決跨域請(qǐng)求,主要有JSONP,iframe,window.name,CORS等方式,其中CORS方式是最常用的跨域?qū)崿F(xiàn)方式,而且是對(duì)各種請(qǐng)求方法、各種數(shù)據(jù)請(qǐng)求類型都是完美支持的,本文介紹了SpringBoot3解決跨域請(qǐng)求的方案小結(jié),需要的朋友可以參考下2024-07-07
Java swing 圖像處理多種效果實(shí)現(xiàn)教程
這篇文章主要介紹了Java swing 圖像處理多種效果實(shí)現(xiàn)教程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-09-09
Java使用DateUtils對(duì)日期進(jìn)行數(shù)學(xué)運(yùn)算經(jīng)典應(yīng)用示例【附DateUtils相關(guān)包文件下載】
這篇文章主要介紹了Java使用DateUtils對(duì)日期進(jìn)行數(shù)學(xué)運(yùn)算的方法,可實(shí)現(xiàn)針對(duì)日期時(shí)間的各種常見(jiàn)運(yùn)算功能,并附帶DateUtils的相關(guān)包文件供讀者下載使用,需要的朋友可以參考下2017-11-11
教你利用SpringBoot寫(xiě)一個(gè)屬于自己的Starter
如果我們將可獨(dú)立于業(yè)務(wù)代碼之外的功配置模塊封裝成一個(gè)個(gè)starter,復(fù)用的時(shí)候只需要將其在pom中引用依賴即可,SpringBoot為我們完成自動(dòng)裝配,簡(jiǎn)直不要太爽,這篇文章主要給大家介紹了關(guān)于如何利用SpringBoot寫(xiě)一個(gè)屬于自己的Starter,需要的朋友可以參考下2022-03-03
將springboot項(xiàng)目生成可依賴的jar并引入到項(xiàng)目中的方法
SpringBoot項(xiàng)目默認(rèn)打包的是可運(yùn)行jar包,也可以打包成不可運(yùn)行的jar包,本文給大家介紹將springboot項(xiàng)目生成可依賴的jar并引入到項(xiàng)目中的方法,感興趣的朋友一起看看吧2023-11-11
java?-jar命令及SpringBoot通過(guò)java?-jav啟動(dòng)項(xiàng)目的過(guò)程
本篇文章將為大家講述關(guān)于 SpringBoot 項(xiàng)目工程完成后,是如何通過(guò) java-jar 命令來(lái)啟動(dòng)的,以及介紹 java-jar 命令的詳細(xì)內(nèi)容,對(duì)SpringBoot java?-jav啟動(dòng)過(guò)程感興趣的朋友跟隨小編一起看看吧2023-05-05
springBoot server.port=-1的含義說(shuō)明
這篇文章主要介紹了springBoot server.port=-1的含義說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08

