欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spring?Cloud?Stream消息驅(qū)動組件使用方法介紹

 更新時間:2022年09月01日 08:38:57   作者:悠然予夏  
Spring?Cloud?Stream?消息驅(qū)動組件幫助我們更快速,更方便,更友好的去構(gòu)建消息驅(qū)動微服務(wù)的。當(dāng)時定時任務(wù)和消息驅(qū)動的?個對比。消息驅(qū)動:基于消息機(jī)制做一些事情

MQ:消息隊(duì)列/消息中間件/消息代理,產(chǎn)品有很多,ActiveMQ RabbitMQ RocketMQ Kafka

1、Stream解決的痛點(diǎn)問題

MQ消息中間件廣泛應(yīng)用在應(yīng)用解耦合、異步消息處理、流量削峰等場景中。

不同的MQ消息中間件內(nèi)部機(jī)制包括使用方式都會有所不同,比如RabbitMQ中有Exchange(交換機(jī)/交換器)這一概念,kafka有Topic、Partition分區(qū)這些概念,MQ消息中間件的差異性不利于我們上層的開發(fā)應(yīng)用,當(dāng)我們的系統(tǒng)希望從原有的RabbitMQ切換到Kafka時,我們會發(fā)現(xiàn)比較困難,很多要操作可能重來(因?yàn)閼?yīng)用程序和具體的某?款MQ消息中間件耦合在?起了)。

Spring Cloud Stream進(jìn)行了很好的上層抽象,可以讓我們與具體消息中間件解耦合,屏蔽掉了底層具體MQ消息中間件的細(xì)節(jié)差異,就像Hibernate屏蔽掉了具體數(shù)據(jù)庫(Mysql/Oracle一樣)。如此?來,我們學(xué)習(xí)、開發(fā)、維護(hù)MQ都會變得輕松。目前Spring Cloud Stream支持RabbitMQ和Kafka。

本質(zhì):屏蔽掉了底層不同MQ消息中間件之間的差異,統(tǒng)一了MQ的編程模型,降低了學(xué)習(xí)、開發(fā)、維護(hù)MQ的成本

2、Stream重要概念

Spring Cloud Stream 是?個構(gòu)建消息驅(qū)動微服務(wù)的框架。應(yīng)用程序通過inputs(相當(dāng)于消息消費(fèi)者consumer)或者outputs(相當(dāng)于消息生產(chǎn)者producer)來與Spring Cloud Stream中的binder對象交互,而Binder對象是用來屏蔽底層MQ細(xì)節(jié)的,它負(fù)責(zé)與具體的消息中間件交互。

說白了:對于我們來說,只需要知道如何使?Spring Cloud Stream與Binder對象交互即可

Binder綁定器

Binder綁定器是Spring Cloud Stream 中非常核心的概念,就是通過它來屏蔽底層不同MQ消息中間件的細(xì)節(jié)差異,當(dāng)需要更換為其他消息中間件時,我們需要做的就是更換對應(yīng)的Binder綁定器而不需要修改任何應(yīng)用邏輯(Binder綁定器的實(shí)現(xiàn)是框架內(nèi)置的,Spring Cloud Stream目前支持Rabbit、Kafka兩種消息隊(duì)列)

3、傳統(tǒng)MQ模型與Stream消息驅(qū)動模型

4、Stream消息通信方式及編程模型

4.1、Stream消息通信方式

Stream中的消息通信方式遵循了發(fā)布—訂閱模式。

在Spring Cloud Stream中的消息通信方式遵循了發(fā)布-訂閱模式,當(dāng)一條消息被投遞到消息中間件之 后,它會通過共享的 Topic 主題進(jìn)行廣播,消息消費(fèi)者在訂閱的主題中收到它并觸發(fā)自身的業(yè)務(wù)邏輯處理。這里所提到的 Topic 主題是SpringCloud Stream中的一個抽象概念,用來代表發(fā)布共享消息給消 費(fèi)者的地方。在不同的消息中間件中, Topic 可能對應(yīng)著不同的概念,比如:在RabbitMQ中的它對應(yīng)了Exchange、在Kakfa中則對應(yīng)了Kafka中的Topic。

