最新SpringCloud?Stream消息驅動講解
SpringCloud Stream消息驅動
1、SpringCloud Stream概述
官方地址:https://spring.io/projects/spring-cloud-stream#overview
中文指導手冊地址:https://m.wang1314.com/doc/webapp/topic/20971999.html
SpringCloud Stream 是一個構建消息驅動微服務的框架
應用程序通過 outputs 或 inputs 來與 SpringCloud Stream 中的 binder 對象交互
SpringCloud Stream 中的 binder 對象負責與消息中間件交互
通過 SpringCloud Stream 連接消息中間件,以實現(xiàn)消息事件驅動
什么是SpringCloudStream官方定義 Spring Cloud Stream 是一個構建消息驅動微服務的框架。
應用程序通過 inputs 或者 outputs 來與 Spring Cloud Stream中binder對象交互。通過我們配置binding(綁定) ,而 Spring Cloud Stream 的 binder對象負責與消息中間件交互。所以,我們只需要搞清楚如何與 Spring Cloud Stream 交互就可以方便使用消息驅動的方式。
通過使用Spring Integration來連接消息代理中間件以實現(xiàn)消息事件驅動。Spring Cloud Stream 為一些供應商的消息中間件產(chǎn)品提供了個性化的自動化配置實現(xiàn),引用了發(fā)布-訂閱、消費組、分區(qū)的三個核心概念。
目前僅支持RabbitMQ、Kafka。
1.1、設計思想
1、標注的MQ流程
生產(chǎn)者/消費者之間靠消息媒介傳遞信息內(nèi)容【massage】
消息必須走特定的通道【消息通道MessageChannel】
消息通道里的消息如何被消費呢,誰負責收發(fā)處理
消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息處理器所訂閱
2、Cloud Stream的作用
比方說我們用到了RabbitMQ和Kafka,由于這兩個消息中間件的架構上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions分區(qū)。
這些中間件的差異性導致我們實際項目開發(fā)給我們造成了一定的困擾,我們?nèi)绻昧藘蓚€消息隊列的其中一種,后面的業(yè)務需求,我想往另外一種消息隊列進行遷移,這時候無疑就是一個災難性的,一大堆東西都要重新推倒重新做,因為它跟我們的系統(tǒng)耦合了,這時候springcloud Stream給我們提供了一種解耦合的方式。
3、什么是Binder
在沒有綁定器這個概念的情況下,我們的SpringBoot應用要直接與消息中間件進行信息交互的時候,由于各消息中間件構建的初衷不同,它們的實現(xiàn)細節(jié)上會有較大的差異性通過定義綁定器作為中間層,完美地實現(xiàn)了應用程序與消息中間件細節(jié)之間的隔離。
通過向應用程序暴露統(tǒng)一的Channel通道,使得應用程序不需要再考慮各種不同的消息中間件實現(xiàn)。
Binder可以生成Binding,Binding用來綁定消息容器的生產(chǎn)者和消費者,它有兩種類型,INPUT和OUTPUT,INPUT對應于消費者,OUTPUT對應于生產(chǎn)者。 4、Stream中的消息通信方式遵循了發(fā)布-訂閱模式
使用Topic主題進行廣播
- 在RabbitMQ就是Exchange
- 在Kakfa中就是Topic
1.2、標準的流程套路
1、Binder:很方便的連接中間件,屏蔽不同的差異
2、Channel
通道,是隊列Queue的一種抽象,在消息通訊系統(tǒng)中就是實現(xiàn)存儲和轉發(fā)的媒介,通過Channel對隊列進行配置
3、Source和Sink
簡單的可理解為參照對象是Spring Cloud Stream自身,從Stream發(fā)布消息就是輸出,接受消息就是輸入。
1.3、編碼API和常用注解
組成和注解 | 描述 |
---|---|
Middleware | 中間件,目前只支持RabbitM和Kafka |
Binder | Binder是應用與消息中間的封裝,目前實現(xiàn)了Kafka和RabbitMQ的Binder,通過Binder可以很方便的連接中間件,可以動態(tài)的改變消息類型(對應Kafka的topic,RabbitMQ的exchange),這些都可以通過配置文件來實現(xiàn) |
@Input | 注解標識輸入通道,通過該輸入通道接收到的消息進入應用程序 |
@Output | 注解標識輸出通道,發(fā)布的消息將通過通道離開應用程序 |
@StreamListener | 監(jiān)聽隊列,用戶消費者的隊列的消息接收 |
@EnableBinding | 指通道channel和exchange綁定在一起 |
2、消息驅動之生產(chǎn)者(output)
2.1、新建模塊cloud-stream-rabbitmq-provider8801
2.2、引入pom.xml配置文件
如果是需要Stream整合的就將依賴改為
spring-cloud-starter-stream-kafka
<dependencies> <!--stream整合rabbit依賴--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!--基礎配置--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
2.3、YAML配置文件
server: port: 8801 spring: application: name: cloud-stream-provider cloud: stream: binders: # 在此處配置要綁定的rabbitmq的服務信息; defaultRabbit: # 表示定義的名稱,用于于binding整合 type: rabbit # 消息組件類型 environment: # 設置rabbitmq的相關的環(huán)境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服務的整合處理 output: # 這個名字是一個通道的名稱,消息生產(chǎn)者 destination: studyExchange # 表示要使用的Exchange名稱定義【自定義】 content-type: application/json # 設置消息類型,本次為json,文本則設置“text/plain” binder: defaultRabbit # 設置要綁定的消息服務的具體設置【上面的配置】 eureka: client: # 客戶端進行Eureka注冊的配置 service-url: defaultZone: http://localhost:7001/eureka
2.4、生產(chǎn)者啟動類
package com.zcl.springcloud; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** * 描述:消息生產(chǎn)者啟動類 * * @author zhong * @date 2022-09-22 12:19 */ @SpringBootApplication public class StreamMQMain8801 { public static void main(String[] args) { SpringApplication.run(StreamMQMain8801.class, args); } }
2.5、業(yè)務實現(xiàn)
2.5.1、服務接口實現(xiàn)類
自己創(chuàng)建一個實現(xiàn)的接口以及里面的方法
注意:在這個服務實現(xiàn)類里面不是使用
@Service
注解了,因為不是web應用,而是Stream消息驅動,是與中間件進行打交道的不是與數(shù)據(jù)庫
package com.zcl.springcloud.service.Impl; import com.zcl.springcloud.service.IMessageProvider; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.MessageBuilder; import javax.annotation.Resource; import java.util.UUID; /** * 描述:發(fā)送接口實現(xiàn)類 * 必須使用@EnableBinding(Source.class)注解開啟消息推送管道 * * @author zhong * @date 2022-09-22 12:24 */ @Slf4j @EnableBinding(Source.class) public class IMessageProviderImpl implements IMessageProvider { /** * 消息發(fā)送管道 */ @Resource private MessageChannel output; /** * 發(fā)送消息 * @return */ @Override public String send() { // 定義消息 String serial = UUID.randomUUID().toString(); // 構建并發(fā)送消息 this.output.send(MessageBuilder.withPayload(serial).build()); log.info("-------------- " + serial + " ----------------"); return serial; } }
2.5.2、控制器實現(xiàn)
package com.zcl.springcloud.controller; import com.zcl.springcloud.service.IMessageProvider; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; /** * 描述:消息發(fā)送控制器 * * @author zhong * @date 2022-09-22 12:37 */ @RestController public class SendMessageController { /** * 注入消息發(fā)送管道接口 */ @Resource private IMessageProvider messageProvider; /** * 每調(diào)用一次接口發(fā)送一次消息 * @return */ @GetMapping(value = "/sendMessage") public String sendMessage() { return messageProvider.send(); } }
2.6、啟動測試
- 啟動7001Eureka訪問中心
- 啟動8801消息發(fā)送者,啟動成功以及觀察RabbitMQ的管理界面
3.訪問接口發(fā)送消息,查看MQ的管理頁面波峰情況
3、消息驅動之消費者(input)
同樣的參考如下流程圖
3.1、新建cloud-stream-rabbitmq-consumer8802模塊
3.2、引入pom.xml依賴
與8801一樣
3.3、添加YAML配置文件
配置文件與消息生產(chǎn)的區(qū)別在于:
output: # 這個名字是一個通道的名稱 destination: studyExchange # 表示要使用的Exchange名稱定義
server: port: 8802 spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此處配置要綁定的rabbitmq的服務信息; defaultRabbit: # 表示定義的名稱,用于于binding整合 type: rabbit # 消息組件類型 environment: # 設置rabbitmq的相關的環(huán)境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服務的整合處理 input: # 這個名字是一個通道的名稱 destination: studyExchange # 表示要使用的Exchange名稱定義 content-type: application/json # 設置消息類型,本次為對象json,如果是文本則設置“text/plain” binder: defaultRabbit # 設置要綁定的消息服務的具體設置 eureka: client: # 客戶端進行Eureka注冊的配置 service-url: defaultZone: http://localhost:7001/eureka
3.4、添加啟動類StreamMQMain8802
與消息生產(chǎn)者一樣
3.5、業(yè)務實現(xiàn)
必須要有
@Component
注解注入到Spring容器中
package com.zcl.springcloud.controller; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; /** * 描述:消息消費者控制器 * * @author zhong * @date 2022-09-22 13:18 */ @Slf4j @Component @EnableBinding(Sink.class) public class ReceiveMessageListener { /** * 注入消費者的端口號 */ @Value("${server.port}") private String port; /** * 監(jiān)聽消息 * @param message * @return */ @StreamListener(Sink.INPUT) public void input(Message<String> message){ log.info("消費者1號接收到的消息 ----- " + message.getPayload() + " -----,port: " + port); } }
3.6、啟動項目測試
- 啟動7001
- 啟動8801,消息發(fā)送者
- 啟動8802,消息消費者
- 8801發(fā)送消息,8802消費消息,并查看具體的MQ波峰圖
控制器輸出
4、分組消費與持久化
4.1、完整參考cloud-stream-rabbitmq-consumer8802,創(chuàng)建8803項目
除了啟動的端口號不一樣之外其他的配置都一樣
4.2、啟動項目發(fā)現(xiàn)問題
- 啟動7001(Eureka服務中心)
- 啟動8801(生產(chǎn))、8802(消費)、8803(消費)
- 測試發(fā)送消失是否兩個消費者都可以接收到
4.2.1、重復消費
目前是8802/8803同時都收到了,存在重復消費問題
解決方案:分組和持久化屬性group
常見案例
比如在如下場景中,訂單系統(tǒng)我們做集群部署,都會從RabbitMQ中獲取訂單信息,那如果一個訂單同時被兩個服務獲取到,那么就會造成數(shù)據(jù)錯誤,我們得避免這種情況。這時我們就可以使用Stream中的消息分組來解決
注意在Stream中處于同一個group中的多個消費者是競爭關系,就能夠保證消息只會被其中一個應用消費一次。
不同組是可以全面消費的(重復消費),同一組內(nèi)會發(fā)生競爭關系,只有其中一個可以消費。
4.2.2、分組
自定義配置分組,自定義分為同一個組,解決重復消費問題
配置文件分組
分別給8801、8802進行分組【orderA】
重啟項目查看MQ管理
orderB是歷史記錄,上面的配置以及都分為了
ordeerA
組,進入orderA組可以查看實際的消費者數(shù)量同一組內(nèi)會發(fā)生競爭關系,只有其中一個可以消費,啟動項目測試是否為真
4.2.3、持久化
通過上述,解決了重復消費問題,再看看持久化
停止8802/8803并去除掉8802的分組group: atguiguA,8803保留
8801先發(fā)送7條消息到rabbitmq
3.先啟動8802,無分組屬性配置,后臺沒有打出來消息
8802因為取消了
groupA
的分組所以獲取不到持久化的數(shù)據(jù)(如果重啟mq也會消失)
4.再啟動8803,有分組屬性配置,后臺打出來了MQ上的消息
8803保存
groupA
的分組所以在啟動的時候就會將持久化的數(shù)據(jù)消費
到此這篇關于SpringCloud Stream消息驅動的文章就介紹到這了,更多相關SpringCloud Stream消息驅動內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
@Autowired注解注入的xxxMapper報錯問題及解決
這篇文章主要介紹了@Autowired注解注入的xxxMapper報錯問題及解決,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-11-11解決Spring Cloud Gateway獲取body內(nèi)容,不影響GET請求的操作
這篇文章主要介紹了解決Spring Cloud Gateway獲取body內(nèi)容,不影響GET請求的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12使用SpringBoot開發(fā)Restful服務實現(xiàn)增刪改查功能
Spring Boot是由Pivotal團隊提供的全新框架,其設計目的是用來簡化新Spring應用的初始搭建以及開發(fā)過程。這篇文章主要介紹了基于SpringBoot開發(fā)一個Restful服務,實現(xiàn)增刪改查功能,需要的朋友可以參考下2018-01-01