欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringCloud Stream消息驅(qū)動(dòng)實(shí)例詳解

 更新時(shí)間:2021年03月05日 08:43:30   作者:MPolaris  
這篇文章主要介紹了SpringCloud Stream消息驅(qū)動(dòng)的相關(guān)知識(shí),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下

1. 消息驅(qū)動(dòng)概述

1.1 是什么

在實(shí)際應(yīng)用中有很多消息中間件,比如現(xiàn)在企業(yè)里常用的有ActiveMQ、RabbitMQ、RocketMQ、Kafka等,學(xué)習(xí)所有這些消息中間件無(wú)疑需要大量時(shí)間經(jīng)歷成本,那有沒(méi)有一種技術(shù),使我們不再需要關(guān)注具體的消息中間件的細(xì)節(jié),而只需要用一種適配綁定的方式,自動(dòng)的在各種消息中間件內(nèi)切換呢?消息驅(qū)動(dòng)就是這樣的技術(shù),它能 屏蔽底層消息中間件的差異,降低切換成本,統(tǒng)一消息的編程模型。

SpringCloud Stream是一個(gè)構(gòu)件消息驅(qū)動(dòng)微服務(wù)的框架。應(yīng)用程序通過(guò)inputs和outputs來(lái)與SpringCloud Stream中的綁定器(binder)對(duì)象交互,通過(guò)配置來(lái)綁定,而SpringCloud Stream的綁定器對(duì)象負(fù)責(zé)與消息中間件交互,所以,我們只需要搞清楚如何與SpringCloud Stream交互就可以方便使用消息驅(qū)動(dòng)的方式。但是 截至到目前 SpringCloud Stream僅支持RabbitMQ和Kafka。

1.2 設(shè)計(jì)思想

標(biāo)準(zhǔn)MQ模型

  • 生產(chǎn)者 / 消費(fèi)者之間靠消息媒介傳遞信息內(nèi)容 - Messag
  • 消息必須走特定的通道 - Message Channel
  • 消息通道里的消息如何被消費(fèi)呢?誰(shuí)負(fù)責(zé)處理? - 消息通道 MessageChannel 的子接口 SubscribableChannel,由 MessageHandler 消息處理器所訂閱

為什么使用Cloud Stream

比如說(shuō)我們用到了RabbitMQ和Kafka,由于這兩個(gè)消息中間件的架構(gòu)上的不同,像RabbitMQ有exchange,Kafka有Topic和Partitions分區(qū),這些中間件的差異性導(dǎo)致實(shí)際項(xiàng)目開(kāi)發(fā)給我們?cè)斐闪艘欢ǖ睦_,我們?nèi)绻昧藘蓚€(gè)消息隊(duì)列的其中一種,后面的業(yè)務(wù)需求如果又要往另外一種消息隊(duì)列進(jìn)行遷移,這無(wú)疑是一個(gè)災(zāi)難,一大堆東西都要重新推到重做,因?yàn)樗覀兊南到y(tǒng)耦合了,這時(shí)候SpringCloud Stream給我們提供了一種解耦合的方式。

stream憑什么可以統(tǒng)一底層差異

在沒(méi)有綁定器這個(gè)概念的情況下,我們的SpringBoot應(yīng)用要直接與消息中間件進(jìn)行信息交互的時(shí)候,由于各消息中間件構(gòu)建的初衷不同,它們的實(shí)現(xiàn)細(xì)節(jié)上會(huì)有較大的差異性。

通過(guò)定義綁定器作為中間層,完美的實(shí)現(xiàn)了 應(yīng)用程序與消息中間件細(xì)節(jié)之間的隔離。Stream對(duì)消息中間件的進(jìn)一步封裝(通過(guò)向應(yīng)用程序暴露統(tǒng)一的Channel通道,使得應(yīng)用程序不需要再考慮各種不同的消息中間件實(shí)現(xiàn)),可以做到代碼層面對(duì)中間件的無(wú)感知,甚至于動(dòng)態(tài)的切換中間件(如RabbitMQ切換為Kafka),使得微服務(wù)開(kāi)發(fā)的高度解耦,服務(wù)可以更多的關(guān)注自己的業(yè)務(wù)流程。

在消息綁定器中,INPUT對(duì)應(yīng)于消費(fèi)者,OUTPUT對(duì)應(yīng)于生產(chǎn)者。

Stream中的消息通信方式遵循了 發(fā)布-訂閱模式,用Topic(主題)進(jìn)行廣播(RabbitMQ中對(duì)應(yīng)于Exchange交換機(jī),Kafka中就是Topic)。