4.2、Stream編程注解

如下的注解無非在做一件事,把我們結(jié)構(gòu)圖中那些組成部分上下關(guān)聯(lián)起來,打通通道(這樣的話生產(chǎn)者的message數(shù)據(jù)才能進(jìn)入mq,mq中數(shù)據(jù)才能進(jìn)入消費(fèi)者工程)。

注解描述
@Input(在消費(fèi)者工程中使用)注解標(biāo)識輸入通道,通過該輸入通道接收到的消息進(jìn)入應(yīng)用程序
@Output(在生產(chǎn)者工程中使用)注解標(biāo)識輸出通道,發(fā)布的消息將通過該通道離開應(yīng)用程序
@StreamListener(在消費(fèi)者工程中使用,監(jiān)聽message的到來)監(jiān)聽隊(duì)列,用于消費(fèi)者的隊(duì)列的消息的接收(有消息監(jiān)聽.....)
@EnableBinding把Channel和Exchange(對于RabbitMQ)綁定在一起

4.3、Stream消息驅(qū)動之開發(fā)生產(chǎn)者端

(1)在lagou_parent下新建子module:lagou-cloud-stream-producer-9090

(2)pom.xml中添加依賴

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>lagou-parent</artifactId>
        <groupId>com.lagou.edu</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>lagou-cloud-stream-producer-9090</artifactId>
    <dependencies>
        <!--eureka client 客戶端依賴引入-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <!--spring cloud stream 依賴(rabbit)-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    </dependencies>
</project>

(3)application.yml添加配置

server:
  port: 9090
spring:
  application:
    name: lagou-cloud-stream-producer
  cloud:
    stream:
      binders: # 綁定MQ服務(wù)信息(此處我們是RabbitMQ)
        lagouRabbitBinder: # 給Binder定義的名稱,用于后面的關(guān)聯(lián)
          type: rabbit # MQ類型,如果是Kafka的話,此處配置kafka
          environment: # MQ環(huán)境配置(用戶名、密碼等)
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 關(guān)聯(lián)整合通道和binder對象
        output: # output是我們定義的通道名稱,此處不能亂改
          destination: lagouExchange # 要使用的Exchange名稱(消息隊(duì)列主題名稱)
          content-type: text/plain # application/json # 消息類型設(shè)置,比如json
          binder: lagouRabbitBinder # 關(guān)聯(lián)MQ服務(wù)
eureka:
  client:
    serviceUrl: # eureka server的路徑
      defaultZone: http://lagoucloudeurekaservera:8761/eureka/,http://lagoucloudeurekaserverb:8762/eureka/ #把 eureka 集群中的所有 url 都填寫了進(jìn)來,也可以只寫一臺,因?yàn)楦鱾€ eureka server 可以同步注冊表
    instance:
      prefer-ip-address: true #使用ip注冊

(4)啟動類

package com.lagou.edu;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
@SpringBootApplication
@EnableDiscoveryClient
public class StreamProducerApplication9090 {
    public static void main(String[] args) {
        SpringApplication.run(StreamProducerApplication9090.class, args);
    }
}

(5)業(yè)務(wù)類開發(fā)(發(fā)送消息接口、接口實(shí)現(xiàn)類)

接口

package com.lagou.edu.service;
public interface IMessageProducer {
    public void sendMessage(String content);
}

實(shí)現(xiàn)類

package com.lagou.edu.service.impl;
import com.lagou.edu.service.IMessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
// Source.class里面就是對輸出通道的定義(這是Spring Cloud Stream內(nèi)置的通道封裝)
@EnableBinding(Source.class)
public class MessageProducerImpl implements IMessageProducer {
    // 將MessageChannel的封裝對象Source注入到這里使用
    @Autowired
    private Source source;
    @Override
    public void sendMessage(String content) {
        // 向mq中發(fā)送消息(并不是直接操作mq,應(yīng)該操作的是spring cloud stream)
        // 使用通道向外發(fā)出消息(指的是Source里面的output通道)
        source.output().send(MessageBuilder.withPayload(content).build());
    }
}

