欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

最新SpringCloud?Stream消息驅動講解

 更新時間:2022年11月02日 11:16:09   作者:小鐘要學習!?。? 
SpringCloud Stream 是一個構建消息驅動微服務的框架,通過 SpringCloud Stream 連接消息中間件,以實現(xiàn)消息事件驅動,這篇文章主要介紹了SpringCloud?Stream消息驅動,需要的朋友可以參考下

SpringCloud Stream消息驅動

1、SpringCloud Stream概述

官方地址:https://spring.io/projects/spring-cloud-stream#overview

中文指導手冊地址:https://m.wang1314.com/doc/webapp/topic/20971999.html

SpringCloud Stream 是一個構建消息驅動微服務的框架
應用程序通過 outputs 或 inputs 來與 SpringCloud Stream 中的 binder 對象交互
SpringCloud Stream 中的 binder 對象負責與消息中間件交互
通過 SpringCloud Stream 連接消息中間件,以實現(xiàn)消息事件驅動

什么是SpringCloudStream官方定義 Spring Cloud Stream 是一個構建消息驅動微服務的框架。

應用程序通過 inputs 或者 outputs 來與 Spring Cloud Stream中binder對象交互。通過我們配置binding(綁定) ,而 Spring Cloud Stream 的 binder對象負責與消息中間件交互。所以,我們只需要搞清楚如何與 Spring Cloud Stream 交互就可以方便使用消息驅動的方式。

通過使用Spring Integration來連接消息代理中間件以實現(xiàn)消息事件驅動。Spring Cloud Stream 為一些供應商的消息中間件產(chǎn)品提供了個性化的自動化配置實現(xiàn),引用了發(fā)布-訂閱、消費組、分區(qū)的三個核心概念。

目前僅支持RabbitMQ、Kafka。

1.1、設計思想

1、標注的MQ流程

生產(chǎn)者/消費者之間靠消息媒介傳遞信息內(nèi)容【massage】

消息必須走特定的通道【消息通道MessageChannel】

消息通道里的消息如何被消費呢,誰負責收發(fā)處理

消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息處理器所訂閱

2、Cloud Stream的作用

比方說我們用到了RabbitMQ和Kafka,由于這兩個消息中間件的架構上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions分區(qū)。

這些中間件的差異性導致我們實際項目開發(fā)給我們造成了一定的困擾,我們?nèi)绻昧藘蓚€消息隊列的其中一種,后面的業(yè)務需求,我想往另外一種消息隊列進行遷移,這時候無疑就是一個災難性的,一大堆東西都要重新推倒重新做,因為它跟我們的系統(tǒng)耦合了,這時候springcloud Stream給我們提供了一種解耦合的方式。

3、什么是Binder

在沒有綁定器這個概念的情況下,我們的SpringBoot應用要直接與消息中間件進行信息交互的時候,由于各消息中間件構建的初衷不同,它們的實現(xiàn)細節(jié)上會有較大的差異性通過定義綁定器作為中間層,完美地實現(xiàn)了應用程序與消息中間件細節(jié)之間的隔離。

通過向應用程序暴露統(tǒng)一的Channel通道,使得應用程序不需要再考慮各種不同的消息中間件實現(xiàn)。

Binder可以生成Binding,Binding用來綁定消息容器的生產(chǎn)者和消費者,它有兩種類型,INPUT和OUTPUT,INPUT對應于消費者,OUTPUT對應于生產(chǎn)者。 4、Stream中的消息通信方式遵循了發(fā)布-訂閱模式

使用Topic主題進行廣播

  • 在RabbitMQ就是Exchange
  • 在Kakfa中就是Topic

1.2、標準的流程套路

1、Binder:很方便的連接中間件,屏蔽不同的差異

2、Channel

通道,是隊列Queue的一種抽象,在消息通訊系統(tǒng)中就是實現(xiàn)存儲和轉發(fā)的媒介,通過Channel對隊列進行配置

3、Source和Sink

簡單的可理解為參照對象是Spring Cloud Stream自身,從Stream發(fā)布消息就是輸出,接受消息就是輸入。

1.3、編碼API和常用注解

組成和注解描述
Middleware中間件,目前只支持RabbitM和Kafka
BinderBinder是應用與消息中間的封裝,目前實現(xiàn)了Kafka和RabbitMQ的Binder,通過Binder可以很方便的連接中間件,可以動態(tài)的改變消息類型(對應Kafka的topic,RabbitMQ的exchange),這些都可以通過配置文件來實現(xiàn)
@Input注解標識輸入通道,通過該輸入通道接收到的消息進入應用程序
@Output注解標識輸出通道,發(fā)布的消息將通過通道離開應用程序
@StreamListener監(jiān)聽隊列,用戶消費者的隊列的消息接收
@EnableBinding指通道channel和exchange綁定在一起

2、消息驅動之生產(chǎn)者(output)

2.1、新建模塊cloud-stream-rabbitmq-provider8801

2.2、引入pom.xml配置文件

如果是需要Stream整合的就將依賴改為spring-cloud-starter-stream-kafka

