欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

一文快速掌握Spring?Cloud?Stream

 更新時(shí)間:2022年08月26日 10:10:39   作者:怪?咖@  
這篇文章主要介紹了Spring?Cloud?Stream詳解,本篇文章所涉及到的demo練習(xí)使用的cloud?2021.0.3+?springboot2.6.8,通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下

本篇文章所涉及到的demo練習(xí) 使用的cloud 2021.0.3+ springboot2.6.8

一、概述簡(jiǎn)介

官網(wǎng):https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/

官網(wǎng)概述:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-preface-notable-deprecations

1.1. cloud Stream是什么

官方定義:Spring Cloud Stream是一個(gè)用于構(gòu)建 與 共享消息系統(tǒng) 連接的高度可擴(kuò)展的事件驅(qū)動(dòng)微服務(wù)

目前主流的消息框架有:

  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka

假設(shè)公司業(yè)務(wù)項(xiàng)目用了RabbitMQ,而大數(shù)據(jù)項(xiàng)目用了Kafka。這時(shí)候就會(huì)出現(xiàn)有兩個(gè)消息框架,相對(duì)于程序員來(lái)說(shuō)其實(shí)并不友好,還得兩個(gè)都掌握,正常對(duì)于一個(gè)程序員來(lái)說(shuō)熟練一個(gè)消息框架都不錯(cuò)了,何況還搞了兩個(gè),并且兩個(gè)維護(hù)起來(lái)也不好維護(hù)。

RabbitMQ和Kafka是兩個(gè)不同的框架,兩個(gè)消息模型上也存在著差異,并且代碼上用法也不一樣。Spring Cloud Stream就是不再關(guān)注具體MQ的細(xì)節(jié),可以在不改代碼的基礎(chǔ)上,來(lái)完成Rabbit和Kafka兩個(gè)不同的消息中間件的切換(這里的切換指的是原本用的RabbitMQ,但是用著用著發(fā)現(xiàn)kafka比較符合,所以想要換框架)。

總結(jié)成一句話:屏蔽底層消息中間件的差異,降低切換成本,統(tǒng)一消息的編程模型

注意:遺憾的是目前僅支持RabbitMQ、Kafka。

1.2. 設(shè)計(jì)思想

常規(guī)的MQ設(shè)計(jì)如下:

  • Message:生產(chǎn)者/消費(fèi)者之間靠消息媒介傳遞信息內(nèi)容
  • MessageChannel:消息必須走特定的通道
  • 隊(duì)列:假如發(fā)消息會(huì)先發(fā)到消息隊(duì)列當(dāng)中
  • 消息隊(duì)列的消息如何被消費(fèi)呢:訂閱的人可以進(jìn)行消費(fèi)

cloud Stream設(shè)計(jì)如下:

通過(guò)定義綁定器Binder作為中間層,實(shí)現(xiàn)了應(yīng)用程序與消息中間件細(xì)節(jié)之間的隔離。

在沒(méi)有綁定器這個(gè)概念的情況下,我們的SpringBoot應(yīng)用要直接與消息中間件進(jìn)行信息交互的時(shí)候,由于各消息中間件構(gòu)建的初衷不同,它們的實(shí)現(xiàn)細(xì)節(jié)上會(huì)有較大的差異性,通過(guò)定義綁定器作為中間層,完美地實(shí)現(xiàn)了應(yīng)用程序與消息中間件細(xì)節(jié)之間的隔離。Stream對(duì)消息中間件的進(jìn)一步封裝,可以做到代碼層面對(duì)中間件的無(wú)感知,甚至于動(dòng)態(tài)的切換中間件(rabbitmq切換為kafka),使得微服務(wù)開(kāi)發(fā)的高度解耦,服務(wù)可以關(guān)注更多自己的業(yè)務(wù)流程

注意:左圖是官網(wǎng)的架構(gòu)圖

Binder可以生成Binding,Binding用來(lái)綁定消息容器的生產(chǎn)者和消費(fèi)者,它有兩種類(lèi)型,INPUT和OUTPUT,INPUT對(duì)應(yīng)于消費(fèi)者,OUTPUT對(duì)應(yīng)于生產(chǎn)者。