測試類

import com.lagou.edu.StreamProducerApplication9090;
import com.lagou.edu.service.IMessageProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@SpringBootTest(classes = {StreamProducerApplication9090.class})
@RunWith(SpringJUnit4ClassRunner.class)
public class MessageProducerTest {
    @Autowired
    private IMessageProducer iMessageProducer;
    @Test
    public void testSendMessage() {
        iMessageProducer.sendMessage("hello world-lagou101");
    }
}

4.4、Stream消息驅(qū)動之開發(fā)消費(fèi)者端

注:消費(fèi)端引入的jar與服務(wù)端引入的jar包相同,故省略。

(1)application.yml

server:
  port: 9091
spring:
  application:
    name: lagou-cloud-stream-consumer
  cloud:
    stream:
      binders: # 綁定MQ服務(wù)信息(此處我們是RabbitMQ)
        lagouRabbitBinder: # 給Binder定義的名稱,用于后面的關(guān)聯(lián)
          type: rabbit # MQ類型,如果是Kafka的話,此處配置kafka
          environment: # MQ環(huán)境配置(用戶名、密碼等)
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 關(guān)聯(lián)整合通道和binder對象
        input: # input是我們定義的通道名稱,此處不能亂改
          destination: lagouExchange # 要使用的Exchange名稱(消息隊(duì)列主題名稱)
          content-type: text/plain # application/json # 消息類型設(shè)置,比如json
          binder: lagouRabbitBinder # 關(guān)聯(lián)MQ服務(wù)
          group: lagou001 # 分組
eureka:
  client:
    serviceUrl: # eureka server的路徑
      defaultZone: http://lagoucloudeurekaservera:8761/eureka/,http://lagoucloudeurekaserverb:8762/eureka/ #把 eureka 集群中的所有 url 都填寫了進(jìn)來,也可以只寫一臺,因?yàn)楦鱾€ eureka server 可以同步注冊表
    instance:
      prefer-ip-address: true #使用ip注冊

(2)消息消費(fèi)者監(jiān)聽類

package com.lagou.edu.service;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
@EnableBinding(Sink.class)
public class MessageConsumerService {
    @StreamListener(Sink.INPUT)
    public void recevieMessages(Message<String> message) {
        System.out.println("=========接收到的消息:" + message);
    }
}

(3)啟動類

package com.lagou.edu;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
@SpringBootApplication
@EnableDiscoveryClient
public class StreamConsumerApplication9091 {
    public static void main(String[] args) {
        SpringApplication.run(StreamConsumerApplication9091.class,args);
    }
}

至此,消費(fèi)端與服務(wù)端代碼編寫完成。

5、Stream高級之自定義消息通道

Stream 內(nèi)置了兩種接口Source和Sink分別定義了 binding 為 “input” 的輸入流和“output” 的輸出流,我們也可以自定義各種輸入輸出流(通道),但實(shí)際我們可以在我們的服務(wù)中使用多個binder、多個輸入通道和輸出通道,然而默認(rèn)就帶了一個input的輸入通道和一個output的輸出通道,怎么辦?

我們是可以自定義消息通道的,學(xué)著Source和Sink的樣子,給你的通道定義個自己的名字,多個輸入通道和輸出通道是可以寫在一個類中的。

定義接口

interface CustomChannel {
    String INPUT_LOG = "inputLog";
    String OUTPUT_LOG = "outputLog";
    @Input(INPUT_LOG)
        SubscribableChannel inputLog();
    @Output(OUTPUT_LOG)
        MessageChannel outputLog();
}

如何使用?

在 @EnableBinding 注解中,綁定自定義的接口

使用@StreamListener 做監(jiān)聽的時候,需要指定 CustomChannel.INPUT_LOG

bindings:
        inputLog:
                destination: lagouExchange
        outputLog:
                destination: eduExchange

6、Stream高級之消息分組