1.3 SpringCloud Stream標(biāo)準(zhǔn)流程套路

  • Binder 很方便的連接中間件,屏蔽差異
  • Channel 通道,是隊(duì)列Queue的一種抽象,在消息通訊系統(tǒng)中就是實(shí)現(xiàn)了存儲(chǔ)和轉(zhuǎn)發(fā)的媒介,通過(guò)Channel對(duì)隊(duì)列進(jìn)行配置
  • SourceSink 簡(jiǎn)單的可以理解為參照對(duì)象是SpringCloud Stream自身,從Stream發(fā)布消息就是輸出,接受消息就是輸入

1.4 SpringCloud Stream編碼API與常用注解

組成 說(shuō)明
Middleware 中間件,目前只支持RabbitMQ和Kafka
Binder Binder是應(yīng)用與消息中間件之間的封裝,目前實(shí)行了RabbitMQ和Kafka的Binder,通過(guò)Binder可以很方便的連接中間件,可以動(dòng)態(tài)的改變消息類型(對(duì)應(yīng)于Kafka的topic,RabbitMQ的exchange),這些都可以通過(guò)配置文件來(lái)實(shí)現(xiàn)
@Input 注解標(biāo)識(shí)輸入通道,通過(guò)該輸入通道接收到的消息進(jìn)入應(yīng)用程序
@Output 注解標(biāo)識(shí)輸出通道,發(fā)布的消息將通過(guò)該通道離開(kāi)應(yīng)用程序
@StreamListner 監(jiān)聽(tīng)隊(duì)列,用于消費(fèi)者的隊(duì)列的消息接收
@EnableBinding 使信道Channel和交換機(jī)/主題(Exchange/Topic)綁定在一起

2. Spring Cloud Stream 案例

新建三個(gè)子模塊分別對(duì)應(yīng)于消息的生產(chǎn)者和消費(fèi)者:

模塊名 微服務(wù)功能
cloud-stream-rabbitmq-provider8801 生產(chǎn)者,發(fā)送消息模塊
cloud-stream-rabbitmq-consumer8802 消費(fèi)者,接收消息模塊
cloud-stream-rabbitmq-consumer8803 消費(fèi)者,接收消息模塊

2.1 消息驅(qū)動(dòng)之消息生產(chǎn)者

新建Module:cloud-stream-rabbitmq-provider8801作為消息的生產(chǎn)者用來(lái)發(fā)送消息,在其POM文件中除引入web、actuator、eureka-client等必要啟動(dòng)器外,還需要引入SpringCloud Stream對(duì)應(yīng)實(shí)現(xiàn)RabbitMQ的啟動(dòng)器依賴:

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

編寫其配置文件application.yml:

server:
 port: 8801

spring:
 application:
 name: cloud-stream-provider
 cloud:
 stream:
  binders: # 在此處配置要綁定的rabbitmq的服務(wù)信息
  defaultRabbit: # 表示定義的名稱,用于于binding整合
   type: rabbit # 消息組件類型
   environment: # 設(shè)置rabbitmq的相關(guān)的環(huán)境配置
   spring:
    rabbitmq:
    host: mpolaris.top
    port: 5672
    username: admin
    password: 1234321
  bindings: # 服務(wù)的整合處理
  output: # 這個(gè)名字是一個(gè)通道的名稱,OUTPUT表示這是消息的發(fā)送方
   # 表示要使用的Exchange名稱定義
   destination: testExchange 
   # 設(shè)置消息類型,本次為json,文本則設(shè)置“text/plain”
   content-type: application/json 
   # 設(shè)置要綁定的消息服務(wù)的具體設(shè)置
   default-binder: defaultRabbit

eureka:
 client: # 客戶端進(jìn)行Eureka注冊(cè)的配置
 service-url:
  defaultZone: http://eureka7001.com:7001/eureka
 instance:
 # 設(shè)置心跳的時(shí)間間隔(默認(rèn)是30秒)
 lease-renewal-interval-in-seconds: 2 
 # 如果現(xiàn)在超過(guò)了5秒的間隔(默認(rèn)是90秒)
 lease-expiration-duration-in-seconds: 5 
 # 在信息列表時(shí)顯示主機(jī)名稱yml
 instance-id: send-8801.com 
 # 訪問(wèn)的路徑變?yōu)镮P地址
 prefer-ip-address: true  

編寫其主啟動(dòng)類

