欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Springcloud Stream消息驅(qū)動(dòng)工具使用介紹

 更新時(shí)間:2022年09月14日 11:11:37   作者:扎哇太棗糕  
SpringCloud Stream由一個(gè)中間件中立的核組成,應(yīng)用通過SpringCloud Stream插入的input(相當(dāng)于消費(fèi)者consumer,它是從隊(duì)列中接收消息的)和output(相當(dāng)于生產(chǎn)者producer,它是發(fā)送消息到隊(duì)列中的)通道與外界交流

springcloud Stream

什么是springcloud Stream

  現(xiàn)在市面上有很多的消息中間件,每一個(gè)公司使用的都有所不同,為了減少學(xué)習(xí)的成本,springcloud Stream可以讓我們不再關(guān)注消息中間件MQ的具體細(xì)節(jié),我們只需要通過適配綁定的方式即可實(shí)現(xiàn)不同MQ之間的切換,但是遺憾的是springcloud Stream目前只支持RabbitMQ和Kafka。

  SpringCloud Stream是一個(gè)構(gòu)建消息驅(qū)動(dòng)微服務(wù)的框架,應(yīng)用程序通過inputs或者 outputs來與SpringCloud Stream中的binder進(jìn)行交互,我們可以通過配置來binding ,而 SpringCloud Stream 的binder負(fù)責(zé)與中間件交互,所以我們只需要搞清楚如何與Stream交互就可以很方便的使用消息驅(qū)動(dòng)了!

什么是Binder

  Binder是SpringCloud Stream的一個(gè)抽象概念,是應(yīng)用與消息中間件之間的粘合劑,通過定義綁定器Binder作為中間層,實(shí)現(xiàn)了應(yīng)用程序與消息中間件細(xì)節(jié)之間的隔離,可以動(dòng)態(tài)的改變消息的destinations(對(duì)應(yīng)于 Kafka的topic,RabbitMQ的exchanges),這些都可以通過外部配置項(xiàng)來做到,甚至可以任意的改變中間件的類型但是不需要修改一行代碼

為什么使用Stream

  比方說我們用到了RabbitMQ和Kafka,由于這兩個(gè)消息中間件的架構(gòu)上的不同像RabbitMQ有exchange,kafka有Topic和Partitions分區(qū),這些中間件的差異性導(dǎo)致我們實(shí)際項(xiàng)目開發(fā)給我們?cè)斐闪艘欢ǖ睦_,我們?nèi)绻昧藘蓚€(gè)消息隊(duì)列的其中一種,后面的業(yè)務(wù)需求,我想往另外一種消息隊(duì)列進(jìn)行遷移;這時(shí)候無疑就是一個(gè)災(zāi)難性的,一大堆東西都要重新推倒重新做,因?yàn)樗覀兊南到y(tǒng)耦合了,這襯候springcloud Stream給我們提供了一種解耦合的方式。

Stream使用案例

前置知識(shí)

Stream處理消息的架構(gòu)

  Source、Sink: 簡(jiǎn)單的可理解為參照對(duì)象是Spring Cloud Stream自身,從Stream發(fā)布消息就是輸出,接受消息就是輸入。Channel: 通道,是隊(duì)列Queue的一種抽象,在消息通訊系統(tǒng)中就是實(shí)現(xiàn)存儲(chǔ)和轉(zhuǎn)發(fā)的媒介。Binder: 消息的生產(chǎn)者和消費(fèi)者中間層,實(shí)現(xiàn)了應(yīng)用程序與消息中間件細(xì)節(jié)之間的隔離

  通過以上兩張圖片可知,消息的處理流向是:消息生產(chǎn)者處理完業(yè)務(wù)邏輯之后消息到達(dá)source中,接著前往Channel通道進(jìn)行排隊(duì),然后通過binder綁定器將消息數(shù)據(jù)發(fā)送到底層mq,然后又通過binder綁定器接收到底層mq發(fā)送來的消息數(shù)據(jù),接著前往Channel通道進(jìn)行排隊(duì),由Sink接收到消息數(shù)據(jù),消息消費(fèi)者拿到消息數(shù)據(jù)執(zhí)行相應(yīng)的業(yè)務(wù)邏輯

