欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

最新SpringCloud?Stream消息驅(qū)動(dòng)講解

 更新時(shí)間:2022年11月02日 11:16:09   作者:小鐘要學(xué)習(xí)?。。? 
SpringCloud Stream 是一個(gè)構(gòu)建消息驅(qū)動(dòng)微服務(wù)的框架,通過(guò) SpringCloud Stream 連接消息中間件,以實(shí)現(xiàn)消息事件驅(qū)動(dòng),這篇文章主要介紹了SpringCloud?Stream消息驅(qū)動(dòng),需要的朋友可以參考下

SpringCloud Stream消息驅(qū)動(dòng)

1、SpringCloud Stream概述

官方地址:https://spring.io/projects/spring-cloud-stream#overview

中文指導(dǎo)手冊(cè)地址:https://m.wang1314.com/doc/webapp/topic/20971999.html

SpringCloud Stream 是一個(gè)構(gòu)建消息驅(qū)動(dòng)微服務(wù)的框架
應(yīng)用程序通過(guò) outputs 或 inputs 來(lái)與 SpringCloud Stream 中的 binder 對(duì)象交互
SpringCloud Stream 中的 binder 對(duì)象負(fù)責(zé)與消息中間件交互
通過(guò) SpringCloud Stream 連接消息中間件,以實(shí)現(xiàn)消息事件驅(qū)動(dòng)

什么是SpringCloudStream官方定義 Spring Cloud Stream 是一個(gè)構(gòu)建消息驅(qū)動(dòng)微服務(wù)的框架。

應(yīng)用程序通過(guò) inputs 或者 outputs 來(lái)與 Spring Cloud Stream中binder對(duì)象交互。通過(guò)我們配置binding(綁定) ,而 Spring Cloud Stream 的 binder對(duì)象負(fù)責(zé)與消息中間件交互。所以,我們只需要搞清楚如何與 Spring Cloud Stream 交互就可以方便使用消息驅(qū)動(dòng)的方式。

通過(guò)使用Spring Integration來(lái)連接消息代理中間件以實(shí)現(xiàn)消息事件驅(qū)動(dòng)。Spring Cloud Stream 為一些供應(yīng)商的消息中間件產(chǎn)品提供了個(gè)性化的自動(dòng)化配置實(shí)現(xiàn),引用了發(fā)布-訂閱、消費(fèi)組、分區(qū)的三個(gè)核心概念。

目前僅支持RabbitMQ、Kafka。

1.1、設(shè)計(jì)思想

1、標(biāo)注的MQ流程

生產(chǎn)者/消費(fèi)者之間靠消息媒介傳遞信息內(nèi)容【massage】

消息必須走特定的通道【消息通道MessageChannel】

消息通道里的消息如何被消費(fèi)呢,誰(shuí)負(fù)責(zé)收發(fā)處理

消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息處理器所訂閱

2、Cloud Stream的作用

比方說(shuō)我們用到了RabbitMQ和Kafka,由于這兩個(gè)消息中間件的架構(gòu)上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions分區(qū)。

這些中間件的差異性導(dǎo)致我們實(shí)際項(xiàng)目開(kāi)發(fā)給我們?cè)斐闪艘欢ǖ睦_,我們?nèi)绻昧藘蓚€(gè)消息隊(duì)列的其中一種,后面的業(yè)務(wù)需求,我想往另外一種消息隊(duì)列進(jìn)行遷移,這時(shí)候無(wú)疑就是一個(gè)災(zāi)難性的,一大堆東西都要重新推倒重新做,因?yàn)樗覀兊南到y(tǒng)耦合了,這時(shí)候springcloud Stream給我們提供了一種解耦合的方式。

3、什么是Binder

在沒(méi)有綁定器這個(gè)概念的情況下,我們的SpringBoot應(yīng)用要直接與消息中間件進(jìn)行信息交互的時(shí)候,由于各消息中間件構(gòu)建的初衷不同,它們的實(shí)現(xiàn)細(xì)節(jié)上會(huì)有較大的差異性通過(guò)定義綁定器作為中間層,完美地實(shí)現(xiàn)了應(yīng)用程序與消息中間件細(xì)節(jié)之間的隔離。

