一文快速掌握Spring?Cloud?Stream
本篇文章所涉及到的demo練習(xí) 使用的cloud 2021.0.3+ springboot2.6.8
一、概述簡介
官網(wǎng):https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/
1.1. cloud Stream是什么
官方定義:Spring Cloud Stream是一個(gè)用于構(gòu)建 與 共享消息系統(tǒng)
連接的高度可擴(kuò)展的事件驅(qū)動微服務(wù)
。
目前主流的消息框架有:
- ActiveMQ
- RabbitMQ
- RocketMQ
- Kafka
假設(shè)公司業(yè)務(wù)項(xiàng)目用了RabbitMQ,而大數(shù)據(jù)項(xiàng)目用了Kafka。這時(shí)候就會出現(xiàn)有兩個(gè)消息框架,相對于程序員來說其實(shí)并不友好,還得兩個(gè)都掌握,正常對于一個(gè)程序員來說熟練一個(gè)消息框架都不錯(cuò)了,何況還搞了兩個(gè),并且兩個(gè)維護(hù)起來也不好維護(hù)。
RabbitMQ和Kafka是兩個(gè)不同的框架,兩個(gè)消息模型上也存在著差異,并且代碼上用法也不一樣。Spring Cloud Stream就是不再關(guān)注具體MQ的細(xì)節(jié),可以在不改代碼的基礎(chǔ)上,來完成Rabbit和Kafka兩個(gè)不同的消息中間件的切換(這里的切換指的是原本用的RabbitMQ,但是用著用著發(fā)現(xiàn)kafka比較符合,所以想要換框架)。
總結(jié)成一句話:屏蔽底層消息中間件的差異,降低切換成本,統(tǒng)一消息的編程模型
注意:遺憾的是目前僅支持RabbitMQ、Kafka
。
1.2. 設(shè)計(jì)思想
常規(guī)的MQ設(shè)計(jì)如下:
- Message:生產(chǎn)者/消費(fèi)者之間靠消息媒介傳遞信息內(nèi)容
- MessageChannel:消息必須走特定的通道
- 隊(duì)列:假如發(fā)消息會先發(fā)到消息隊(duì)列當(dāng)中
- 消息隊(duì)列的消息如何被消費(fèi)呢:訂閱的人可以進(jìn)行消費(fèi)
cloud Stream設(shè)計(jì)如下:
通過定義綁定器Binder作為中間層,實(shí)現(xiàn)了應(yīng)用程序與消息中間件細(xì)節(jié)之間的隔離。
在沒有綁定器這個(gè)概念的情況下,我們的SpringBoot應(yīng)用要直接與消息中間件進(jìn)行信息交互的時(shí)候,由于各消息中間件構(gòu)建的初衷不同,它們的實(shí)現(xiàn)細(xì)節(jié)上會有較大的差異性,通過定義綁定器作為中間層,完美地實(shí)現(xiàn)了應(yīng)用程序與消息中間件細(xì)節(jié)之間的隔離。Stream對消息中間件的進(jìn)一步封裝,可以做到代碼層面對中間件的無感知,甚至于動態(tài)的切換中間件(rabbitmq切換為kafka),使得微服務(wù)開發(fā)的高度解耦,服務(wù)可以關(guān)注更多自己的業(yè)務(wù)流程
注意:左圖是官網(wǎng)的架構(gòu)圖
Binder可以生成Binding,Binding用來綁定消息容器的生產(chǎn)者和消費(fèi)者,它有兩種類型,INPUT和OUTPUT,INPUT對應(yīng)于消費(fèi)者,OUTPUT對應(yīng)于生產(chǎn)者。
stream為了屏蔽差異,抽象出來了一個(gè)Binder層,而目前為止,只提供了兩個(gè)框架的實(shí)現(xiàn),通過具體的實(shí)現(xiàn)來連接消息中間件。
假如想要通過stream連接RabbitMQ就使用:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
假如想要通過stream連接Kafka就使用:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency>
Stream中的消息通信方式遵循了發(fā)布-訂閱模式
,Topic主題進(jìn)行廣播,在RabbitMQ就是Exchange
,在Kakfa中就是Topic
。
1.3. 標(biāo)準(zhǔn)流程
- Binder: 很方便的連接中間件,屏蔽差異
- Channel: 通道,是隊(duì)列Queue的一種抽象,在消息通訊系統(tǒng)中就是實(shí)現(xiàn)存儲和轉(zhuǎn)發(fā)的媒介,通過Channe對隊(duì)列進(jìn)行配置
- Source(源:發(fā)送者)和Sink(水槽:接受者): 簡單的可理解為參照對象是Spring Cloud Stream自身,從Stream發(fā)布消息就是輸出,接受消息就是輸入。
1.4. 注解
注解完全是基于官方給的模型而定的!通過stream使用消息中間件也是非常簡單的,直接使用以下注解就可以使用。
注意:注解依然是能用的,但是官方明確表示注解已經(jīng)被棄用,棄用并不是不能用,而是用了會畫橫杠不建議用。但是功能是沒有問題的,低版本的cloud是沒有被棄用的。針對于注解和函數(shù)式編程兩種我都會進(jìn)行使用。
題外話:學(xué)技術(shù)永遠(yuǎn)是這樣,技術(shù)一直在不斷的更新迭代,真正學(xué)習(xí)一個(gè)技術(shù)并不是要掌握編碼使用,而是要掌握他到底是什么,能干什么,要去深入理解他,對于編碼,我認(rèn)為其實(shí)不是很重要。就算你今天掌握了官方最新用法,回頭人家又改寫法了。
二、基于注解代碼練習(xí)
生產(chǎn)者就是消息發(fā)送者,消費(fèi)者就是消息接受者。這里我就不用kafka了,我直接用的是RabbitMQ。
windows下安裝RabbitMQ:https://blog.csdn.net/weixin_43888891/article/details/126514021
2.1. 消息驅(qū)動之生產(chǎn)者
1.創(chuàng)建項(xiàng)目(可以是聚合可以是普通springboot項(xiàng)目)
2.添加pom
因?yàn)槭呛蚏abbitMQ整合,所以就是引入的spring-cloud-starter-stream-rabbit啟動器
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <springboot.version>2.6.8</springboot.version> <springcloud.version>2021.0.3</springcloud.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${springboot.version}</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${springcloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> </dependencies>
3.添加application配置
server: port: 8801 spring: application: name: cloud-stream-provider cloud: stream: binders: # 在此處配置要綁定的rabbitmq的服務(wù)信息; defaultRabbit: # 表示定義的名稱,用于于binding整合 type: rabbit # 消息組件類型 environment: # 設(shè)置rabbitmq的相關(guān)的環(huán)境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服務(wù)的整合處理 output: # 這個(gè)名字是一個(gè)通道的名稱 destination: studyExchange # 表示要使用的Exchange名稱定義 content-type: application/json # 設(shè)置消息類型,本次為json,文本則設(shè)置“text/plain” binder: defaultRabbit # 設(shè)置要綁定的消息服務(wù)的具體設(shè)置
4.添加接口
public interface IMessageProvider { public String send(); }
5.添加實(shí)現(xiàn)類
import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.MessageChannel; import javax.annotation.Resource; import java.util.UUID; // 可以理解為是一個(gè)消息的發(fā)送管道的定義 @EnableBinding(Source.class) public class MessageProviderImpl implements IMessageProvider { // 消息的發(fā)送管道 @Resource private MessageChannel output; @Override public String send() { String serial = UUID.randomUUID().toString(); // 創(chuàng)建并發(fā)送消息 this.output.send(MessageBuilder.withPayload(serial).build()); System.out.println("***serial: " + serial); return serial; } }
6.添加controller控制器
@RestController public class SendMessageController { @Autowired private IMessageProvider iMessageProvider; @GetMapping("send") public String send() { return iMessageProvider.send(); } }
7.測試
(1)首先要保證RabbitMQ是可以訪問的:http://localhost:15672
(2)啟動項(xiàng)目訪問:http://localhost:8801/send
下圖波峰代表發(fā)送消息成功
啟動后會創(chuàng)建交換機(jī),名稱就是application.yml當(dāng)中的destination屬性設(shè)置的
注意:停止服務(wù)后并沒有刪除交換機(jī)?。?!
2.2. 消息驅(qū)動之消費(fèi)者
1.創(chuàng)建項(xiàng)目
2.添加pom(pom和發(fā)送者依賴一模一樣)
3.添加application配置
server: port: 8802 spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此處配置要綁定的rabbitmq的服務(wù)信息; defaultRabbit: # 表示定義的名稱,用于于binding整合 type: rabbit # 消息組件類型 environment: # 設(shè)置rabbitmq的相關(guān)的環(huán)境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服務(wù)的整合處理 input: # 這個(gè)名字是一個(gè)通道的名稱 destination: studyExchange # 表示要使用的Exchange名稱定義 content-type: application/json # 設(shè)置消息類型,本次為對象json,如果是文本則設(shè)置“text/plain” binder: defaultRabbit # 設(shè)置要綁定的消息服務(wù)的具體設(shè)置
4.添加監(jiān)聽(消費(fèi)者只負(fù)責(zé)接受消息)
import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; @Component @EnableBinding(Sink.class) public class ReceiveMessageListener { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input(Message<String> message) { System.out.println("消費(fèi)者1號,------->接收到的消息:" + message.getPayload() + "\t port: " + serverPort); } }
5.測試
(1)啟動RabbitMQ
(2)啟動發(fā)送消息端服務(wù)
(3)啟動消費(fèi)者服務(wù),啟動后會發(fā)現(xiàn),他自動會向這個(gè)交換機(jī)當(dāng)中添加一個(gè)隊(duì)列。
發(fā)送消息:http://localhost:8801/send
接受消息:
注意:當(dāng)停止服務(wù)后消息隊(duì)列會被自動刪除?。。?/strong>
2.3. 目前存在的問題
1.依照8802, clone出來一份運(yùn)行8803,主要用來演示多個(gè)消費(fèi)者的場景
2.啟動8801生產(chǎn)者
3.啟動8802消費(fèi)者
4.啟動8803消費(fèi)者
當(dāng)三個(gè)服務(wù)都啟動后通過RabbitMQ界面會發(fā)現(xiàn),一個(gè)交換機(jī)綁定了兩個(gè)隊(duì)列
運(yùn)行后會發(fā)現(xiàn)存在兩個(gè)問題:
有重復(fù)消費(fèi)問題消息持久化問題
(1)重復(fù)消費(fèi)問題:
發(fā)送消息后兩個(gè)消費(fèi)者都收到了消息:http://localhost:8801/send
比如在如下場景中,訂單系統(tǒng)我們做集群部署,都會從RabbitMQ中獲取訂單信息,那如果一個(gè)訂單同時(shí)被兩個(gè)服務(wù)獲取到,那么就會造成數(shù)據(jù)錯(cuò)誤,我們得避免這種情況。這時(shí)我們就可以使用Stream中的消息分組來解決
注意在Stream中處于同一個(gè)group中的多個(gè)消費(fèi)者是競爭關(guān)系,就能夠保證消息只會被其中一個(gè)應(yīng)用消費(fèi)一次。不同組是可以全面消費(fèi)的(重復(fù)消費(fèi)),同一組內(nèi)會發(fā)生競爭關(guān)系,只有其中一個(gè)可以消費(fèi)。
(2)消息持久化問題:
當(dāng)生產(chǎn)者發(fā)送消息的時(shí)候,消費(fèi)者恰好宕機(jī)了,但是過一會消費(fèi)者恢復(fù)了,但是消息卻沒收到。那也就是意味著消息隊(duì)列是臨時(shí)消息隊(duì)列。針對于這一點(diǎn),大家也可以測試一下,加深一下印象。
2.4. 分組解決重復(fù)消費(fèi)問題
原理: 微服務(wù)應(yīng)用放置于同一個(gè)group中,就能夠保證消息只會被其中一個(gè)應(yīng)用消費(fèi)一次。同一個(gè)組內(nèi)會發(fā)生競爭關(guān)系,只有其中一個(gè)可以消費(fèi)。
接下來直接調(diào)整兩個(gè)消費(fèi)者為同一個(gè)組:添加如下配置
當(dāng)兩個(gè)消費(fèi)者都設(shè)置好后啟動,會發(fā)現(xiàn)一個(gè)問題: 實(shí)際上分到一個(gè)組對于RabbitMQ來說就是兩個(gè)消費(fèi)者監(jiān)聽了一個(gè)隊(duì)列。
一個(gè)隊(duì)列那也就意味著,當(dāng)隊(duì)列收到一條消息,哪個(gè)消費(fèi)者誰先消費(fèi)就是誰的,消費(fèi)完隊(duì)列里面就沒有了,也就是只有一個(gè)消費(fèi)者能消費(fèi)到消息!
注意:假如不設(shè)置group屬性的時(shí)候,默認(rèn)是啟動一個(gè)消費(fèi)者,就會創(chuàng)建一個(gè)消費(fèi)隊(duì)列,啟動多個(gè)服務(wù)就會創(chuàng)建多個(gè)隊(duì)列。stream默認(rèn)使用的是RabbitMQ的topic交換機(jī)。當(dāng)發(fā)送者向這個(gè)交換機(jī)發(fā)送消息的時(shí)候,兩個(gè)隊(duì)列就都會接收到。關(guān)于RabbitMQ相關(guān)知識本篇不記錄,后續(xù)會專門寫RabbitMQ相關(guān)文章。
最終測試:8802/8803實(shí)現(xiàn)了輪詢分組,每次只有一個(gè)消費(fèi)者8801模塊的發(fā)的消息只能被8802或8803其中一個(gè)接收到,這樣避免了重復(fù)消費(fèi)。
2.5. 消息持久化
當(dāng)三個(gè)項(xiàng)目都啟動著的時(shí)候,現(xiàn)在我們要做幾件事:
停止8802和8803并去除掉8802的分組group: gxs
,8803不去分組信息,停止掉項(xiàng)目的時(shí)候會發(fā)現(xiàn)消息隊(duì)列并沒有刪除,說明一旦設(shè)置分組信息,消息隊(duì)列就不再是臨時(shí)隊(duì)列。
2.8801發(fā)送4條消息啟動8802然后消息并沒有打印,沒有收到消息(注意8802是去掉分組信息的)再啟動8803,有分組屬性配置,后臺打出來了MQ上的消息
原因就是:當(dāng)兩個(gè)項(xiàng)目都停止的時(shí)候,隊(duì)列并未刪除,而8803還綁定了這個(gè)隊(duì)列,所以他就算宕機(jī)了,又重啟了,依然可以收到消息。而8802沒有設(shè)置分組信息,他再啟動后系統(tǒng)會給他創(chuàng)建一個(gè)臨時(shí)隊(duì)列,自然而然收不到之前的消息了。
三、函數(shù)式編程練習(xí)
到此這篇關(guān)于Spring Cloud Stream詳解的文章就介紹到這了,更多相關(guān)Spring Cloud Stream內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java?多線程并發(fā)?ReentrantReadWriteLock詳情
這篇文章主要介紹了Java多線程并發(fā)ReentrantReadWriteLock詳情,ReentrantReadWriteLock可重入讀寫鎖。實(shí)際使用場景中,我們需要處理的操作本質(zhì)上是讀與寫,更多相關(guān)資料,感興趣的小伙伴可以參考一下下面文章內(nèi)容2022-06-06Mybatis resultType返回結(jié)果為null的問題排查方式
這篇文章主要介紹了Mybatis resultType返回結(jié)果為null的問題排查方式,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03關(guān)于slf4j_log4j2源碼學(xué)習(xí)心得
這篇文章主要介紹了slf4j_log4j2源碼學(xué)習(xí)心得,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12java數(shù)據(jù)庫連接池和數(shù)據(jù)庫連接示例
這篇文章主要介紹了java數(shù)據(jù)庫連接池和數(shù)據(jù)庫連接示例,需要的朋友可以參考下2014-05-05Mybatis-plus支持Gbase8s分頁的實(shí)現(xiàn)示例
本文主要介紹了Mybatis-plus支持Gbase8s分頁的實(shí)現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-11-11java實(shí)現(xiàn)表單必填參數(shù)驗(yàn)證的方法
表單校驗(yàn)是很多注冊時(shí)必做的功能, 一般我們的處理都是很粗暴的寫個(gè)if()判斷, 然后拋異常. 本文將介紹通過代理的思想, 用注解優(yōu)雅的處理非空判斷,感興趣的一起來了解一下2021-05-05