Stream常用注解

消息生產(chǎn)者8801模塊搭建

  第一步: 創(chuàng)建一個(gè)maven模塊,引入相關(guān)依賴,最主要的就是stream整合rabbitmq的依賴

<!--stream的rabbitmq依賴-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

  第二步: 配置文件的編寫

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此處配置要綁定的rabbitmq的服務(wù)信息;
        defaultRabbit: # 表示定義的名稱,用于于binding整合
          type: rabbit # 消息組件類型
          environment: # 設(shè)置rabbitmq的相關(guān)的環(huán)境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服務(wù)的整合處理
        output: # 這個(gè)名字是一個(gè)通道的名稱
          destination: studyExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設(shè)置消息類型,本次為json,文本則設(shè)置“text/plain”
          binder: defaultRabbit  # 設(shè)置要綁定的消息服務(wù)的具體設(shè)置

eureka:
  client: # 客戶端進(jìn)行Eureka注冊(cè)的配置
    service-url:
      defaultZone: http://localhost:7001/eureka

  第三步: 主程序類

@SpringBootApplication
public class CloudStreamRabbitmqProvider8801Application {
    public static void main(String[] args) {
        SpringApplication.run(CloudStreamRabbitmqProvider8801Application.class, args);
        System.out.println("啟動(dòng)成功");
    }
}

  第四步: 業(yè)務(wù)層service代碼編寫,注意:這里實(shí)現(xiàn)類注入的對(duì)象由之前的dao層對(duì)象換成了channel通道對(duì)象,詳細(xì)的發(fā)送由實(shí)現(xiàn)類的第12完成

public interface IMessageProviderService {
    /**
     * 定義消息的推送管道
     *
     * @return
     */
    String send();
}
@EnableBinding(Source.class)
public class MessageProviderServiceImpl implements IMessageProviderService {
    /**
     * 消息發(fā)送管道/信道
     */
    @Resource
    private MessageChannel output;
    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());

        System.out.println("*****serial: " + serial);
        return serial;
    }
}

  第五步: controller接口

@RestController
public class SendMessageController {
    @Resource
    private IMessageProviderService messageProviderService;
    @GetMapping(value = "/sendMessage")
    public String sendMessage() {
        return messageProviderService.send();
    }
}

消息消費(fèi)者8802模塊搭建

  第一步: 創(chuàng)建一個(gè)maven模塊,引入相關(guān)依賴,最主要的就是stream整合rabbitmq的依賴

<!--stream的rabbitmq依賴-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

  第二步: 配置文件的編寫,與生產(chǎn)者的區(qū)別就在于bindings下的是input而不是output

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此處配置要綁定的rabbitmq的服務(wù)信息;
        defaultRabbit: # 表示定義的名稱,用于于binding整合
          type: rabbit # 消息組件類型
          environment: # 設(shè)置rabbitmq的相關(guān)的環(huán)境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服務(wù)的整合處理
        input: # 這個(gè)名字是一個(gè)通道的名稱
          destination: studyExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設(shè)置消息類型,本次為json,文本則設(shè)置“text/plain”
          binder: defaultRabbit  # 設(shè)置要綁定的消息服務(wù)的具體設(shè)置
eureka:
  client: # 客戶端進(jìn)行Eureka注冊(cè)的配置
    service-url:
      defaultZone: http://localhost:7001/eureka

  第三步: 主程序類