通過(guò)向應(yīng)用程序暴露統(tǒng)一的Channel通道,使得應(yīng)用程序不需要再考慮各種不同的消息中間件實(shí)現(xiàn)。

Binder可以生成Binding,Binding用來(lái)綁定消息容器的生產(chǎn)者和消費(fèi)者,它有兩種類(lèi)型,INPUT和OUTPUT,INPUT對(duì)應(yīng)于消費(fèi)者,OUTPUT對(duì)應(yīng)于生產(chǎn)者。 4、Stream中的消息通信方式遵循了發(fā)布-訂閱模式

使用Topic主題進(jìn)行廣播

  • 在RabbitMQ就是Exchange
  • 在Kakfa中就是Topic

1.2、標(biāo)準(zhǔn)的流程套路

1、Binder:很方便的連接中間件,屏蔽不同的差異

2、Channel

通道,是隊(duì)列Queue的一種抽象,在消息通訊系統(tǒng)中就是實(shí)現(xiàn)存儲(chǔ)和轉(zhuǎn)發(fā)的媒介,通過(guò)Channel對(duì)隊(duì)列進(jìn)行配置

3、Source和Sink

簡(jiǎn)單的可理解為參照對(duì)象是Spring Cloud Stream自身,從Stream發(fā)布消息就是輸出,接受消息就是輸入。

1.3、編碼API和常用注解

組成和注解描述
Middleware中間件,目前只支持RabbitM和Kafka
BinderBinder是應(yīng)用與消息中間的封裝,目前實(shí)現(xiàn)了Kafka和RabbitMQ的Binder,通過(guò)Binder可以很方便的連接中間件,可以動(dòng)態(tài)的改變消息類(lèi)型(對(duì)應(yīng)Kafka的topic,RabbitMQ的exchange),這些都可以通過(guò)配置文件來(lái)實(shí)現(xiàn)
@Input注解標(biāo)識(shí)輸入通道,通過(guò)該輸入通道接收到的消息進(jìn)入應(yīng)用程序
@Output注解標(biāo)識(shí)輸出通道,發(fā)布的消息將通過(guò)通道離開(kāi)應(yīng)用程序
@StreamListener監(jiān)聽(tīng)隊(duì)列,用戶(hù)消費(fèi)者的隊(duì)列的消息接收
@EnableBinding指通道channel和exchange綁定在一起

2、消息驅(qū)動(dòng)之生產(chǎn)者(output)

2.1、新建模塊cloud-stream-rabbitmq-provider8801

2.2、引入pom.xml配置文件

如果是需要Stream整合的就將依賴(lài)改為spring-cloud-starter-stream-kafka

<dependencies>
    <!--stream整合rabbit依賴(lài)-->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!--基礎(chǔ)配置-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2.3、YAML配置文件

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此處配置要綁定的rabbitmq的服務(wù)信息;
        defaultRabbit: # 表示定義的名稱(chēng),用于于binding整合
          type: rabbit # 消息組件類(lèi)型
          environment: # 設(shè)置rabbitmq的相關(guān)的環(huán)境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服務(wù)的整合處理
        output: # 這個(gè)名字是一個(gè)通道的名稱(chēng),消息生產(chǎn)者
          destination: studyExchange # 表示要使用的Exchange名稱(chēng)定義【自定義】
          content-type: application/json # 設(shè)置消息類(lèi)型,本次為json,文本則設(shè)置“text/plain”
          binder: defaultRabbit # 設(shè)置要綁定的消息服務(wù)的具體設(shè)置【上面的配置】

eureka:
  client: # 客戶(hù)端進(jìn)行Eureka注冊(cè)的配置
    service-url:
      defaultZone: http://localhost:7001/eureka

2.4、生產(chǎn)者啟動(dòng)類(lèi)

 package com.zcl.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * 描述:消息生產(chǎn)者啟動(dòng)類(lèi)
 *
 * @author zhong
 * @date 2022-09-22 12:19
 */