stream為了屏蔽差異,抽象出來(lái)了一個(gè)Binder層,而目前為止,只提供了兩個(gè)框架的實(shí)現(xiàn),通過(guò)具體的實(shí)現(xiàn)來(lái)連接消息中間件。

假如想要通過(guò)stream連接RabbitMQ就使用:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

假如想要通過(guò)stream連接Kafka就使用:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

Stream中的消息通信方式遵循了發(fā)布-訂閱模式,Topic主題進(jìn)行廣播,在RabbitMQ就是Exchange,在Kakfa中就是Topic。

1.3. 標(biāo)準(zhǔn)流程

  • Binder: 很方便的連接中間件,屏蔽差異
  • Channel: 通道,是隊(duì)列Queue的一種抽象,在消息通訊系統(tǒng)中就是實(shí)現(xiàn)存儲(chǔ)和轉(zhuǎn)發(fā)的媒介,通過(guò)Channe對(duì)隊(duì)列進(jìn)行配置
  • Source(源:發(fā)送者)和Sink(水槽:接受者): 簡(jiǎn)單的可理解為參照對(duì)象是Spring Cloud Stream自身,從Stream發(fā)布消息就是輸出,接受消息就是輸入。

1.4. 注解

注解完全是基于官方給的模型而定的!通過(guò)stream使用消息中間件也是非常簡(jiǎn)單的,直接使用以下注解就可以使用。

注意:注解依然是能用的,但是官方明確表示注解已經(jīng)被棄用,棄用并不是不能用,而是用了會(huì)畫(huà)橫杠不建議用。但是功能是沒(méi)有問(wèn)題的,低版本的cloud是沒(méi)有被棄用的。針對(duì)于注解和函數(shù)式編程兩種我都會(huì)進(jìn)行使用。

題外話:學(xué)技術(shù)永遠(yuǎn)是這樣,技術(shù)一直在不斷的更新迭代,真正學(xué)習(xí)一個(gè)技術(shù)并不是要掌握編碼使用,而是要掌握他到底是什么,能干什么,要去深入理解他,對(duì)于編碼,我認(rèn)為其實(shí)不是很重要。就算你今天掌握了官方最新用法,回頭人家又改寫(xiě)法了。

二、基于注解代碼練習(xí)

生產(chǎn)者就是消息發(fā)送者,消費(fèi)者就是消息接受者。這里我就不用kafka了,我直接用的是RabbitMQ。

windows下安裝RabbitMQ:https://blog.csdn.net/weixin_43888891/article/details/126514021

2.1. 消息驅(qū)動(dòng)之生產(chǎn)者

1.創(chuàng)建項(xiàng)目(可以是聚合可以是普通springboot項(xiàng)目)
2.添加pom

因?yàn)槭呛蚏abbitMQ整合,所以就是引入的spring-cloud-starter-stream-rabbit啟動(dòng)器

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <springboot.version>2.6.8</springboot.version>
    <springcloud.version>2021.0.3</springcloud.version>
</properties>

<dependencyManagement>
    <dependencies>
        <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-dependencies</artifactId>
           <version>${springboot.version}</version>
           <type>pom</type>
           <scope>import</scope>
       </dependency>
        <dependency>
           <groupId>org.springframework.cloud</groupId>
           <artifactId>spring-cloud-dependencies</artifactId>
           <version>${springcloud.version}</version>
           <type>pom</type>
           <scope>import</scope>
       </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
</dependencies>

3.添加application配置

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此處配置要綁定的rabbitmq的服務(wù)信息;
        defaultRabbit: # 表示定義的名稱(chēng),用于于binding整合
          type: rabbit # 消息組件類(lèi)型
          environment: # 設(shè)置rabbitmq的相關(guān)的環(huán)境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服務(wù)的整合處理
        output: # 這個(gè)名字是一個(gè)通道的名稱(chēng)
          destination: studyExchange # 表示要使用的Exchange名稱(chēng)定義
          content-type: application/json # 設(shè)置消息類(lèi)型,本次為json,文本則設(shè)置“text/plain”
          binder: defaultRabbit # 設(shè)置要綁定的消息服務(wù)的具體設(shè)置

4.添加接口

public interface IMessageProvider {
    public String send();
}