編寫業(yè)務(wù)類,在業(yè)務(wù)類中分別要編寫 發(fā)送消息接口 及其 實(shí)現(xiàn)類,并在發(fā)送接口消息的實(shí)現(xiàn)類中 添加 @EnableBinding 注解 用來(lái)綁定消息的推送管道,消息生產(chǎn)者綁定的消息推送管道為 org.springframework.cloud.stream.messaging.Source

public interface IMessageProvider {
 public String send();
}
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * @Author polaris
 * @Date 2021/3/4 21:46
 */
@EnableBinding(Source.class) //定義消息的推送管道
public class MessageProviderImpl implements IMessageProvider {

 @Resource
 private MessageChannel output; //消息發(fā)送管道

 @Override
 public String send() {
  String serial = UUID.randomUUID().toString();
  output.send(MessageBuilder.withPayload(serial).build()); //發(fā)送消息
  System.out.println("==> serial:" + serial);
  return null;
 }
}

注意我們?cè)趕ervice的實(shí)現(xiàn)類中不再需要@Service注解,因?yàn)檫@個(gè)service不再是傳統(tǒng)意義上的和Controller、DAO數(shù)據(jù)等進(jìn)行交互的service,而是要綁定綁定器打交道的service。

然后編寫其業(yè)務(wù)層的Controller:

@RestController
public class SendMessageController {
 @Autowired
 private IMessageProvider messageProvider;

 @GetMapping("/sendMessage")
 public String sendMessage() {
  return messageProvider.send();
 }
}

啟動(dòng)服務(wù)注冊(cè)中心后和RabbitMQ后,啟動(dòng)消息生產(chǎn)者微服務(wù),我們?cè)赗abbitMQ的控制面板中可以看見(jiàn)多出了一個(gè)名為testExchange的交換機(jī),這個(gè)交換機(jī)恰恰就是我們之前在配置文件中配置的交換機(jī)名字testExchange。

然后我們?cè)L問(wèn) http://localhost:8801/sendMessage 使用消息生產(chǎn)者微服務(wù)發(fā)送消息,在其微服務(wù)后臺(tái)我們看到了打印的消息。

在RabbitMQ的控制面板中我們也看到了確實(shí)發(fā)送了消息。

2.2 消息驅(qū)動(dòng)之消息消費(fèi)者

新建Module:cloud-stream-rabbitmq-consumer8802/8803作為消息的生產(chǎn)者用來(lái)接收消息,其POM文件中引入的啟動(dòng)器依賴和消息生產(chǎn)者微服務(wù)的依賴幾乎相同,然后編寫其配置文件application.yml,其配置文件的書寫和消息生產(chǎn)者的幾乎一致,特別需要注意的是,消息生產(chǎn)者微服務(wù)用到的通道為OUTPUT,而消息消費(fèi)者微服務(wù)用到的通道為INPUT,其他的配置文件信息就只需要注意端口號(hào)、注冊(cè)服務(wù)名的區(qū)別即可:

spring:
 cloud:
  bindings: 
  input: # 這個(gè)名字是一個(gè)通道的名稱,INPUT表示消息消費(fèi)者

編寫主啟動(dòng)類

編寫消息消費(fèi)者的業(yè)務(wù)類,由于是消費(fèi)者,所以只需要編寫其Controller即可,在其Controller上同樣需要添加 @EnableBinding 注解用來(lái)綁定消息的推送管道,消息消費(fèi)者綁定的消息推送管道為import org.springframework.cloud.stream.messaging.Sink,在接收消息的方法中需要使用 @StreamListner 注解來(lái)監(jiān)聽(tīng)其綁定的消息推送管道:

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageController {
 
 @Value("${server.port}")
 private String serverPort;
 
 @StreamListener(Sink.INPUT)
 public void input(Message<String> message) {
  System.out.println("消費(fèi)者" + serverPort + "號(hào),收到消息:" 
       + message.getPayload());
 }
}

然后啟動(dòng)消息發(fā)送消費(fèi)者服務(wù),用生產(chǎn)者發(fā)送消息,我們可以發(fā)現(xiàn)在消費(fèi)者端可以成功接收到消息。

3. 分組消費(fèi)和持久化

3.1 重復(fù)消費(fèi)問(wèn)題

當(dāng)生產(chǎn)者發(fā)送消息后,此時(shí)的我們的消費(fèi)者都接受了消息并進(jìn)行了消費(fèi),也就是說(shuō)同一條消息被多個(gè)消息消費(fèi)者所消費(fèi)。