@SpringBootApplication
public class StreamMQMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class, args);
    }
}

2.5、業(yè)務(wù)實(shí)現(xiàn)

2.5.1、服務(wù)接口實(shí)現(xiàn)類(lèi)

自己創(chuàng)建一個(gè)實(shí)現(xiàn)的接口以及里面的方法

注意:在這個(gè)服務(wù)實(shí)現(xiàn)類(lèi)里面不是使用@Service注解了,因?yàn)椴皇莣eb應(yīng)用,而是Stream消息驅(qū)動(dòng),是與中間件進(jìn)行打交道的不是與數(shù)據(jù)庫(kù)

package com.zcl.springcloud.service.Impl;

import com.zcl.springcloud.service.IMessageProvider;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * 描述:發(fā)送接口實(shí)現(xiàn)類(lèi)
 * 必須使用@EnableBinding(Source.class)注解開(kāi)啟消息推送管道
 *
 * @author zhong
 * @date 2022-09-22 12:24
 */
@Slf4j
@EnableBinding(Source.class)
public class IMessageProviderImpl implements IMessageProvider {

    /**
     * 消息發(fā)送管道
     */
    @Resource
    private MessageChannel output;

    /**
     * 發(fā)送消息
     * @return
     */
    @Override
    public String send() {
        // 定義消息
        String serial = UUID.randomUUID().toString();
        // 構(gòu)建并發(fā)送消息
        this.output.send(MessageBuilder.withPayload(serial).build());
        log.info("-------------- " + serial + " ----------------");
        return serial;
    }
}

2.5.2、控制器實(shí)現(xiàn)

package com.zcl.springcloud.controller;

import com.zcl.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * 描述:消息發(fā)送控制器
 *
 * @author zhong
 * @date 2022-09-22 12:37
 */
@RestController
public class SendMessageController {

    /**
     * 注入消息發(fā)送管道接口
     */
    @Resource
    private IMessageProvider messageProvider;

    /**
     * 每調(diào)用一次接口發(fā)送一次消息
     * @return
     */
    @GetMapping(value = "/sendMessage")
    public String sendMessage()
    {
        return messageProvider.send();
    }
}

2.6、啟動(dòng)測(cè)試

  • 啟動(dòng)7001Eureka訪問(wèn)中心
  • 啟動(dòng)8801消息發(fā)送者,啟動(dòng)成功以及觀察RabbitMQ的管理界面

3.訪問(wèn)接口發(fā)送消息,查看MQ的管理頁(yè)面波峰情況

3、消息驅(qū)動(dòng)之消費(fèi)者(input)

同樣的參考如下流程圖

3.1、新建cloud-stream-rabbitmq-consumer8802模塊

3.2、引入pom.xml依賴(lài)

與8801一樣

3.3、添加YAML配置文件

配置文件與消息生產(chǎn)的區(qū)別在于:

output: # 這個(gè)名字是一個(gè)通道的名稱(chēng)
	destination: studyExchange # 表示要使用的Exchange名稱(chēng)定義
server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此處配置要綁定的rabbitmq的服務(wù)信息;
        defaultRabbit: # 表示定義的名稱(chēng),用于于binding整合
          type: rabbit # 消息組件類(lèi)型
          environment: # 設(shè)置rabbitmq的相關(guān)的環(huán)境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服務(wù)的整合處理
        input: # 這個(gè)名字是一個(gè)通道的名稱(chēng)
          destination: studyExchange # 表示要使用的Exchange名稱(chēng)定義
          content-type: application/json # 設(shè)置消息類(lèi)型,本次為對(duì)象json,如果是文本則設(shè)置“text/plain”
          binder: defaultRabbit # 設(shè)置要綁定的消息服務(wù)的具體設(shè)置

eureka:
  client: # 客戶(hù)端進(jìn)行Eureka注冊(cè)的配置
    service-url:
      defaultZone: http://localhost:7001/eureka

3.4、添加啟動(dòng)類(lèi)StreamMQMain8802