5.添加實(shí)現(xiàn)類(lèi)

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;

import javax.annotation.Resource;
import java.util.UUID;

// 可以理解為是一個(gè)消息的發(fā)送管道的定義
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider {

    // 消息的發(fā)送管道
    @Resource
    private MessageChannel output;

    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        // 創(chuàng)建并發(fā)送消息
        this.output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("***serial: " + serial);
        return serial;
    }
}

6.添加controller控制器

@RestController
public class SendMessageController {

    @Autowired
    private IMessageProvider iMessageProvider;

    @GetMapping("send")
    public String send() {
        return iMessageProvider.send();
    }
}

7.測(cè)試

(1)首先要保證RabbitMQ是可以訪問(wèn)的:http://localhost:15672

(2)啟動(dòng)項(xiàng)目訪問(wèn):http://localhost:8801/send

下圖波峰代表發(fā)送消息成功

啟動(dòng)后會(huì)創(chuàng)建交換機(jī),名稱(chēng)就是application.yml當(dāng)中的destination屬性設(shè)置的

注意:停止服務(wù)后并沒(méi)有刪除交換機(jī)?。?!

2.2. 消息驅(qū)動(dòng)之消費(fèi)者

1.創(chuàng)建項(xiàng)目
2.添加pom(pom和發(fā)送者依賴一模一樣)

3.添加application配置

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此處配置要綁定的rabbitmq的服務(wù)信息;
        defaultRabbit: # 表示定義的名稱(chēng),用于于binding整合
          type: rabbit # 消息組件類(lèi)型
          environment: # 設(shè)置rabbitmq的相關(guān)的環(huán)境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服務(wù)的整合處理
        input: # 這個(gè)名字是一個(gè)通道的名稱(chēng)
          destination: studyExchange # 表示要使用的Exchange名稱(chēng)定義
          content-type: application/json # 設(shè)置消息類(lèi)型,本次為對(duì)象json,如果是文本則設(shè)置“text/plain”
          binder: defaultRabbit # 設(shè)置要綁定的消息服務(wù)的具體設(shè)置

4.添加監(jiān)聽(tīng)(消費(fèi)者只負(fù)責(zé)接受消息)

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("消費(fèi)者1號(hào),------->接收到的消息:" + message.getPayload() + "\t port: " + serverPort);
    }
}

5.測(cè)試

(1)啟動(dòng)RabbitMQ
(2)啟動(dòng)發(fā)送消息端服務(wù)
(3)啟動(dòng)消費(fèi)者服務(wù),啟動(dòng)后會(huì)發(fā)現(xiàn),他自動(dòng)會(huì)向這個(gè)交換機(jī)當(dāng)中添加一個(gè)隊(duì)列。

發(fā)送消息:http://localhost:8801/send
接受消息:

注意:當(dāng)停止服務(wù)后消息隊(duì)列會(huì)被自動(dòng)刪除!??!

2.3. 目前存在的問(wèn)題

1.依照8802, clone出來(lái)一份運(yùn)行8803,主要用來(lái)演示多個(gè)消費(fèi)者的場(chǎng)景
2.啟動(dòng)8801生產(chǎn)者
3.啟動(dòng)8802消費(fèi)者
4.啟動(dòng)8803消費(fèi)者

當(dāng)三個(gè)服務(wù)都啟動(dòng)后通過(guò)RabbitMQ界面會(huì)發(fā)現(xiàn),一個(gè)交換機(jī)綁定了兩個(gè)隊(duì)列

運(yùn)行后會(huì)發(fā)現(xiàn)存在兩個(gè)問(wèn)題:

有重復(fù)消費(fèi)問(wèn)題消息持久化問(wèn)題

(1)重復(fù)消費(fèi)問(wèn)題:

發(fā)送消息后兩個(gè)消費(fèi)者都收到了消息:http://localhost:8801/send

比如在如下場(chǎng)景中,訂單系統(tǒng)我們做集群部署,都會(huì)從RabbitMQ中獲取訂單信息,那如果一個(gè)訂單同時(shí)被兩個(gè)服務(wù)獲取到,那么就會(huì)造成數(shù)據(jù)錯(cuò)誤,我們得避免這種情況。這時(shí)我們就可以使用Stream中的消息分組來(lái)解決