<dependencies>
    <!--stream整合rabbit依賴-->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!--基礎配置-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2.3、YAML配置文件

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此處配置要綁定的rabbitmq的服務信息;
        defaultRabbit: # 表示定義的名稱,用于于binding整合
          type: rabbit # 消息組件類型
          environment: # 設置rabbitmq的相關的環(huán)境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服務的整合處理
        output: # 這個名字是一個通道的名稱,消息生產(chǎn)者
          destination: studyExchange # 表示要使用的Exchange名稱定義【自定義】
          content-type: application/json # 設置消息類型,本次為json,文本則設置“text/plain”
          binder: defaultRabbit # 設置要綁定的消息服務的具體設置【上面的配置】

eureka:
  client: # 客戶端進行Eureka注冊的配置
    service-url:
      defaultZone: http://localhost:7001/eureka

2.4、生產(chǎn)者啟動類

 package com.zcl.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * 描述:消息生產(chǎn)者啟動類
 *
 * @author zhong
 * @date 2022-09-22 12:19
 */
@SpringBootApplication
public class StreamMQMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class, args);
    }
}

2.5、業(yè)務實現(xiàn)

2.5.1、服務接口實現(xiàn)類

自己創(chuàng)建一個實現(xiàn)的接口以及里面的方法

注意:在這個服務實現(xiàn)類里面不是使用@Service注解了,因為不是web應用,而是Stream消息驅動,是與中間件進行打交道的不是與數(shù)據(jù)庫

package com.zcl.springcloud.service.Impl;

import com.zcl.springcloud.service.IMessageProvider;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * 描述:發(fā)送接口實現(xiàn)類
 * 必須使用@EnableBinding(Source.class)注解開啟消息推送管道
 *
 * @author zhong
 * @date 2022-09-22 12:24
 */
@Slf4j
@EnableBinding(Source.class)
public class IMessageProviderImpl implements IMessageProvider {

    /**
     * 消息發(fā)送管道
     */
    @Resource
    private MessageChannel output;

    /**
     * 發(fā)送消息
     * @return
     */
    @Override
    public String send() {
        // 定義消息
        String serial = UUID.randomUUID().toString();
        // 構建并發(fā)送消息
        this.output.send(MessageBuilder.withPayload(serial).build());
        log.info("-------------- " + serial + " ----------------");
        return serial;
    }
}

2.5.2、控制器實現(xiàn)

package com.zcl.springcloud.controller;

import com.zcl.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * 描述:消息發(fā)送控制器
 *
 * @author zhong
 * @date 2022-09-22 12:37
 */
@RestController
public class SendMessageController {

    /**
     * 注入消息發(fā)送管道接口
     */
    @Resource
    private IMessageProvider messageProvider;

    /**
     * 每調(diào)用一次接口發(fā)送一次消息
     * @return
     */
    @GetMapping(value = "/sendMessage")
    public String sendMessage()
    {
        return messageProvider.send();
    }
}

2.6、啟動測試

  • 啟動7001Eureka訪問中心
  • 啟動8801消息發(fā)送者,啟動成功以及觀察RabbitMQ的管理界面

3.訪問接口發(fā)送消息,查看MQ的管理頁面波峰情況

3、消息驅動之消費者(input)

同樣的參考如下流程圖

3.1、新建cloud-stream-rabbitmq-consumer8802模塊

3.2、引入pom.xml依賴

與8801一樣

3.3、添加YAML配置文件

配置文件與消息生產(chǎn)的區(qū)別在于:

output: # 這個名字是一個通道的名稱
	destination: studyExchange # 表示要使用的Exchange名稱定義
server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此處配置要綁定的rabbitmq的服務信息;
        defaultRabbit: # 表示定義的名稱,用于于binding整合
          type: rabbit # 消息組件類型
          environment: # 設置rabbitmq的相關的環(huán)境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服務的整合處理
        input: # 這個名字是一個通道的名稱
          destination: studyExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設置消息類型,本次為對象json,如果是文本則設置“text/plain”
          binder: defaultRabbit # 設置要綁定的消息服務的具體設置

eureka:
  client: # 客戶端進行Eureka注冊的配置
    service-url:
      defaultZone: http://localhost:7001/eureka

3.4、添加啟動類StreamMQMain8802

與消息生產(chǎn)者一樣

3.5、業(yè)務實現(xiàn)

必須要有@Component注解注入到Spring容器中

package com.zcl.springcloud.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

/**
 * 描述:消息消費者控制器
 *
 * @author zhong
 * @date 2022-09-22 13:18
 */
@Slf4j
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {

    /**
     * 注入消費者的端口號
     */
    @Value("${server.port}")
    private String port;

    /**
     * 監(jiān)聽消息
     * @param message
     * @return
     */
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        log.info("消費者1號接收到的消息 ----- " + message.getPayload() + " -----,port: " + port);
    }
}

3.6、啟動項目測試

  • 啟動7001
  • 啟動8801,消息發(fā)送者
  • 啟動8802,消息消費者
  • 8801發(fā)送消息,8802消費消息,并查看具體的MQ波峰圖

控制器輸出