與消息生產(chǎn)者一樣

3.5、業(yè)務(wù)實(shí)現(xiàn)

必須要有@Component注解注入到Spring容器中

package com.zcl.springcloud.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

/**
 * 描述:消息消費(fèi)者控制器
 *
 * @author zhong
 * @date 2022-09-22 13:18
 */
@Slf4j
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {

    /**
     * 注入消費(fèi)者的端口號(hào)
     */
    @Value("${server.port}")
    private String port;

    /**
     * 監(jiān)聽(tīng)消息
     * @param message
     * @return
     */
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        log.info("消費(fèi)者1號(hào)接收到的消息 ----- " + message.getPayload() + " -----,port: " + port);
    }
}

3.6、啟動(dòng)項(xiàng)目測(cè)試

  • 啟動(dòng)7001
  • 啟動(dòng)8801,消息發(fā)送者
  • 啟動(dòng)8802,消息消費(fèi)者
  • 8801發(fā)送消息,8802消費(fèi)消息,并查看具體的MQ波峰圖

控制器輸出

4、分組消費(fèi)與持久化

4.1、完整參考cloud-stream-rabbitmq-consumer8802,創(chuàng)建8803項(xiàng)目

除了啟動(dòng)的端口號(hào)不一樣之外其他的配置都一樣

4.2、啟動(dòng)項(xiàng)目發(fā)現(xiàn)問(wèn)題

  • 啟動(dòng)7001(Eureka服務(wù)中心)
  • 啟動(dòng)8801(生產(chǎn))、8802(消費(fèi))、8803(消費(fèi))
  • 測(cè)試發(fā)送消失是否兩個(gè)消費(fèi)者都可以接收到

4.2.1、重復(fù)消費(fèi)

目前是8802/8803同時(shí)都收到了,存在重復(fù)消費(fèi)問(wèn)題

解決方案:分組和持久化屬性group

常見(jiàn)案例

比如在如下場(chǎng)景中,訂單系統(tǒng)我們做集群部署,都會(huì)從RabbitMQ中獲取訂單信息,那如果一個(gè)訂單同時(shí)被兩個(gè)服務(wù)獲取到,那么就會(huì)造成數(shù)據(jù)錯(cuò)誤,我們得避免這種情況。這時(shí)我們就可以使用Stream中的消息分組來(lái)解決

注意在Stream中處于同一個(gè)group中的多個(gè)消費(fèi)者是競(jìng)爭(zhēng)關(guān)系,就能夠保證消息只會(huì)被其中一個(gè)應(yīng)用消費(fèi)一次。

不同組是可以全面消費(fèi)的(重復(fù)消費(fèi)),同一組內(nèi)會(huì)發(fā)生競(jìng)爭(zhēng)關(guān)系,只有其中一個(gè)可以消費(fèi)。

4.2.2、分組

自定義配置分組,自定義分為同一個(gè)組,解決重復(fù)消費(fèi)問(wèn)題

配置文件分組

分別給8801、8802進(jìn)行分組【orderA】

重啟項(xiàng)目查看MQ管理

orderB是歷史記錄,上面的配置以及都分為了ordeerA組,進(jìn)入orderA組可以查看實(shí)際的消費(fèi)者數(shù)量

同一組內(nèi)會(huì)發(fā)生競(jìng)爭(zhēng)關(guān)系,只有其中一個(gè)可以消費(fèi),啟動(dòng)項(xiàng)目測(cè)試是否為真

4.2.3、持久化

通過(guò)上述,解決了重復(fù)消費(fèi)問(wèn)題,再看看持久化

  • 停止8802/8803并去除掉8802的分組group: atguiguA,8803保留

  • 8801先發(fā)送7條消息到rabbitmq

3.先啟動(dòng)8802,無(wú)分組屬性配置,后臺(tái)沒(méi)有打出來(lái)消息

8802因?yàn)槿∠?code>groupA的分組所以獲取不到持久化的數(shù)據(jù)(如果重啟mq也會(huì)消失)