上述的問(wèn)題就是消息的 重復(fù)消費(fèi) 問(wèn)題,那么這個(gè)問(wèn)題為什么如此重要呢?其實(shí)重復(fù)消費(fèi)這個(gè)問(wèn)題本身不可怕,可怕的是沒(méi)考慮到重復(fù)消費(fèi)之后,怎么保證冪等性。(冪等性 通俗的說(shuō),就一個(gè)數(shù)據(jù),或者一個(gè)請(qǐng)求,重復(fù)很多次,需要確保對(duì)應(yīng)的數(shù)據(jù)是不會(huì)改變的,不能出錯(cuò))。分布式微服務(wù)應(yīng)用為了實(shí)現(xiàn)高可用和負(fù)載均衡,實(shí)際上同一功能的服務(wù)都會(huì)部署多個(gè)具體的服務(wù)實(shí)例。舉個(gè)例子,假設(shè)有一個(gè)系統(tǒng),有一條消息要求往數(shù)據(jù)庫(kù)里插入一條數(shù)據(jù),要是這個(gè)消息重復(fù)消費(fèi)兩次,結(jié)果就是向數(shù)據(jù)庫(kù)里插入了兩條數(shù)據(jù),這樣數(shù)據(jù)就錯(cuò)了,就違背了冪等性原則,但是要是該消息消費(fèi)到第二次的時(shí)候,可以判斷一下已經(jīng)消費(fèi)過(guò)了,然后直接將該消息丟棄,這就實(shí)現(xiàn)了只插入一條數(shù)據(jù),一條消息重復(fù)出現(xiàn)了兩次,但是只有第一次真正被消費(fèi)了,數(shù)據(jù)庫(kù)里也就只插入了一條數(shù)據(jù),這就保證了系統(tǒng)的冪等性。

上面簡(jiǎn)單的介紹了消息的重復(fù)消費(fèi)問(wèn)題,那如何解決這種重復(fù)消費(fèi)問(wèn)題呢,那就需要我們進(jìn)行 分組和持久化屬性組 操作,利用SpringCloud Stream中的消息分組來(lái)解決這個(gè)問(wèn)題,需要注意的是在Stream中處于同一組中的多個(gè)消息消費(fèi)者是競(jìng)爭(zhēng)關(guān)系,也就是保證生產(chǎn)者所發(fā)送的同一個(gè)消息只會(huì)被其中一個(gè)消費(fèi)者消費(fèi)一次。 不同組的消費(fèi)者是可以對(duì)消息進(jìn)行全面消費(fèi)(重復(fù)消費(fèi))的,只有同一組內(nèi)才會(huì)發(fā)生競(jìng)爭(zhēng)關(guān)系。

在RabbitMQ中,默認(rèn)分組group是不同的,組流水號(hào)不一樣,被認(rèn)為不同組,我們查看testExchange交換機(jī),可以發(fā)現(xiàn)8802和8803兩個(gè)消息消費(fèi)者處于不同的組,所以8801消息生產(chǎn)者發(fā)送的消息可以被這兩個(gè)消費(fèi)者重復(fù)消費(fèi):

3.2 分組解決重復(fù)消費(fèi)問(wèn)題

上面在RabbitMQ控制面板中我們看到的組流水號(hào)是系統(tǒng)隨機(jī)分配的,這樣無(wú)疑不好控制,所以我們應(yīng)該自定義配置分組,將8802/8803兩個(gè)消息消費(fèi)者微服務(wù)分為同一個(gè)組,以此來(lái)解決消息的重復(fù)消費(fèi)問(wèn)題。

先來(lái)演示如何自定義分組

在8802/8803微服務(wù)中的配置文件中分別添加組名屬性:

spring:
 cloud:
 stream:
  bindings:
  input:
   group: A/B # 分組名稱

這里我們將8802設(shè)置為A組,8803設(shè)置為B組,然后我們將消息消費(fèi)方的兩個(gè)微服務(wù)重啟,我們?cè)俅尾榭雌浣M流水號(hào),發(fā)現(xiàn)不再是長(zhǎng)長(zhǎng)的隨機(jī)組流水號(hào),而變成了我們自定義的分組:

此時(shí)由于8802/8803位于兩個(gè)不同分組下,所以沒(méi)有競(jìng)爭(zhēng)關(guān)系,消息生產(chǎn)者發(fā)送消息后,仍然可以重復(fù)消費(fèi)。

