Springcloud Stream消息驅(qū)動工具使用介紹
springcloud Stream
什么是springcloud Stream
現(xiàn)在市面上有很多的消息中間件,每一個公司使用的都有所不同,為了減少學(xué)習(xí)的成本,springcloud Stream可以讓我們不再關(guān)注消息中間件MQ的具體細(xì)節(jié),我們只需要通過適配綁定的方式即可實現(xiàn)不同MQ之間的切換,但是遺憾的是springcloud Stream目前只支持RabbitMQ和Kafka。
SpringCloud Stream是一個構(gòu)建消息驅(qū)動微服務(wù)的框架,應(yīng)用程序通過inputs或者 outputs來與SpringCloud Stream中的binder進(jìn)行交互,我們可以通過配置來binding ,而 SpringCloud Stream 的binder負(fù)責(zé)與中間件交互,所以我們只需要搞清楚如何與Stream交互就可以很方便的使用消息驅(qū)動了!
什么是Binder
Binder是SpringCloud Stream的一個抽象概念,是應(yīng)用與消息中間件之間的粘合劑,通過定義綁定器Binder作為中間層,實現(xiàn)了應(yīng)用程序與消息中間件細(xì)節(jié)之間的隔離,可以動態(tài)的改變消息的destinations(對應(yīng)于 Kafka的topic,RabbitMQ的exchanges),這些都可以通過外部配置項來做到,甚至可以任意的改變中間件的類型但是不需要修改一行代碼
為什么使用Stream
比方說我們用到了RabbitMQ和Kafka,由于這兩個消息中間件的架構(gòu)上的不同像RabbitMQ有exchange,kafka有Topic和Partitions分區(qū),這些中間件的差異性導(dǎo)致我們實際項目開發(fā)給我們造成了一定的困擾,我們?nèi)绻昧藘蓚€消息隊列的其中一種,后面的業(yè)務(wù)需求,我想往另外一種消息隊列進(jìn)行遷移;這時候無疑就是一個災(zāi)難性的,一大堆東西都要重新推倒重新做,因為它跟我們的系統(tǒng)耦合了,這襯候springcloud Stream給我們提供了一種解耦合的方式。
Stream使用案例
前置知識
Stream處理消息的架構(gòu)


Source、Sink: 簡單的可理解為參照對象是Spring Cloud Stream自身,從Stream發(fā)布消息就是輸出,接受消息就是輸入。Channel: 通道,是隊列Queue的一種抽象,在消息通訊系統(tǒng)中就是實現(xiàn)存儲和轉(zhuǎn)發(fā)的媒介。Binder: 消息的生產(chǎn)者和消費者中間層,實現(xiàn)了應(yīng)用程序與消息中間件細(xì)節(jié)之間的隔離
通過以上兩張圖片可知,消息的處理流向是:消息生產(chǎn)者處理完業(yè)務(wù)邏輯之后消息到達(dá)source中,接著前往Channel通道進(jìn)行排隊,然后通過binder綁定器將消息數(shù)據(jù)發(fā)送到底層mq,然后又通過binder綁定器接收到底層mq發(fā)送來的消息數(shù)據(jù),接著前往Channel通道進(jìn)行排隊,由Sink接收到消息數(shù)據(jù),消息消費者拿到消息數(shù)據(jù)執(zhí)行相應(yīng)的業(yè)務(wù)邏輯
Stream常用注解

