Springcloud Stream消息驅(qū)動(dòng)工具使用介紹
springcloud Stream
什么是springcloud Stream
現(xiàn)在市面上有很多的消息中間件,每一個(gè)公司使用的都有所不同,為了減少學(xué)習(xí)的成本,springcloud Stream可以讓我們不再關(guān)注消息中間件MQ的具體細(xì)節(jié),我們只需要通過適配綁定的方式即可實(shí)現(xiàn)不同MQ之間的切換,但是遺憾的是springcloud Stream目前只支持RabbitMQ和Kafka。
SpringCloud Stream是一個(gè)構(gòu)建消息驅(qū)動(dòng)微服務(wù)的框架,應(yīng)用程序通過inputs或者 outputs來與SpringCloud Stream中的binder進(jìn)行交互,我們可以通過配置來binding ,而 SpringCloud Stream 的binder負(fù)責(zé)與中間件交互,所以我們只需要搞清楚如何與Stream交互就可以很方便的使用消息驅(qū)動(dòng)了!
什么是Binder
Binder是SpringCloud Stream的一個(gè)抽象概念,是應(yīng)用與消息中間件之間的粘合劑,通過定義綁定器Binder作為中間層,實(shí)現(xiàn)了應(yīng)用程序與消息中間件細(xì)節(jié)之間的隔離,可以動(dòng)態(tài)的改變消息的destinations(對(duì)應(yīng)于 Kafka的topic,RabbitMQ的exchanges),這些都可以通過外部配置項(xiàng)來做到,甚至可以任意的改變中間件的類型但是不需要修改一行代碼
為什么使用Stream
比方說我們用到了RabbitMQ和Kafka,由于這兩個(gè)消息中間件的架構(gòu)上的不同像RabbitMQ有exchange,kafka有Topic和Partitions分區(qū),這些中間件的差異性導(dǎo)致我們實(shí)際項(xiàng)目開發(fā)給我們?cè)斐闪艘欢ǖ睦_,我們?nèi)绻昧藘蓚€(gè)消息隊(duì)列的其中一種,后面的業(yè)務(wù)需求,我想往另外一種消息隊(duì)列進(jìn)行遷移;這時(shí)候無疑就是一個(gè)災(zāi)難性的,一大堆東西都要重新推倒重新做,因?yàn)樗覀兊南到y(tǒng)耦合了,這襯候springcloud Stream給我們提供了一種解耦合的方式。
Stream使用案例
前置知識(shí)
Stream處理消息的架構(gòu)
Source、Sink: 簡(jiǎn)單的可理解為參照對(duì)象是Spring Cloud Stream自身,從Stream發(fā)布消息就是輸出,接受消息就是輸入。Channel: 通道,是隊(duì)列Queue的一種抽象,在消息通訊系統(tǒng)中就是實(shí)現(xiàn)存儲(chǔ)和轉(zhuǎn)發(fā)的媒介。Binder: 消息的生產(chǎn)者和消費(fèi)者中間層,實(shí)現(xiàn)了應(yīng)用程序與消息中間件細(xì)節(jié)之間的隔離
通過以上兩張圖片可知,消息的處理流向是:消息生產(chǎn)者處理完業(yè)務(wù)邏輯之后消息到達(dá)source中,接著前往Channel通道進(jìn)行排隊(duì),然后通過binder綁定器將消息數(shù)據(jù)發(fā)送到底層mq,然后又通過binder綁定器接收到底層mq發(fā)送來的消息數(shù)據(jù),接著前往Channel通道進(jìn)行排隊(duì),由Sink接收到消息數(shù)據(jù),消息消費(fèi)者拿到消息數(shù)據(jù)執(zhí)行相應(yīng)的業(yè)務(wù)邏輯
Stream常用注解
消息生產(chǎn)者8801模塊搭建
第一步: 創(chuàng)建一個(gè)maven模塊,引入相關(guān)依賴,最主要的就是stream整合rabbitmq的依賴
<!--stream的rabbitmq依賴--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
第二步: 配置文件的編寫
server:
port: 8801spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此處配置要綁定的rabbitmq的服務(wù)信息;
defaultRabbit: # 表示定義的名稱,用于于binding整合
type: rabbit # 消息組件類型
environment: # 設(shè)置rabbitmq的相關(guān)的環(huán)境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務(wù)的整合處理
output: # 這個(gè)名字是一個(gè)通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設(shè)置消息類型,本次為json,文本則設(shè)置“text/plain”
binder: defaultRabbit # 設(shè)置要綁定的消息服務(wù)的具體設(shè)置eureka:
client: # 客戶端進(jìn)行Eureka注冊(cè)的配置
service-url:
defaultZone: http://localhost:7001/eureka
第三步: 主程序類
@SpringBootApplication public class CloudStreamRabbitmqProvider8801Application { public static void main(String[] args) { SpringApplication.run(CloudStreamRabbitmqProvider8801Application.class, args); System.out.println("啟動(dòng)成功"); } }
第四步: 業(yè)務(wù)層service代碼編寫,注意:這里實(shí)現(xiàn)類注入的對(duì)象由之前的dao層對(duì)象換成了channel通道對(duì)象,詳細(xì)的發(fā)送由實(shí)現(xiàn)類的第12完成
public interface IMessageProviderService { /** * 定義消息的推送管道 * * @return */ String send(); }
@EnableBinding(Source.class) public class MessageProviderServiceImpl implements IMessageProviderService { /** * 消息發(fā)送管道/信道 */ @Resource private MessageChannel output; @Override public String send() { String serial = UUID.randomUUID().toString(); output.send(MessageBuilder.withPayload(serial).build()); System.out.println("*****serial: " + serial); return serial; } }
第五步: controller接口
@RestController public class SendMessageController { @Resource private IMessageProviderService messageProviderService; @GetMapping(value = "/sendMessage") public String sendMessage() { return messageProviderService.send(); } }
消息消費(fèi)者8802模塊搭建
第一步: 創(chuàng)建一個(gè)maven模塊,引入相關(guān)依賴,最主要的就是stream整合rabbitmq的依賴
<!--stream的rabbitmq依賴--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
第二步: 配置文件的編寫,與生產(chǎn)者的區(qū)別就在于bindings下的是input而不是output
server:
port: 8802spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此處配置要綁定的rabbitmq的服務(wù)信息;
defaultRabbit: # 表示定義的名稱,用于于binding整合
type: rabbit # 消息組件類型
environment: # 設(shè)置rabbitmq的相關(guān)的環(huán)境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務(wù)的整合處理
input: # 這個(gè)名字是一個(gè)通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設(shè)置消息類型,本次為json,文本則設(shè)置“text/plain”
binder: defaultRabbit # 設(shè)置要綁定的消息服務(wù)的具體設(shè)置
eureka:
client: # 客戶端進(jìn)行Eureka注冊(cè)的配置
service-url:
defaultZone: http://localhost:7001/eureka
第三步: 主程序類
@SpringBootApplication public class CloudStreamRabbitmqConsumer8802Application { public static void main(String[] args) { SpringApplication.run(CloudStreamRabbitmqConsumer8802Application.class, args); System.out.println("啟動(dòng)成功"); } }
第四步: controller接口,使用url請(qǐng)求生產(chǎn)者8801,即可在消費(fèi)者8802端接收到8801發(fā)送的消息
@Component @EnableBinding(Sink.class) public class ReceiveMessageListener { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input(Message<String> message) { System.out.println("消費(fèi)者1號(hào) ----> port:" + serverPort + "\t從8801接受到的消息是:" + message.getPayload()); } }
兩個(gè)模塊搭建完成進(jìn)行測(cè)試,首先啟動(dòng)注冊(cè)中心7001,然后分別啟動(dòng)消息生產(chǎn)者8801和消息消費(fèi)者8802,通過url請(qǐng)求訪問8001的發(fā)送消息請(qǐng)求,會(huì)向指定管道中發(fā)送一條消息,如果此時(shí)這個(gè)管道中有消費(fèi)者即可接收到這條消息。而如何指定消息的管道歸屬呢,就是通過配置文件中的indings.input.destination來指定,命名相同的服務(wù)就會(huì)處在同一條管道中
Stream帶來的問題
重復(fù)消費(fèi)問題
按照之前的使用,會(huì)帶來重復(fù)消費(fèi)問題: 也就是說一個(gè)通道上有不止一個(gè)消息消費(fèi)者,stream上默認(rèn)每一個(gè)消費(fèi)者都屬于不同的組,這樣的話就會(huì)導(dǎo)致這個(gè)消息被多個(gè)組的消費(fèi)者重復(fù)消費(fèi)
知道了問題出現(xiàn)的原因就很容易解決了,只要我們自定義配置分組,將這些消費(fèi)者都分配到同一個(gè)組中就能避免重復(fù)消費(fèi)的問題出現(xiàn)了(同一個(gè)組間的消費(fèi)者是競(jìng)爭(zhēng)關(guān)系,不管組間有多少的消費(fèi)者都只會(huì)消費(fèi)一次)
自定義分組
只需要在配置文件修改一處配置即可實(shí)現(xiàn)自定義組名并且自定義分組,組名相同的服務(wù)會(huì)被分配到同一組,通道內(nèi)的消息數(shù)據(jù)會(huì)被該組中的所有消費(fèi)者輪詢消費(fèi)
持久化問題
上面自定義分組使用的group配置除了可以自定義分組和分組名之外,還可以實(shí)現(xiàn)消息的持久化,也就是說使用group配置自定義分組和分組名的消息消費(fèi)者,就算在消息生產(chǎn)者發(fā)送消息的時(shí)候掛掉了,等這個(gè)消費(fèi)者重啟之后依然是能夠消費(fèi)之前發(fā)送的消息
這里一個(gè)生產(chǎn)者和兩個(gè)消費(fèi)者存在以下十三種情況(生產(chǎn)者發(fā)送四次消息):
1、都使用group分組的兩個(gè)不同組成員,在生產(chǎn)者生產(chǎn)的時(shí)候
- 都沒掛(各消費(fèi)四次)
- 掛了其中一個(gè)(各消費(fèi)四次)
- 都掛了(各消費(fèi)四次)
2、都使用group分組的兩個(gè)同組成員,在生產(chǎn)者生產(chǎn)的時(shí)候
- 都沒掛(各消費(fèi)兩次)
- 掛了其中一個(gè)(沒掛的把四次消費(fèi)完)
- 都掛了(各消費(fèi)兩次)
3、其中一個(gè)使用group分組的兩個(gè)成員,在生產(chǎn)者生產(chǎn)的時(shí)候
- 都掛了(都不消費(fèi))
- group的掛了(各消費(fèi)四次)
- 沒group的掛了(沒掛的消費(fèi)四次,掛的由于沒有持久化所以不消費(fèi))
- 都沒掛(各消費(fèi)四次)
4、都不使用group分組的兩個(gè)成員,在生產(chǎn)者生產(chǎn)的時(shí)候
- 都掛了(都不消費(fèi))
- 掛了其中一個(gè)(沒掛的消費(fèi)四次,掛的由于沒有持久化所以不消費(fèi))
- 都沒掛(各消費(fèi)四次)
總之一句話,通道里的消息會(huì)持久化給使用group配置的消息消費(fèi)者(每一組都有一份),就算發(fā)送消息的時(shí)候這些消費(fèi)者掛了,如果同組的消費(fèi)者有沒掛的就會(huì)把這些消息競(jìng)爭(zhēng)消費(fèi)完;如果同組沒有消費(fèi)者,等他重啟之后還是會(huì)消費(fèi)這些消息
到此這篇關(guān)于Springcloud Stream消息驅(qū)動(dòng)工具使用介紹的文章就介紹到這了,更多相關(guān)Springcloud Stream內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot自動(dòng)配置原理以及spring.factories文件的作用詳解
這篇文章主要介紹了springboot自動(dòng)配置原理以及spring.factories文件的作用詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10JPA如何設(shè)置表名和實(shí)體名,表字段與實(shí)體字段的對(duì)應(yīng)
這篇文章主要介紹了JPA如何設(shè)置表名和實(shí)體名,表字段與實(shí)體字段的對(duì)應(yīng),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11list轉(zhuǎn)tree和list中查找某節(jié)點(diǎn)下的所有數(shù)據(jù)操作
這篇文章主要介紹了list轉(zhuǎn)tree和list中查找某節(jié)點(diǎn)下的所有數(shù)據(jù)操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-09-09SpringBoot接口加密與解密的實(shí)現(xiàn)
這篇文章主要介紹了SpringBoot接口加密與解密的實(shí)現(xiàn)2023-10-10Elasticsearch索引結(jié)構(gòu)與算法解析
?作為搜索引擎的一部分,ES自然具有速度快、結(jié)果準(zhǔn)確、結(jié)果豐富等特點(diǎn),那么ES是如何達(dá)到“搜索引擎”級(jí)別的查詢效率呢?首先是索引,其次是壓縮算法,接下來我們就一起了解下ES的索引結(jié)構(gòu)和壓縮算法2023-04-04