下面我們將這兩個(gè)消息消費(fèi)方微服務(wù)分到相同的消費(fèi)組中,這樣每次就只有一個(gè)消費(fèi)者,消息生產(chǎn)者發(fā)送的消息只能被8802或8803其中一個(gè)接受到,這樣就避免了重復(fù)消費(fèi),將8802和8803的分組名都改為A,再次重啟兩個(gè)消息消費(fèi)方微服務(wù),此時(shí)我們可以看到在分組A下已經(jīng)有了兩個(gè)消費(fèi)者。

再用生產(chǎn)者發(fā)送5條消息,我們發(fā)現(xiàn)8802/8803分別消費(fèi)了3條和2條不同的消息,而沒(méi)有出現(xiàn)重復(fù)消費(fèi)的問(wèn)題。

3.3 持久化

通過(guò)上述,解決了重復(fù)消費(fèi)問(wèn)題,再來(lái)看看持久化

加上了group就自動(dòng)支持持久化了

下面來(lái)演示一下持久化

  • 停止8802/8803并去除掉8802分組group:A(8803的分組group A沒(méi)有去掉)
  • 8801發(fā)送4條消息到rabbitmq
  • 先啟動(dòng)8802(無(wú)分組屬性配置),后臺(tái)沒(méi)有打出來(lái)消息(消息丟失故障)
  • 再啟動(dòng)8803(有分組屬性配置),后臺(tái)打出了4條消息(消費(fèi)持久化消息)

到此這篇關(guān)于SpringCloud Stream消息驅(qū)動(dòng)的文章就介紹到這了,更多相關(guān)SpringCloud Stream消息驅(qū)動(dòng)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • java基于poi導(dǎo)出excel透視表代碼實(shí)例

    java基于poi導(dǎo)出excel透視表代碼實(shí)例

    這篇文章主要介紹了java基于poi導(dǎo)出excel透視表代碼實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-12-12
  • Flutter ListView 上拉加載更多下拉刷新功能實(shí)現(xiàn)方法

    Flutter ListView 上拉加載更多下拉刷新功能實(shí)現(xiàn)方法

    這篇文章主要介紹了Flutter ListView 上拉加載更多下拉刷新功能實(shí)現(xiàn)方法,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2019-07-07
  • 詳解Spring Cloud中Hystrix的請(qǐng)求合并

    詳解Spring Cloud中Hystrix的請(qǐng)求合并

    這篇文章主要介紹了詳解Spring Cloud中Hystrix的請(qǐng)求合并,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2018-05-05
  • 關(guān)于JSCH使用自定義連接池的說(shuō)明

    關(guān)于JSCH使用自定義連接池的說(shuō)明

    這篇文章主要介紹了關(guān)于JSCH使用自定義連接池的說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-09-09
  • Java中clone方法使用筆記

    Java中clone方法使用筆記

    clone顧名思義是復(fù)制,在Java語(yǔ)言中,clone方法被對(duì)象調(diào)用,所以會(huì)復(fù)制對(duì)象,下面這篇文章主要給大家介紹了關(guān)于Java中clone方法使用的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2023-02-02
  • java 使用foreach遍歷集合元素的實(shí)例

    java 使用foreach遍歷集合元素的實(shí)例

    這篇文章主要介紹了java 使用foreach遍歷集合元素的實(shí)例的相關(guān)資料,這里提供實(shí)例幫助大家理解如何使用foreach 進(jìn)行遍歷,希望能幫助到大家,
    2017-08-08
  • 在java中使用dom解析xml的示例分析

    在java中使用dom解析xml的示例分析

    本篇文章介紹了,在java中使用dom解析xml的示例分析。需要的朋友參考下
    2013-05-05
  • Json字符串轉(zhuǎn)Java對(duì)象和List代碼實(shí)例

    Json字符串轉(zhuǎn)Java對(duì)象和List代碼實(shí)例

    這篇文章主要介紹了Json字符串轉(zhuǎn)Java對(duì)象和List代碼實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-06-06
  • java AOP原理以及實(shí)例用法總結(jié)

    java AOP原理以及實(shí)例用法總結(jié)

    在本篇文章里我們給大家整理了關(guān)于java AOP原理以及相關(guān)知識(shí)點(diǎn)總結(jié),正在學(xué)習(xí)的朋友們參考下。
    2019-05-05
  • Spring Boot定時(shí)器創(chuàng)建及使用解析

    Spring Boot定時(shí)器創(chuàng)建及使用解析

    這篇文章主要介紹了Spring Boot定時(shí)器創(chuàng)建及使用解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-07-07

最新評(píng)論