消息生產(chǎn)者8801模塊搭建
第一步: 創(chuàng)建一個maven模塊,引入相關(guān)依賴,最主要的就是stream整合rabbitmq的依賴
<!--stream的rabbitmq依賴-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>第二步: 配置文件的編寫
server:
port: 8801spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此處配置要綁定的rabbitmq的服務(wù)信息;
defaultRabbit: # 表示定義的名稱,用于于binding整合
type: rabbit # 消息組件類型
environment: # 設(shè)置rabbitmq的相關(guān)的環(huán)境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務(wù)的整合處理
output: # 這個名字是一個通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設(shè)置消息類型,本次為json,文本則設(shè)置“text/plain”
binder: defaultRabbit # 設(shè)置要綁定的消息服務(wù)的具體設(shè)置eureka:
client: # 客戶端進(jìn)行Eureka注冊的配置
service-url:
defaultZone: http://localhost:7001/eureka
第三步: 主程序類
@SpringBootApplication
public class CloudStreamRabbitmqProvider8801Application {
public static void main(String[] args) {
SpringApplication.run(CloudStreamRabbitmqProvider8801Application.class, args);
System.out.println("啟動成功");
}
}第四步: 業(yè)務(wù)層service代碼編寫,注意:這里實現(xiàn)類注入的對象由之前的dao層對象換成了channel通道對象,詳細(xì)的發(fā)送由實現(xiàn)類的第12完成
public interface IMessageProviderService {
/**
* 定義消息的推送管道
*
* @return
*/
String send();
}
@EnableBinding(Source.class)
public class MessageProviderServiceImpl implements IMessageProviderService {
/**
* 消息發(fā)送管道/信道
*/
@Resource
private MessageChannel output;
@Override
public String send() {
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("*****serial: " + serial);
return serial;
}
}
第五步: controller接口
@RestController
public class SendMessageController {
@Resource
private IMessageProviderService messageProviderService;
@GetMapping(value = "/sendMessage")
public String sendMessage() {
return messageProviderService.send();
}
}消息消費者8802模塊搭建
第一步: 創(chuàng)建一個maven模塊,引入相關(guān)依賴,最主要的就是stream整合rabbitmq的依賴
<!--stream的rabbitmq依賴-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
第二步: 配置文件的編寫,與生產(chǎn)者的區(qū)別就在于bindings下的是input而不是output
server:
port: 8802spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此處配置要綁定的rabbitmq的服務(wù)信息;
defaultRabbit: # 表示定義的名稱,用于于binding整合
type: rabbit # 消息組件類型
environment: # 設(shè)置rabbitmq的相關(guān)的環(huán)境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務(wù)的整合處理
input: # 這個名字是一個通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設(shè)置消息類型,本次為json,文本則設(shè)置“text/plain”
binder: defaultRabbit # 設(shè)置要綁定的消息服務(wù)的具體設(shè)置
eureka:
client: # 客戶端進(jìn)行Eureka注冊的配置
service-url:
defaultZone: http://localhost:7001/eureka
第三步: 主程序類
@SpringBootApplication
public class CloudStreamRabbitmqConsumer8802Application {
public static void main(String[] args) {
SpringApplication.run(CloudStreamRabbitmqConsumer8802Application.class, args);
System.out.println("啟動成功");
}
}第四步: controller接口,使用url請求生產(chǎn)者8801,即可在消費者8802端接收到8801發(fā)送的消息
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message) {
System.out.println("消費者1號 ----> port:" + serverPort + "\t從8801接受到的消息是:" + message.getPayload());
}
}
兩個模塊搭建完成進(jìn)行測試,首先啟動注冊中心7001,然后分別啟動消息生產(chǎn)者8801和消息消費者8802,通過url請求訪問8001的發(fā)送消息請求,會向指定管道中發(fā)送一條消息,如果此時這個管道中有消費者即可接收到這條消息。而如何指定消息的管道歸屬呢,就是通過配置文件中的indings.input.destination來指定,命名相同的服務(wù)就會處在同一條管道中
Stream帶來的問題
重復(fù)消費問題
按照之前的使用,會帶來重復(fù)消費問題: 也就是說一個通道上有不止一個消息消費者,stream上默認(rèn)每一個消費者都屬于不同的組,這樣的話就會導(dǎo)致這個消息被多個組的消費者重復(fù)消費
知道了問題出現(xiàn)的原因就很容易解決了,只要我們自定義配置分組,將這些消費者都分配到同一個組中就能避免重復(fù)消費的問題出現(xiàn)了(同一個組間的消費者是競爭關(guān)系,不管組間有多少的消費者都只會消費一次)
自定義分組
只需要在配置文件修改一處配置即可實現(xiàn)自定義組名并且自定義分組,組名相同的服務(wù)會被分配到同一組,通道內(nèi)的消息數(shù)據(jù)會被該組中的所有消費者輪詢消費

持久化問題
上面自定義分組使用的group配置除了可以自定義分組和分組名之外,還可以實現(xiàn)消息的持久化,也就是說使用group配置自定義分組和分組名的消息消費者,就算在消息生產(chǎn)者發(fā)送消息的時候掛掉了,等這個消費者重啟之后依然是能夠消費之前發(fā)送的消息
這里一個生產(chǎn)者和兩個消費者存在以下十三種情況(生產(chǎn)者發(fā)送四次消息):
1、都使用group分組的兩個不同組成員,在生產(chǎn)者生產(chǎn)的時候
- 都沒掛(各消費四次)
- 掛了其中一個(各消費四次)
- 都掛了(各消費四次)
2、都使用group分組的兩個同組成員,在生產(chǎn)者生產(chǎn)的時候
- 都沒掛(各消費兩次)
- 掛了其中一個(沒掛的把四次消費完)
- 都掛了(各消費兩次)
3、其中一個使用group分組的兩個成員,在生產(chǎn)者生產(chǎn)的時候
- 都掛了(都不消費)
- group的掛了(各消費四次)
- 沒group的掛了(沒掛的消費四次,掛的由于沒有持久化所以不消費)
- 都沒掛(各消費四次)
4、都不使用group分組的兩個成員,在生產(chǎn)者生產(chǎn)的時候
- 都掛了(都不消費)
- 掛了其中一個(沒掛的消費四次,掛的由于沒有持久化所以不消費)
- 都沒掛(各消費四次)
總之一句話,通道里的消息會持久化給使用group配置的消息消費者(每一組都有一份),就算發(fā)送消息的時候這些消費者掛了,如果同組的消費者有沒掛的就會把這些消息競爭消費完;如果同組沒有消費者,等他重啟之后還是會消費這些消息
到此這篇關(guān)于Springcloud Stream消息驅(qū)動工具使用介紹的文章就介紹到這了,更多相關(guān)Springcloud Stream內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot自動配置原理以及spring.factories文件的作用詳解
這篇文章主要介紹了springboot自動配置原理以及spring.factories文件的作用詳解,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-10-10
JPA如何設(shè)置表名和實體名,表字段與實體字段的對應(yīng)
這篇文章主要介紹了JPA如何設(shè)置表名和實體名,表字段與實體字段的對應(yīng),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-11-11
list轉(zhuǎn)tree和list中查找某節(jié)點下的所有數(shù)據(jù)操作
這篇文章主要介紹了list轉(zhuǎn)tree和list中查找某節(jié)點下的所有數(shù)據(jù)操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-09-09
Elasticsearch索引結(jié)構(gòu)與算法解析
?作為搜索引擎的一部分,ES自然具有速度快、結(jié)果準(zhǔn)確、結(jié)果豐富等特點,那么ES是如何達(dá)到“搜索引擎”級別的查詢效率呢?首先是索引,其次是壓縮算法,接下來我們就一起了解下ES的索引結(jié)構(gòu)和壓縮算法2023-04-04