@SpringBootApplication
public class CloudStreamRabbitmqConsumer8802Application {
    public static void main(String[] args) {
        SpringApplication.run(CloudStreamRabbitmqConsumer8802Application.class, args);
        System.out.println("啟動(dòng)成功");
    }
}

  第四步: controller接口,使用url請(qǐng)求生產(chǎn)者8801,即可在消費(fèi)者8802端接收到8801發(fā)送的消息

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {
    @Value("${server.port}")
    private String serverPort;
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("消費(fèi)者1號(hào) ----> port:" + serverPort + "\t從8801接受到的消息是:" + message.getPayload());
    }
}

  兩個(gè)模塊搭建完成進(jìn)行測(cè)試,首先啟動(dòng)注冊(cè)中心7001,然后分別啟動(dòng)消息生產(chǎn)者8801和消息消費(fèi)者8802,通過url請(qǐng)求訪問8001的發(fā)送消息請(qǐng)求,會(huì)向指定管道中發(fā)送一條消息,如果此時(shí)這個(gè)管道中有消費(fèi)者即可接收到這條消息。而如何指定消息的管道歸屬呢,就是通過配置文件中的indings.input.destination來指定,命名相同的服務(wù)就會(huì)處在同一條管道中

Stream帶來的問題

重復(fù)消費(fèi)問題

  按照之前的使用,會(huì)帶來重復(fù)消費(fèi)問題: 也就是說一個(gè)通道上有不止一個(gè)消息消費(fèi)者,stream上默認(rèn)每一個(gè)消費(fèi)者都屬于不同的組,這樣的話就會(huì)導(dǎo)致這個(gè)消息被多個(gè)組的消費(fèi)者重復(fù)消費(fèi)

  知道了問題出現(xiàn)的原因就很容易解決了,只要我們自定義配置分組,將這些消費(fèi)者都分配到同一個(gè)組中就能避免重復(fù)消費(fèi)的問題出現(xiàn)了(同一個(gè)組間的消費(fèi)者是競(jìng)爭(zhēng)關(guān)系,不管組間有多少的消費(fèi)者都只會(huì)消費(fèi)一次)

自定義分組

  只需要在配置文件修改一處配置即可實(shí)現(xiàn)自定義組名并且自定義分組,組名相同的服務(wù)會(huì)被分配到同一組,通道內(nèi)的消息數(shù)據(jù)會(huì)被該組中的所有消費(fèi)者輪詢消費(fèi)

持久化問題

  上面自定義分組使用的group配置除了可以自定義分組和分組名之外,還可以實(shí)現(xiàn)消息的持久化,也就是說使用group配置自定義分組和分組名的消息消費(fèi)者,就算在消息生產(chǎn)者發(fā)送消息的時(shí)候掛掉了,等這個(gè)消費(fèi)者重啟之后依然是能夠消費(fèi)之前發(fā)送的消息

這里一個(gè)生產(chǎn)者和兩個(gè)消費(fèi)者存在以下十三種情況(生產(chǎn)者發(fā)送四次消息):

1、都使用group分組的兩個(gè)不同組成員,在生產(chǎn)者生產(chǎn)的時(shí)候

  • 都沒掛(各消費(fèi)四次)
  • 掛了其中一個(gè)(各消費(fèi)四次)
  • 都掛了(各消費(fèi)四次)

2、都使用group分組的兩個(gè)同組成員,在生產(chǎn)者生產(chǎn)的時(shí)候

  • 都沒掛(各消費(fèi)兩次)
  • 掛了其中一個(gè)(沒掛的把四次消費(fèi)完)
  • 都掛了(各消費(fèi)兩次)

3、其中一個(gè)使用group分組的兩個(gè)成員,在生產(chǎn)者生產(chǎn)的時(shí)候

  • 都掛了(都不消費(fèi))
  • group的掛了(各消費(fèi)四次)
  • 沒group的掛了(沒掛的消費(fèi)四次,掛的由于沒有持久化所以不消費(fèi))
  • 都沒掛(各消費(fèi)四次)

4、都不使用group分組的兩個(gè)成員,在生產(chǎn)者生產(chǎn)的時(shí)候

  • 都掛了(都不消費(fèi))
  • 掛了其中一個(gè)(沒掛的消費(fèi)四次,掛的由于沒有持久化所以不消費(fèi))
  • 都沒掛(各消費(fèi)四次)

  總之一句話,通道里的消息會(huì)持久化給使用group配置的消息消費(fèi)者(每一組都有一份),就算發(fā)送消息的時(shí)候這些消費(fèi)者掛了,如果同組的消費(fèi)者有沒掛的就會(huì)把這些消息競(jìng)爭(zhēng)消費(fèi)完;如果同組沒有消費(fèi)者,等他重啟之后還是會(huì)消費(fèi)這些消息