4.再啟動(dòng)8803,有分組屬性配置,后臺(tái)打出來(lái)了MQ上的消息

8803保存groupA的分組所以在啟動(dòng)的時(shí)候就會(huì)將持久化的數(shù)據(jù)消費(fèi)

到此這篇關(guān)于SpringCloud Stream消息驅(qū)動(dòng)的文章就介紹到這了,更多相關(guān)SpringCloud Stream消息驅(qū)動(dòng)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • java使用http實(shí)現(xiàn)文件下載學(xué)習(xí)示例

    java使用http實(shí)現(xiàn)文件下載學(xué)習(xí)示例

    這篇文章主要介紹了java使用http實(shí)現(xiàn)文件下載學(xué)習(xí)示例,需要的朋友可以參考下
    2014-04-04
  • @Autowired注解注入的xxxMapper報(bào)錯(cuò)問(wèn)題及解決

    @Autowired注解注入的xxxMapper報(bào)錯(cuò)問(wèn)題及解決

    這篇文章主要介紹了@Autowired注解注入的xxxMapper報(bào)錯(cuò)問(wèn)題及解決,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • 詳解Java中的靜態(tài)代理模式

    詳解Java中的靜態(tài)代理模式

    這篇文章主要為大家介紹了Java中的靜態(tài)代理模式,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2022-12-12
  • 解決Spring Cloud Gateway獲取body內(nèi)容,不影響GET請(qǐng)求的操作

    解決Spring Cloud Gateway獲取body內(nèi)容,不影響GET請(qǐng)求的操作

    這篇文章主要介紹了解決Spring Cloud Gateway獲取body內(nèi)容,不影響GET請(qǐng)求的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2020-12-12
  • SpringCloud hystrix服務(wù)降級(jí)概念介紹

    SpringCloud hystrix服務(wù)降級(jí)概念介紹

    什么是服務(wù)降級(jí)?當(dāng)服務(wù)器壓力劇增的情況下,根據(jù)實(shí)際業(yè)務(wù)情況及流量,對(duì)一些服務(wù)和頁(yè)面有策略的不處理或換種簡(jiǎn)單的方式處理,從而釋放服務(wù)器資源以保證核心交易正常運(yùn)作或高效運(yùn)作
    2022-09-09
  • java數(shù)獨(dú)游戲完整版分享

    java數(shù)獨(dú)游戲完整版分享

    這篇文章主要為大家分享了java數(shù)獨(dú)游戲的完整版,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-12-12
  • Spring Security單項(xiàng)目權(quán)限設(shè)計(jì)過(guò)程解析

    Spring Security單項(xiàng)目權(quán)限設(shè)計(jì)過(guò)程解析

    這篇文章主要介紹了Spring Security單項(xiàng)目權(quán)限設(shè)計(jì)過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-11-11
  • 使用SpringBoot開(kāi)發(fā)Restful服務(wù)實(shí)現(xiàn)增刪改查功能

    使用SpringBoot開(kāi)發(fā)Restful服務(wù)實(shí)現(xiàn)增刪改查功能

    Spring Boot是由Pivotal團(tuán)隊(duì)提供的全新框架,其設(shè)計(jì)目的是用來(lái)簡(jiǎn)化新Spring應(yīng)用的初始搭建以及開(kāi)發(fā)過(guò)程。這篇文章主要介紹了基于SpringBoot開(kāi)發(fā)一個(gè)Restful服務(wù),實(shí)現(xiàn)增刪改查功能,需要的朋友可以參考下
    2018-01-01
  • 一文詳解Java中Stream流的使用

    一文詳解Java中Stream流的使用

    JDK8新增了Stream(流操作)處理集合的數(shù)據(jù),可執(zhí)行查找、過(guò)濾和映射數(shù)據(jù)等操作.本文將通過(guò)一些實(shí)例介紹stream流的使用,需要的可以參考一下
    2022-05-05
  • java非法字符‘\ufeff‘解決方法

    java非法字符‘\ufeff‘解決方法

    本文主要介紹了java非法字符‘\ufeff‘解決方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-07-07

最新評(píng)論