Spring Cloud Stream簡單用法
Spring Cloud Stream對Spring Cloud體系中的Mq進⾏了很好的上層抽象,可以讓我們與具體消息中間件解耦合,屏蔽掉了底層具體MQ消息中間件的細節(jié)差異,就像Hibernate屏蔽掉了具體數(shù)據(jù)庫(Mysql/Oracle⼀樣)。如此⼀來,我們學習、開發(fā)、維護MQ都會變得輕松。⽬前Spring Cloud Stream原生⽀持RabbitMQ和Kafka,阿里在這個基礎上提供了RocketMQ的支持
簡單使用Spring Cloud Stream 構建基于RocketMQ的生產者和消費者
生產者
pom文件中加入依賴
<dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> <version>2.1.0.RELEASE</version> </dependency> </dependencies>
配置文件中增加關于Spring Cloud Stream binder和bindings的配置
spring: application: name: zhao-cloud-stream-producer cloud: stream: rocketmq: binder: name-server: 127.0.0.1:9876 bindings: output: producer: group: test sync: true bindings: output: destination: stream-test-topic content-type: text/plain # 內容格式。這里使用 JSON
其中destination代表生產的數(shù)據(jù)發(fā)送到的topic 然后定義一個channel用于數(shù)據(jù)發(fā)送
import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; public interface TestChannel { @Output("output") MessageChannel output(); }
最后構造數(shù)據(jù)發(fā)送的接口
@Controller public class SendMessageController { @Resource private TestChannel testChannel; @ResponseBody @RequestMapping(value = "send", method = RequestMethod.GET) public String sendMessage() { String messageId = UUID.randomUUID().toString(); Message<String> message = MessageBuilder .withPayload("this is a test:" + messageId) .setHeader(MessageConst.PROPERTY_TAGS, "test") .build(); try { testChannel.output().send(message); return messageId + "發(fā)送成功"; } catch (Exception e) { return messageId + "發(fā)送失敗,原因:" + e.getMessage(); } } }
消費者
消費者的pom引入與生產者相同,在此不再贅述,配置時需要將stream的output修改為input并修改對應屬性
spring: application: name: zhao-cloud-stream-consumer cloud: stream: rocketmq: binder: name-server: 127.0.0.1:9876 bindings: input: consumer: tags: test bindings: input: destination: stream-test-topic content-type: text/plain # 內容格式。這里使用 JSON group: test
另外關于channel的構造也要做同樣的修改
import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; public interface TestChannel { @Input("input") SubscribableChannel input(); }
最后我在啟動類中對收到的消息進行了監(jiān)聽
@StreamListener("input") public void receiveInput(@Payload Message message) throws ValidationException { System.out.println("input1 receive: " + message.getPayload() + ", foo header: " + message.getHeaders().get("foo")); }
測試結果
Stream其他特性
消息發(fā)送失敗的處理
消息發(fā)送失敗后悔發(fā)送到默認的一個“topic.errors"的channel中(topic是配置的destination)。要配置消息發(fā)送失敗的處理,需要將錯誤消息的channel打開 消費者配置如下
spring: application: name: zhao-cloud-stream-producer cloud: stream: rocketmq: binder: name-server: 127.0.0.1:9876 bindings: output: producer: group: test sync: true bindings: output: destination: stream-test-topic content-type: text/plain # 內容格式。這里使用 JSON producer: errorChannelEnabled: true
在啟動類中配置錯誤消息的Channel信息
@Bean("stream-test-topic.errors") MessageChannel testoutPutErrorChannel(){ return new PublishSubscribeChannel(); }
新建異常處理service
import org.springframework.integration.annotation.ServiceActivator; import org.springframework.messaging.Message; import org.springframework.stereotype.Service; @Service public class ErrorProducerService { @ServiceActivator(inputChannel = "stream-test-topic.errors") public void receiveProducerError(Message message){ System.out.println("receive error msg :"+message); } }
當發(fā)生異常時,由于測試類中已經將異常捕獲,處理發(fā)送異常主要是在這里進行。模擬,應用與rocketMq斷開的場景。可見
消費者錯誤處理
首先增加配置為
spring: application: name: zhao-cloud-stream-producer cloud: stream: rocketmq: binder: name-server: 127.0.0.1:9876 bindings: output: producer: group: test sync: true bindings: output: destination: stream-test-topic content-type: text/plain # 內容格式。這里使用 JSON producer: errorChannelEnabled: true
增加相應的模擬異常的操作
@StreamListener("input") public void receiveInput(@Payload Message message) throws ValidationException { //System.out.println("input1 receive: " + message.getPayload() + ", foo header: " + message.getHeaders().get("foo")); throw new RuntimeException("oops"); } @ServiceActivator(inputChannel = "stream-test-topic.test.errors") public void receiveConsumeError(Message message){ System.out.println("receive error msg"+message.getPayload()); }
代碼地址https://github.com/zhendiao/deme-code/tree/main/zp
到此這篇關于Spring Cloud Stream簡單用法的文章就介紹到這了,更多相關Spring Cloud Stream使用內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Java使用DateUtils對日期進行數(shù)學運算經典應用示例【附DateUtils相關包文件下載】
這篇文章主要介紹了Java使用DateUtils對日期進行數(shù)學運算的方法,可實現(xiàn)針對日期時間的各種常見運算功能,并附帶DateUtils的相關包文件供讀者下載使用,需要的朋友可以參考下2017-11-11將springboot項目生成可依賴的jar并引入到項目中的方法
SpringBoot項目默認打包的是可運行jar包,也可以打包成不可運行的jar包,本文給大家介紹將springboot項目生成可依賴的jar并引入到項目中的方法,感興趣的朋友一起看看吧2023-11-11java?-jar命令及SpringBoot通過java?-jav啟動項目的過程
本篇文章將為大家講述關于 SpringBoot 項目工程完成后,是如何通過 java-jar 命令來啟動的,以及介紹 java-jar 命令的詳細內容,對SpringBoot java?-jav啟動過程感興趣的朋友跟隨小編一起看看吧2023-05-05springBoot server.port=-1的含義說明
這篇文章主要介紹了springBoot server.port=-1的含義說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08