到此這篇關(guān)于Springcloud Stream消息驅(qū)動(dòng)工具使用介紹的文章就介紹到這了,更多相關(guān)Springcloud Stream內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java數(shù)組歸納總結(jié)

    Java數(shù)組歸納總結(jié)

    這篇文章主要介紹了Java數(shù)組歸納總結(jié),總結(jié)內(nèi)容有一維數(shù)組、二維數(shù)組、遍歷數(shù)組、替換元素、數(shù)組排序、數(shù)組拷貝、元素查詢、排序算法,下面來看看這些方法的相關(guān)資料,需要的小伙伴可以辛苦一下
    2022-01-01
  • springboot自動(dòng)配置原理以及spring.factories文件的作用詳解

    springboot自動(dòng)配置原理以及spring.factories文件的作用詳解

    這篇文章主要介紹了springboot自動(dòng)配置原理以及spring.factories文件的作用詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-10-10
  • java刪除指定目錄下指定格式文件的方法

    java刪除指定目錄下指定格式文件的方法

    這篇文章主要為大家詳細(xì)介紹了java刪除指定目錄下指定格式文件的方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2018-08-08
  • JPA如何設(shè)置表名和實(shí)體名,表字段與實(shí)體字段的對(duì)應(yīng)

    JPA如何設(shè)置表名和實(shí)體名,表字段與實(shí)體字段的對(duì)應(yīng)

    這篇文章主要介紹了JPA如何設(shè)置表名和實(shí)體名,表字段與實(shí)體字段的對(duì)應(yīng),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • idea版本更新安裝教程詳解

    idea版本更新安裝教程詳解

    idea的更新版本比較特殊,需要重新安裝一下,這篇文章給大家介紹idea版本更新安裝教程詳解,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧
    2020-11-11
  • list轉(zhuǎn)tree和list中查找某節(jié)點(diǎn)下的所有數(shù)據(jù)操作

    list轉(zhuǎn)tree和list中查找某節(jié)點(diǎn)下的所有數(shù)據(jù)操作

    這篇文章主要介紹了list轉(zhuǎn)tree和list中查找某節(jié)點(diǎn)下的所有數(shù)據(jù)操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2020-09-09
  • SpringBoot接口加密與解密的實(shí)現(xiàn)

    SpringBoot接口加密與解密的實(shí)現(xiàn)

    這篇文章主要介紹了SpringBoot接口加密與解密的實(shí)現(xiàn)
    2023-10-10
  • Spring Boot打包war jar 部署tomcat

    Spring Boot打包war jar 部署tomcat

    這篇文章主要介紹了Spring Boot打包war jar 部署tomcat的相關(guān)資料,需要的朋友可以參考下
    2017-10-10
  • Elasticsearch索引結(jié)構(gòu)與算法解析

    Elasticsearch索引結(jié)構(gòu)與算法解析

    ?作為搜索引擎的一部分,ES自然具有速度快、結(jié)果準(zhǔn)確、結(jié)果豐富等特點(diǎn),那么ES是如何達(dá)到“搜索引擎”級(jí)別的查詢效率呢?首先是索引,其次是壓縮算法,接下來我們就一起了解下ES的索引結(jié)構(gòu)和壓縮算法
    2023-04-04
  • Java中synchronized用法匯總

    Java中synchronized用法匯總

    使用 synchronized 無需手動(dòng)執(zhí)行加鎖和釋放鎖的操作,我們只需要聲明 synchronized 關(guān)鍵字就可以了,JVM 層面會(huì)幫我們自動(dòng)的進(jìn)行加鎖和釋放鎖的操作,我們今天重點(diǎn)來看一下synchronized 的幾種用法
    2022-04-04

最新評(píng)論