注意在Stream中處于同一個(gè)group中的多個(gè)消費(fèi)者是競(jìng)爭(zhēng)關(guān)系,就能夠保證消息只會(huì)被其中一個(gè)應(yīng)用消費(fèi)一次。不同組是可以全面消費(fèi)的(重復(fù)消費(fèi)),同一組內(nèi)會(huì)發(fā)生競(jìng)爭(zhēng)關(guān)系,只有其中一個(gè)可以消費(fèi)。

(2)消息持久化問(wèn)題:

當(dāng)生產(chǎn)者發(fā)送消息的時(shí)候,消費(fèi)者恰好宕機(jī)了,但是過(guò)一會(huì)消費(fèi)者恢復(fù)了,但是消息卻沒(méi)收到。那也就是意味著消息隊(duì)列是臨時(shí)消息隊(duì)列。針對(duì)于這一點(diǎn),大家也可以測(cè)試一下,加深一下印象。

2.4. 分組解決重復(fù)消費(fèi)問(wèn)題

原理: 微服務(wù)應(yīng)用放置于同一個(gè)group中,就能夠保證消息只會(huì)被其中一個(gè)應(yīng)用消費(fèi)一次。同一個(gè)組內(nèi)會(huì)發(fā)生競(jìng)爭(zhēng)關(guān)系,只有其中一個(gè)可以消費(fèi)。

接下來(lái)直接調(diào)整兩個(gè)消費(fèi)者為同一個(gè)組:添加如下配置

當(dāng)兩個(gè)消費(fèi)者都設(shè)置好后啟動(dòng),會(huì)發(fā)現(xiàn)一個(gè)問(wèn)題: 實(shí)際上分到一個(gè)組對(duì)于RabbitMQ來(lái)說(shuō)就是兩個(gè)消費(fèi)者監(jiān)聽(tīng)了一個(gè)隊(duì)列。一個(gè)隊(duì)列那也就意味著,當(dāng)隊(duì)列收到一條消息,哪個(gè)消費(fèi)者誰(shuí)先消費(fèi)就是誰(shuí)的,消費(fèi)完隊(duì)列里面就沒(méi)有了,也就是只有一個(gè)消費(fèi)者能消費(fèi)到消息!

注意:假如不設(shè)置group屬性的時(shí)候,默認(rèn)是啟動(dòng)一個(gè)消費(fèi)者,就會(huì)創(chuàng)建一個(gè)消費(fèi)隊(duì)列,啟動(dòng)多個(gè)服務(wù)就會(huì)創(chuàng)建多個(gè)隊(duì)列。stream默認(rèn)使用的是RabbitMQ的topic交換機(jī)。當(dāng)發(fā)送者向這個(gè)交換機(jī)發(fā)送消息的時(shí)候,兩個(gè)隊(duì)列就都會(huì)接收到。關(guān)于RabbitMQ相關(guān)知識(shí)本篇不記錄,后續(xù)會(huì)專(zhuān)門(mén)寫(xiě)RabbitMQ相關(guān)文章。

最終測(cè)試:8802/8803實(shí)現(xiàn)了輪詢分組,每次只有一個(gè)消費(fèi)者8801模塊的發(fā)的消息只能被8802或8803其中一個(gè)接收到,這樣避免了重復(fù)消費(fèi)。

2.5. 消息持久化

當(dāng)三個(gè)項(xiàng)目都啟動(dòng)著的時(shí)候,現(xiàn)在我們要做幾件事:

停止8802和8803并去除掉8802的分組group: gxs,8803不去分組信息,停止掉項(xiàng)目的時(shí)候會(huì)發(fā)現(xiàn)消息隊(duì)列并沒(méi)有刪除,說(shuō)明一旦設(shè)置分組信息,消息隊(duì)列就不再是臨時(shí)隊(duì)列。

2.8801發(fā)送4條消息啟動(dòng)8802然后消息并沒(méi)有打印,沒(méi)有收到消息(注意8802是去掉分組信息的)再啟動(dòng)8803,有分組屬性配置,后臺(tái)打出來(lái)了MQ上的消息

