一文快速掌握Spring?Cloud?Stream
本篇文章所涉及到的demo練習(xí) 使用的cloud 2021.0.3+ springboot2.6.8
一、概述簡(jiǎn)介
官網(wǎng):https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/
1.1. cloud Stream是什么
官方定義:Spring Cloud Stream是一個(gè)用于構(gòu)建 與 共享消息系統(tǒng)
連接的高度可擴(kuò)展的事件驅(qū)動(dòng)微服務(wù)
。
目前主流的消息框架有:
- ActiveMQ
- RabbitMQ
- RocketMQ
- Kafka
假設(shè)公司業(yè)務(wù)項(xiàng)目用了RabbitMQ,而大數(shù)據(jù)項(xiàng)目用了Kafka。這時(shí)候就會(huì)出現(xiàn)有兩個(gè)消息框架,相對(duì)于程序員來(lái)說(shuō)其實(shí)并不友好,還得兩個(gè)都掌握,正常對(duì)于一個(gè)程序員來(lái)說(shuō)熟練一個(gè)消息框架都不錯(cuò)了,何況還搞了兩個(gè),并且兩個(gè)維護(hù)起來(lái)也不好維護(hù)。
RabbitMQ和Kafka是兩個(gè)不同的框架,兩個(gè)消息模型上也存在著差異,并且代碼上用法也不一樣。Spring Cloud Stream就是不再關(guān)注具體MQ的細(xì)節(jié),可以在不改代碼的基礎(chǔ)上,來(lái)完成Rabbit和Kafka兩個(gè)不同的消息中間件的切換(這里的切換指的是原本用的RabbitMQ,但是用著用著發(fā)現(xiàn)kafka比較符合,所以想要換框架)。
總結(jié)成一句話:屏蔽底層消息中間件的差異,降低切換成本,統(tǒng)一消息的編程模型
注意:遺憾的是目前僅支持RabbitMQ、Kafka
。
1.2. 設(shè)計(jì)思想
常規(guī)的MQ設(shè)計(jì)如下:
- Message:生產(chǎn)者/消費(fèi)者之間靠消息媒介傳遞信息內(nèi)容
- MessageChannel:消息必須走特定的通道
- 隊(duì)列:假如發(fā)消息會(huì)先發(fā)到消息隊(duì)列當(dāng)中
- 消息隊(duì)列的消息如何被消費(fèi)呢:訂閱的人可以進(jìn)行消費(fèi)
cloud Stream設(shè)計(jì)如下:
通過(guò)定義綁定器Binder作為中間層,實(shí)現(xiàn)了應(yīng)用程序與消息中間件細(xì)節(jié)之間的隔離。
在沒(méi)有綁定器這個(gè)概念的情況下,我們的SpringBoot應(yīng)用要直接與消息中間件進(jìn)行信息交互的時(shí)候,由于各消息中間件構(gòu)建的初衷不同,它們的實(shí)現(xiàn)細(xì)節(jié)上會(huì)有較大的差異性,通過(guò)定義綁定器作為中間層,完美地實(shí)現(xiàn)了應(yīng)用程序與消息中間件細(xì)節(jié)之間的隔離。Stream對(duì)消息中間件的進(jìn)一步封裝,可以做到代碼層面對(duì)中間件的無(wú)感知,甚至于動(dòng)態(tài)的切換中間件(rabbitmq切換為kafka),使得微服務(wù)開(kāi)發(fā)的高度解耦,服務(wù)可以關(guān)注更多自己的業(yè)務(wù)流程
注意:左圖是官網(wǎng)的架構(gòu)圖
Binder可以生成Binding,Binding用來(lái)綁定消息容器的生產(chǎn)者和消費(fèi)者,它有兩種類(lèi)型,INPUT和OUTPUT,INPUT對(duì)應(yīng)于消費(fèi)者,OUTPUT對(duì)應(yīng)于生產(chǎn)者。
stream為了屏蔽差異,抽象出來(lái)了一個(gè)Binder層,而目前為止,只提供了兩個(gè)框架的實(shí)現(xiàn),通過(guò)具體的實(shí)現(xiàn)來(lái)連接消息中間件。
假如想要通過(guò)stream連接RabbitMQ就使用:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
假如想要通過(guò)stream連接Kafka就使用:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency>
Stream中的消息通信方式遵循了發(fā)布-訂閱模式
,Topic主題進(jìn)行廣播,在RabbitMQ就是Exchange
,在Kakfa中就是Topic
。
1.3. 標(biāo)準(zhǔn)流程
- Binder: 很方便的連接中間件,屏蔽差異
- Channel: 通道,是隊(duì)列Queue的一種抽象,在消息通訊系統(tǒng)中就是實(shí)現(xiàn)存儲(chǔ)和轉(zhuǎn)發(fā)的媒介,通過(guò)Channe對(duì)隊(duì)列進(jìn)行配置
- Source(源:發(fā)送者)和Sink(水槽:接受者): 簡(jiǎn)單的可理解為參照對(duì)象是Spring Cloud Stream自身,從Stream發(fā)布消息就是輸出,接受消息就是輸入。
1.4. 注解
注解完全是基于官方給的模型而定的!通過(guò)stream使用消息中間件也是非常簡(jiǎn)單的,直接使用以下注解就可以使用。
注意:注解依然是能用的,但是官方明確表示注解已經(jīng)被棄用,棄用并不是不能用,而是用了會(huì)畫(huà)橫杠不建議用。但是功能是沒(méi)有問(wèn)題的,低版本的cloud是沒(méi)有被棄用的。針對(duì)于注解和函數(shù)式編程兩種我都會(huì)進(jìn)行使用。
題外話:學(xué)技術(shù)永遠(yuǎn)是這樣,技術(shù)一直在不斷的更新迭代,真正學(xué)習(xí)一個(gè)技術(shù)并不是要掌握編碼使用,而是要掌握他到底是什么,能干什么,要去深入理解他,對(duì)于編碼,我認(rèn)為其實(shí)不是很重要。就算你今天掌握了官方最新用法,回頭人家又改寫(xiě)法了。
二、基于注解代碼練習(xí)
生產(chǎn)者就是消息發(fā)送者,消費(fèi)者就是消息接受者。這里我就不用kafka了,我直接用的是RabbitMQ。
windows下安裝RabbitMQ:https://blog.csdn.net/weixin_43888891/article/details/126514021
2.1. 消息驅(qū)動(dòng)之生產(chǎn)者
1.創(chuàng)建項(xiàng)目(可以是聚合可以是普通springboot項(xiàng)目)
2.添加pom
因?yàn)槭呛蚏abbitMQ整合,所以就是引入的spring-cloud-starter-stream-rabbit啟動(dòng)器
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <springboot.version>2.6.8</springboot.version> <springcloud.version>2021.0.3</springcloud.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${springboot.version}</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${springcloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> </dependencies>
3.添加application配置
server: port: 8801 spring: application: name: cloud-stream-provider cloud: stream: binders: # 在此處配置要綁定的rabbitmq的服務(wù)信息; defaultRabbit: # 表示定義的名稱(chēng),用于于binding整合 type: rabbit # 消息組件類(lèi)型 environment: # 設(shè)置rabbitmq的相關(guān)的環(huán)境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服務(wù)的整合處理 output: # 這個(gè)名字是一個(gè)通道的名稱(chēng) destination: studyExchange # 表示要使用的Exchange名稱(chēng)定義 content-type: application/json # 設(shè)置消息類(lèi)型,本次為json,文本則設(shè)置“text/plain” binder: defaultRabbit # 設(shè)置要綁定的消息服務(wù)的具體設(shè)置
4.添加接口
public interface IMessageProvider { public String send(); }
5.添加實(shí)現(xiàn)類(lèi)
import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.MessageChannel; import javax.annotation.Resource; import java.util.UUID; // 可以理解為是一個(gè)消息的發(fā)送管道的定義 @EnableBinding(Source.class) public class MessageProviderImpl implements IMessageProvider { // 消息的發(fā)送管道 @Resource private MessageChannel output; @Override public String send() { String serial = UUID.randomUUID().toString(); // 創(chuàng)建并發(fā)送消息 this.output.send(MessageBuilder.withPayload(serial).build()); System.out.println("***serial: " + serial); return serial; } }
6.添加controller控制器
@RestController public class SendMessageController { @Autowired private IMessageProvider iMessageProvider; @GetMapping("send") public String send() { return iMessageProvider.send(); } }
7.測(cè)試
(1)首先要保證RabbitMQ是可以訪問(wèn)的:http://localhost:15672
(2)啟動(dòng)項(xiàng)目訪問(wèn):http://localhost:8801/send
下圖波峰代表發(fā)送消息成功
啟動(dòng)后會(huì)創(chuàng)建交換機(jī),名稱(chēng)就是application.yml當(dāng)中的destination屬性設(shè)置的
注意:停止服務(wù)后并沒(méi)有刪除交換機(jī)?。?!
2.2. 消息驅(qū)動(dòng)之消費(fèi)者
1.創(chuàng)建項(xiàng)目
2.添加pom(pom和發(fā)送者依賴一模一樣)
3.添加application配置
server: port: 8802 spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此處配置要綁定的rabbitmq的服務(wù)信息; defaultRabbit: # 表示定義的名稱(chēng),用于于binding整合 type: rabbit # 消息組件類(lèi)型 environment: # 設(shè)置rabbitmq的相關(guān)的環(huán)境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服務(wù)的整合處理 input: # 這個(gè)名字是一個(gè)通道的名稱(chēng) destination: studyExchange # 表示要使用的Exchange名稱(chēng)定義 content-type: application/json # 設(shè)置消息類(lèi)型,本次為對(duì)象json,如果是文本則設(shè)置“text/plain” binder: defaultRabbit # 設(shè)置要綁定的消息服務(wù)的具體設(shè)置
4.添加監(jiān)聽(tīng)(消費(fèi)者只負(fù)責(zé)接受消息)
import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; @Component @EnableBinding(Sink.class) public class ReceiveMessageListener { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input(Message<String> message) { System.out.println("消費(fèi)者1號(hào),------->接收到的消息:" + message.getPayload() + "\t port: " + serverPort); } }
5.測(cè)試
(1)啟動(dòng)RabbitMQ
(2)啟動(dòng)發(fā)送消息端服務(wù)
(3)啟動(dòng)消費(fèi)者服務(wù),啟動(dòng)后會(huì)發(fā)現(xiàn),他自動(dòng)會(huì)向這個(gè)交換機(jī)當(dāng)中添加一個(gè)隊(duì)列。
發(fā)送消息:http://localhost:8801/send
接受消息:
注意:當(dāng)停止服務(wù)后消息隊(duì)列會(huì)被自動(dòng)刪除!??!
2.3. 目前存在的問(wèn)題
1.依照8802, clone出來(lái)一份運(yùn)行8803,主要用來(lái)演示多個(gè)消費(fèi)者的場(chǎng)景
2.啟動(dòng)8801生產(chǎn)者
3.啟動(dòng)8802消費(fèi)者
4.啟動(dòng)8803消費(fèi)者
當(dāng)三個(gè)服務(wù)都啟動(dòng)后通過(guò)RabbitMQ界面會(huì)發(fā)現(xiàn),一個(gè)交換機(jī)綁定了兩個(gè)隊(duì)列
運(yùn)行后會(huì)發(fā)現(xiàn)存在兩個(gè)問(wèn)題:
有重復(fù)消費(fèi)問(wèn)題消息持久化問(wèn)題
(1)重復(fù)消費(fèi)問(wèn)題:
發(fā)送消息后兩個(gè)消費(fèi)者都收到了消息:http://localhost:8801/send
比如在如下場(chǎng)景中,訂單系統(tǒng)我們做集群部署,都會(huì)從RabbitMQ中獲取訂單信息,那如果一個(gè)訂單同時(shí)被兩個(gè)服務(wù)獲取到,那么就會(huì)造成數(shù)據(jù)錯(cuò)誤,我們得避免這種情況。這時(shí)我們就可以使用Stream中的消息分組來(lái)解決
注意在Stream中處于同一個(gè)group中的多個(gè)消費(fèi)者是競(jìng)爭(zhēng)關(guān)系,就能夠保證消息只會(huì)被其中一個(gè)應(yīng)用消費(fèi)一次。不同組是可以全面消費(fèi)的(重復(fù)消費(fèi)),同一組內(nèi)會(huì)發(fā)生競(jìng)爭(zhēng)關(guān)系,只有其中一個(gè)可以消費(fèi)。
(2)消息持久化問(wèn)題:
當(dāng)生產(chǎn)者發(fā)送消息的時(shí)候,消費(fèi)者恰好宕機(jī)了,但是過(guò)一會(huì)消費(fèi)者恢復(fù)了,但是消息卻沒(méi)收到。那也就是意味著消息隊(duì)列是臨時(shí)消息隊(duì)列。針對(duì)于這一點(diǎn),大家也可以測(cè)試一下,加深一下印象。
2.4. 分組解決重復(fù)消費(fèi)問(wèn)題
原理: 微服務(wù)應(yīng)用放置于同一個(gè)group中,就能夠保證消息只會(huì)被其中一個(gè)應(yīng)用消費(fèi)一次。同一個(gè)組內(nèi)會(huì)發(fā)生競(jìng)爭(zhēng)關(guān)系,只有其中一個(gè)可以消費(fèi)。
接下來(lái)直接調(diào)整兩個(gè)消費(fèi)者為同一個(gè)組:添加如下配置
當(dāng)兩個(gè)消費(fèi)者都設(shè)置好后啟動(dòng),會(huì)發(fā)現(xiàn)一個(gè)問(wèn)題: 實(shí)際上分到一個(gè)組對(duì)于RabbitMQ來(lái)說(shuō)就是兩個(gè)消費(fèi)者監(jiān)聽(tīng)了一個(gè)隊(duì)列。
一個(gè)隊(duì)列那也就意味著,當(dāng)隊(duì)列收到一條消息,哪個(gè)消費(fèi)者誰(shuí)先消費(fèi)就是誰(shuí)的,消費(fèi)完隊(duì)列里面就沒(méi)有了,也就是只有一個(gè)消費(fèi)者能消費(fèi)到消息!
注意:假如不設(shè)置group屬性的時(shí)候,默認(rèn)是啟動(dòng)一個(gè)消費(fèi)者,就會(huì)創(chuàng)建一個(gè)消費(fèi)隊(duì)列,啟動(dòng)多個(gè)服務(wù)就會(huì)創(chuàng)建多個(gè)隊(duì)列。stream默認(rèn)使用的是RabbitMQ的topic交換機(jī)。當(dāng)發(fā)送者向這個(gè)交換機(jī)發(fā)送消息的時(shí)候,兩個(gè)隊(duì)列就都會(huì)接收到。關(guān)于RabbitMQ相關(guān)知識(shí)本篇不記錄,后續(xù)會(huì)專(zhuān)門(mén)寫(xiě)RabbitMQ相關(guān)文章。
最終測(cè)試:8802/8803實(shí)現(xiàn)了輪詢分組,每次只有一個(gè)消費(fèi)者8801模塊的發(fā)的消息只能被8802或8803其中一個(gè)接收到,這樣避免了重復(fù)消費(fèi)。
2.5. 消息持久化
當(dāng)三個(gè)項(xiàng)目都啟動(dòng)著的時(shí)候,現(xiàn)在我們要做幾件事:
停止8802和8803并去除掉8802的分組group: gxs
,8803不去分組信息,停止掉項(xiàng)目的時(shí)候會(huì)發(fā)現(xiàn)消息隊(duì)列并沒(méi)有刪除,說(shuō)明一旦設(shè)置分組信息,消息隊(duì)列就不再是臨時(shí)隊(duì)列。
2.8801發(fā)送4條消息啟動(dòng)8802然后消息并沒(méi)有打印,沒(méi)有收到消息(注意8802是去掉分組信息的)再啟動(dòng)8803,有分組屬性配置,后臺(tái)打出來(lái)了MQ上的消息
原因就是:當(dāng)兩個(gè)項(xiàng)目都停止的時(shí)候,隊(duì)列并未刪除,而8803還綁定了這個(gè)隊(duì)列,所以他就算宕機(jī)了,又重啟了,依然可以收到消息。而8802沒(méi)有設(shè)置分組信息,他再啟動(dòng)后系統(tǒng)會(huì)給他創(chuàng)建一個(gè)臨時(shí)隊(duì)列,自然而然收不到之前的消息了。
三、函數(shù)式編程練習(xí)
到此這篇關(guān)于Spring Cloud Stream詳解的文章就介紹到這了,更多相關(guān)Spring Cloud Stream內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java?多線程并發(fā)?ReentrantReadWriteLock詳情
這篇文章主要介紹了Java多線程并發(fā)ReentrantReadWriteLock詳情,ReentrantReadWriteLock可重入讀寫(xiě)鎖。實(shí)際使用場(chǎng)景中,我們需要處理的操作本質(zhì)上是讀與寫(xiě),更多相關(guān)資料,感興趣的小伙伴可以參考一下下面文章內(nèi)容2022-06-06Mybatis resultType返回結(jié)果為null的問(wèn)題排查方式
這篇文章主要介紹了Mybatis resultType返回結(jié)果為null的問(wèn)題排查方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03關(guān)于slf4j_log4j2源碼學(xué)習(xí)心得
這篇文章主要介紹了slf4j_log4j2源碼學(xué)習(xí)心得,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12java數(shù)據(jù)庫(kù)連接池和數(shù)據(jù)庫(kù)連接示例
這篇文章主要介紹了java數(shù)據(jù)庫(kù)連接池和數(shù)據(jù)庫(kù)連接示例,需要的朋友可以參考下2014-05-05Java開(kāi)發(fā)中的容器概念、分類(lèi)與用法深入詳解
這篇文章主要介紹了Java開(kāi)發(fā)中的容器概念、分類(lèi)與用法,結(jié)合實(shí)例形式較為詳細(xì)的分析了java容器的相關(guān)概念、分類(lèi)、使用方法與注意事項(xiàng),需要的朋友可以參考下2017-11-11Mybatis-plus支持Gbase8s分頁(yè)的實(shí)現(xiàn)示例
本文主要介紹了Mybatis-plus支持Gbase8s分頁(yè)的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-11-11java實(shí)現(xiàn)表單必填參數(shù)驗(yàn)證的方法
表單校驗(yàn)是很多注冊(cè)時(shí)必做的功能, 一般我們的處理都是很粗暴的寫(xiě)個(gè)if()判斷, 然后拋異常. 本文將介紹通過(guò)代理的思想, 用注解優(yōu)雅的處理非空判斷,感興趣的一起來(lái)了解一下2021-05-05