如上我們的情況,消費(fèi)者端有兩個(消費(fèi)同一個MQ的同一個主題),但是呢我們的業(yè)務(wù)場景中希望這個主題的一個Message只能被一個消費(fèi)者端消費(fèi)處理,此時我們就可以使用消息分組。

解決的問題:能解決消息重復(fù)消費(fèi)問題

我們僅僅需要在服務(wù)消費(fèi)者端設(shè)置 spring.cloud.stream.bindings.input.group 屬性,多個消費(fèi)者實(shí)例配置為同一個group名稱(在同一個group中的多個消費(fèi)者只有一個可以獲取到消息并消費(fèi))。

到此這篇關(guān)于Spring Cloud Stream消息驅(qū)動組件使用方法介紹的文章就介紹到這了,更多相關(guān)Spring Cloud Stream消息驅(qū)動內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java字符串的壓縮與解壓縮的兩種方法

    Java字符串的壓縮與解壓縮的兩種方法

    這篇文章主要介紹了Java字符串的壓縮與解壓縮的兩種方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-03-03
  • java實(shí)現(xiàn)建造者模式(Builder Pattern)

    java實(shí)現(xiàn)建造者模式(Builder Pattern)

    這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)建造者模式Builder Pattern,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2018-10-10
  • SpringBoot統(tǒng)一返回結(jié)果問題

    SpringBoot統(tǒng)一返回結(jié)果問題

    這篇文章主要介紹了SpringBoot統(tǒng)一返回結(jié)果問題,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-07-07
  • Zookeeper ZkClient使用介紹

    Zookeeper ZkClient使用介紹

    ZkClient是Github上?個開源的zookeeper客戶端,在Zookeeper原生API接口之上進(jìn)行了包裝,是?個更易用的Zookeeper客戶端,同時,zkClient在內(nèi)部還實(shí)現(xiàn)了諸如Session超時重連、Watcher反復(fù)注冊等功能
    2022-09-09
  • Spring cloud oauth2如何搭建認(rèn)證資源中心

    Spring cloud oauth2如何搭建認(rèn)證資源中心

    這篇文章主要介紹了Spring cloud oauth2如何搭建認(rèn)證資源中心,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-11-11
  • 如何在Springboot實(shí)現(xiàn)攔截器功能

    如何在Springboot實(shí)現(xiàn)攔截器功能

    其實(shí)spring boot攔截器的配置方式和springMVC差不多,只有一些小的改變需要注意下就ok了,下面這篇文章主要給大家介紹了關(guān)于如何在Springboot實(shí)現(xiàn)攔截器功能的相關(guān)資料,需要的朋友可以參考下
    2022-06-06
  • jsoup?框架的使用小結(jié)

    jsoup?框架的使用小結(jié)

    jsoup 是一款基于Java的HTML解析器,它提供了一套非常省力的API,不但能直接解析某個URL地址、HTML文本內(nèi)容,而且還能通過類似于 DOM、CSS 或者jQuery的方法來操作數(shù)據(jù),所以jsoup也可以被當(dāng)做爬蟲工具使用,這篇文章主要介紹了jsoup使用,需要的朋友可以參考下
    2023-04-04
  • JAVA中通過Hibernate-Validation進(jìn)行參數(shù)驗(yàn)證

    JAVA中通過Hibernate-Validation進(jìn)行參數(shù)驗(yàn)證

    這篇文章主要介紹了JAVA中通過Hibernate-Validation進(jìn)行參數(shù)驗(yàn)證,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-04-04
  • Java微信公眾平臺開發(fā)(15) 微信JSSDK的使用

    Java微信公眾平臺開發(fā)(15) 微信JSSDK的使用

    這篇文章主要為大家詳細(xì)介紹了Java微信公眾平臺開發(fā)第十五步,微信JSSDK的使用方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-04-04
  • Java中的StringJoiner類使用示例深入詳解

    Java中的StringJoiner類使用示例深入詳解

    這篇文章主要為大家介紹了Java中的StringJoiner類使用示例深入詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-09-09

最新評論