原因就是:當(dāng)兩個(gè)項(xiàng)目都停止的時(shí)候,隊(duì)列并未刪除,而8803還綁定了這個(gè)隊(duì)列,所以他就算宕機(jī)了,又重啟了,依然可以收到消息。而8802沒(méi)有設(shè)置分組信息,他再啟動(dòng)后系統(tǒng)會(huì)給他創(chuàng)建一個(gè)臨時(shí)隊(duì)列,自然而然收不到之前的消息了。

三、函數(shù)式編程練習(xí)

官網(wǎng)介紹:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring_cloud_function

到此這篇關(guān)于Spring Cloud Stream詳解的文章就介紹到這了,更多相關(guān)Spring Cloud Stream內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java?多線程并發(fā)?ReentrantReadWriteLock詳情

    Java?多線程并發(fā)?ReentrantReadWriteLock詳情

    這篇文章主要介紹了Java多線程并發(fā)ReentrantReadWriteLock詳情,ReentrantReadWriteLock可重入讀寫(xiě)鎖。實(shí)際使用場(chǎng)景中,我們需要處理的操作本質(zhì)上是讀與寫(xiě),更多相關(guān)資料,感興趣的小伙伴可以參考一下下面文章內(nèi)容
    2022-06-06
  • Mybatis resultType返回結(jié)果為null的問(wèn)題排查方式

    Mybatis resultType返回結(jié)果為null的問(wèn)題排查方式

    這篇文章主要介紹了Mybatis resultType返回結(jié)果為null的問(wèn)題排查方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-03-03
  • SpringBoot之LogBack配置詳解

    SpringBoot之LogBack配置詳解

    這篇文章主要介紹了SpringBoot之LogBack配置詳解,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2019-02-02
  • 關(guān)于slf4j_log4j2源碼學(xué)習(xí)心得

    關(guān)于slf4j_log4j2源碼學(xué)習(xí)心得

    這篇文章主要介紹了slf4j_log4j2源碼學(xué)習(xí)心得,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • java數(shù)據(jù)庫(kù)連接池和數(shù)據(jù)庫(kù)連接示例

    java數(shù)據(jù)庫(kù)連接池和數(shù)據(jù)庫(kù)連接示例

    這篇文章主要介紹了java數(shù)據(jù)庫(kù)連接池和數(shù)據(jù)庫(kù)連接示例,需要的朋友可以參考下
    2014-05-05
  • Java開(kāi)發(fā)中的容器概念、分類(lèi)與用法深入詳解

    Java開(kāi)發(fā)中的容器概念、分類(lèi)與用法深入詳解

    這篇文章主要介紹了Java開(kāi)發(fā)中的容器概念、分類(lèi)與用法,結(jié)合實(shí)例形式較為詳細(xì)的分析了java容器的相關(guān)概念、分類(lèi)、使用方法與注意事項(xiàng),需要的朋友可以參考下
    2017-11-11
  • Mybatis-plus支持Gbase8s分頁(yè)的實(shí)現(xiàn)示例

    Mybatis-plus支持Gbase8s分頁(yè)的實(shí)現(xiàn)示例

    本文主要介紹了Mybatis-plus支持Gbase8s分頁(yè)的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-11-11
  • java實(shí)現(xiàn)表單必填參數(shù)驗(yàn)證的方法

    java實(shí)現(xiàn)表單必填參數(shù)驗(yàn)證的方法

    表單校驗(yàn)是很多注冊(cè)時(shí)必做的功能, 一般我們的處理都是很粗暴的寫(xiě)個(gè)if()判斷, 然后拋異常. 本文將介紹通過(guò)代理的思想, 用注解優(yōu)雅的處理非空判斷,感興趣的一起來(lái)了解一下
    2021-05-05
  • Java實(shí)現(xiàn)人機(jī)猜拳小游戲

    Java實(shí)現(xiàn)人機(jī)猜拳小游戲

    這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)人機(jī)猜拳小游戲,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2019-11-11
  • Java RocksDB安裝與應(yīng)用

    Java RocksDB安裝與應(yīng)用

    本篇文章主要給大家介紹了JAVA中RocksDB的安裝與應(yīng)用,有需要到的朋友一起學(xué)習(xí)參考下。
    2017-12-12

最新評(píng)論