4、分組消費與持久化

4.1、完整參考cloud-stream-rabbitmq-consumer8802,創(chuàng)建8803項目

除了啟動的端口號不一樣之外其他的配置都一樣

4.2、啟動項目發(fā)現(xiàn)問題

  • 啟動7001(Eureka服務中心)
  • 啟動8801(生產(chǎn))、8802(消費)、8803(消費)
  • 測試發(fā)送消失是否兩個消費者都可以接收到

4.2.1、重復消費

目前是8802/8803同時都收到了,存在重復消費問題

解決方案:分組和持久化屬性group

常見案例

比如在如下場景中,訂單系統(tǒng)我們做集群部署,都會從RabbitMQ中獲取訂單信息,那如果一個訂單同時被兩個服務獲取到,那么就會造成數(shù)據(jù)錯誤,我們得避免這種情況。這時我們就可以使用Stream中的消息分組來解決

注意在Stream中處于同一個group中的多個消費者是競爭關系,就能夠保證消息只會被其中一個應用消費一次。

不同組是可以全面消費的(重復消費),同一組內(nèi)會發(fā)生競爭關系,只有其中一個可以消費。

4.2.2、分組

自定義配置分組,自定義分為同一個組,解決重復消費問題

配置文件分組

分別給8801、8802進行分組【orderA】

重啟項目查看MQ管理

orderB是歷史記錄,上面的配置以及都分為了ordeerA組,進入orderA組可以查看實際的消費者數(shù)量

同一組內(nèi)會發(fā)生競爭關系,只有其中一個可以消費,啟動項目測試是否為真

4.2.3、持久化

通過上述,解決了重復消費問題,再看看持久化

  • 停止8802/8803并去除掉8802的分組group: atguiguA,8803保留

  • 8801先發(fā)送7條消息到rabbitmq

3.先啟動8802,無分組屬性配置,后臺沒有打出來消息

8802因為取消了groupA的分組所以獲取不到持久化的數(shù)據(jù)(如果重啟mq也會消失)

4.再啟動8803,有分組屬性配置,后臺打出來了MQ上的消息

8803保存groupA的分組所以在啟動的時候就會將持久化的數(shù)據(jù)消費

到此這篇關于SpringCloud Stream消息驅動的文章就介紹到這了,更多相關SpringCloud Stream消息驅動內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • java使用http實現(xiàn)文件下載學習示例

    java使用http實現(xiàn)文件下載學習示例

    這篇文章主要介紹了java使用http實現(xiàn)文件下載學習示例,需要的朋友可以參考下
    2014-04-04
  • @Autowired注解注入的xxxMapper報錯問題及解決

    @Autowired注解注入的xxxMapper報錯問題及解決

    這篇文章主要介紹了@Autowired注解注入的xxxMapper報錯問題及解決,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • 詳解Java中的靜態(tài)代理模式

    詳解Java中的靜態(tài)代理模式

    這篇文章主要為大家介紹了Java中的靜態(tài)代理模式,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2022-12-12
  • 解決Spring Cloud Gateway獲取body內(nèi)容,不影響GET請求的操作

    解決Spring Cloud Gateway獲取body內(nèi)容,不影響GET請求的操作

    這篇文章主要介紹了解決Spring Cloud Gateway獲取body內(nèi)容,不影響GET請求的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-12-12
  • SpringCloud hystrix服務降級概念介紹

    SpringCloud hystrix服務降級概念介紹

    什么是服務降級?當服務器壓力劇增的情況下,根據(jù)實際業(yè)務情況及流量,對一些服務和頁面有策略的不處理或換種簡單的方式處理,從而釋放服務器資源以保證核心交易正常運作或高效運作
    2022-09-09
  • java數(shù)獨游戲完整版分享

    java數(shù)獨游戲完整版分享

    這篇文章主要為大家分享了java數(shù)獨游戲的完整版,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-12-12
  • Spring Security單項目權限設計過程解析

    Spring Security單項目權限設計過程解析

    這篇文章主要介紹了Spring Security單項目權限設計過程解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2019-11-11
  • 使用SpringBoot開發(fā)Restful服務實現(xiàn)增刪改查功能

    使用SpringBoot開發(fā)Restful服務實現(xiàn)增刪改查功能

    Spring Boot是由Pivotal團隊提供的全新框架,其設計目的是用來簡化新Spring應用的初始搭建以及開發(fā)過程。這篇文章主要介紹了基于SpringBoot開發(fā)一個Restful服務,實現(xiàn)增刪改查功能,需要的朋友可以參考下
    2018-01-01
  • 一文詳解Java中Stream流的使用

    一文詳解Java中Stream流的使用

    JDK8新增了Stream(流操作)處理集合的數(shù)據(jù),可執(zhí)行查找、過濾和映射數(shù)據(jù)等操作.本文將通過一些實例介紹stream流的使用,需要的可以參考一下
    2022-05-05
  • java非法字符‘\ufeff‘解決方法

    java非法字符‘\ufeff‘解決方法

    本文主要介紹了java非法字符‘\ufeff‘解決方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-07-07

最新評論