Spring?Cloud?Stream消息驅(qū)動組件使用方法介紹
MQ:消息隊(duì)列/消息中間件/消息代理,產(chǎn)品有很多,ActiveMQ RabbitMQ RocketMQ Kafka
1、Stream解決的痛點(diǎn)問題
MQ消息中間件廣泛應(yīng)用在應(yīng)用解耦合、異步消息處理、流量削峰等場景中。
不同的MQ消息中間件內(nèi)部機(jī)制包括使用方式都會有所不同,比如RabbitMQ中有Exchange(交換機(jī)/交換器)這一概念,kafka有Topic、Partition分區(qū)這些概念,MQ消息中間件的差異性不利于我們上層的開發(fā)應(yīng)用,當(dāng)我們的系統(tǒng)希望從原有的RabbitMQ切換到Kafka時,我們會發(fā)現(xiàn)比較困難,很多要操作可能重來(因?yàn)閼?yīng)用程序和具體的某?款MQ消息中間件耦合在?起了)。
Spring Cloud Stream進(jìn)行了很好的上層抽象,可以讓我們與具體消息中間件解耦合,屏蔽掉了底層具體MQ消息中間件的細(xì)節(jié)差異,就像Hibernate屏蔽掉了具體數(shù)據(jù)庫(Mysql/Oracle一樣)。如此?來,我們學(xué)習(xí)、開發(fā)、維護(hù)MQ都會變得輕松。目前Spring Cloud Stream支持RabbitMQ和Kafka。
本質(zhì):屏蔽掉了底層不同MQ消息中間件之間的差異,統(tǒng)一了MQ的編程模型,降低了學(xué)習(xí)、開發(fā)、維護(hù)MQ的成本
2、Stream重要概念
Spring Cloud Stream 是?個構(gòu)建消息驅(qū)動微服務(wù)的框架。應(yīng)用程序通過inputs(相當(dāng)于消息消費(fèi)者consumer)或者outputs(相當(dāng)于消息生產(chǎn)者producer)來與Spring Cloud Stream中的binder對象交互,而Binder對象是用來屏蔽底層MQ細(xì)節(jié)的,它負(fù)責(zé)與具體的消息中間件交互。
說白了:對于我們來說,只需要知道如何使?Spring Cloud Stream與Binder對象交互即可
Binder綁定器
Binder綁定器是Spring Cloud Stream 中非常核心的概念,就是通過它來屏蔽底層不同MQ消息中間件的細(xì)節(jié)差異,當(dāng)需要更換為其他消息中間件時,我們需要做的就是更換對應(yīng)的Binder綁定器而不需要修改任何應(yīng)用邏輯(Binder綁定器的實(shí)現(xiàn)是框架內(nèi)置的,Spring Cloud Stream目前支持Rabbit、Kafka兩種消息隊(duì)列)
3、傳統(tǒng)MQ模型與Stream消息驅(qū)動模型
4、Stream消息通信方式及編程模型
4.1、Stream消息通信方式
Stream中的消息通信方式遵循了發(fā)布—訂閱模式。
在Spring Cloud Stream中的消息通信方式遵循了發(fā)布-訂閱模式,當(dāng)一條消息被投遞到消息中間件之 后,它會通過共享的 Topic 主題進(jìn)行廣播,消息消費(fèi)者在訂閱的主題中收到它并觸發(fā)自身的業(yè)務(wù)邏輯處理。這里所提到的 Topic 主題是SpringCloud Stream中的一個抽象概念,用來代表發(fā)布共享消息給消 費(fèi)者的地方。在不同的消息中間件中, Topic 可能對應(yīng)著不同的概念,比如:在RabbitMQ中的它對應(yīng)了Exchange、在Kakfa中則對應(yīng)了Kafka中的Topic。
4.2、Stream編程注解
如下的注解無非在做一件事,把我們結(jié)構(gòu)圖中那些組成部分上下關(guān)聯(lián)起來,打通通道(這樣的話生產(chǎn)者的message數(shù)據(jù)才能進(jìn)入mq,mq中數(shù)據(jù)才能進(jìn)入消費(fèi)者工程)。
注解 | 描述 |
@Input(在消費(fèi)者工程中使用) | 注解標(biāo)識輸入通道,通過該輸入通道接收到的消息進(jìn)入應(yīng)用程序 |
@Output(在生產(chǎn)者工程中使用) | 注解標(biāo)識輸出通道,發(fā)布的消息將通過該通道離開應(yīng)用程序 |
@StreamListener(在消費(fèi)者工程中使用,監(jiān)聽message的到來) | 監(jiān)聽隊(duì)列,用于消費(fèi)者的隊(duì)列的消息的接收(有消息監(jiān)聽.....) |
@EnableBinding | 把Channel和Exchange(對于RabbitMQ)綁定在一起 |
4.3、Stream消息驅(qū)動之開發(fā)生產(chǎn)者端
(1)在lagou_parent下新建子module:lagou-cloud-stream-producer-9090
(2)pom.xml中添加依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>lagou-parent</artifactId> <groupId>com.lagou.edu</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>lagou-cloud-stream-producer-9090</artifactId> <dependencies> <!--eureka client 客戶端依賴引入--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <!--spring cloud stream 依賴(rabbit)--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> </dependencies> </project>
(3)application.yml添加配置
server:
port: 9090
spring:
application:
name: lagou-cloud-stream-producer
cloud:
stream:
binders: # 綁定MQ服務(wù)信息(此處我們是RabbitMQ)
lagouRabbitBinder: # 給Binder定義的名稱,用于后面的關(guān)聯(lián)
type: rabbit # MQ類型,如果是Kafka的話,此處配置kafka
environment: # MQ環(huán)境配置(用戶名、密碼等)
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 關(guān)聯(lián)整合通道和binder對象
output: # output是我們定義的通道名稱,此處不能亂改
destination: lagouExchange # 要使用的Exchange名稱(消息隊(duì)列主題名稱)
content-type: text/plain # application/json # 消息類型設(shè)置,比如json
binder: lagouRabbitBinder # 關(guān)聯(lián)MQ服務(wù)
eureka:
client:
serviceUrl: # eureka server的路徑
defaultZone: http://lagoucloudeurekaservera:8761/eureka/,http://lagoucloudeurekaserverb:8762/eureka/ #把 eureka 集群中的所有 url 都填寫了進(jìn)來,也可以只寫一臺,因?yàn)楦鱾€ eureka server 可以同步注冊表
instance:
prefer-ip-address: true #使用ip注冊
(4)啟動類
package com.lagou.edu; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; @SpringBootApplication @EnableDiscoveryClient public class StreamProducerApplication9090 { public static void main(String[] args) { SpringApplication.run(StreamProducerApplication9090.class, args); } }
(5)業(yè)務(wù)類開發(fā)(發(fā)送消息接口、接口實(shí)現(xiàn)類)
接口
package com.lagou.edu.service; public interface IMessageProducer { public void sendMessage(String content); }
實(shí)現(xiàn)類
package com.lagou.edu.service.impl; import com.lagou.edu.service.IMessageProducer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.support.MessageBuilder; // Source.class里面就是對輸出通道的定義(這是Spring Cloud Stream內(nèi)置的通道封裝) @EnableBinding(Source.class) public class MessageProducerImpl implements IMessageProducer { // 將MessageChannel的封裝對象Source注入到這里使用 @Autowired private Source source; @Override public void sendMessage(String content) { // 向mq中發(fā)送消息(并不是直接操作mq,應(yīng)該操作的是spring cloud stream) // 使用通道向外發(fā)出消息(指的是Source里面的output通道) source.output().send(MessageBuilder.withPayload(content).build()); } }
測試類
import com.lagou.edu.StreamProducerApplication9090; import com.lagou.edu.service.IMessageProducer; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @SpringBootTest(classes = {StreamProducerApplication9090.class}) @RunWith(SpringJUnit4ClassRunner.class) public class MessageProducerTest { @Autowired private IMessageProducer iMessageProducer; @Test public void testSendMessage() { iMessageProducer.sendMessage("hello world-lagou101"); } }
4.4、Stream消息驅(qū)動之開發(fā)消費(fèi)者端
注:消費(fèi)端引入的jar與服務(wù)端引入的jar包相同,故省略。
(1)application.yml
server:
port: 9091
spring:
application:
name: lagou-cloud-stream-consumer
cloud:
stream:
binders: # 綁定MQ服務(wù)信息(此處我們是RabbitMQ)
lagouRabbitBinder: # 給Binder定義的名稱,用于后面的關(guān)聯(lián)
type: rabbit # MQ類型,如果是Kafka的話,此處配置kafka
environment: # MQ環(huán)境配置(用戶名、密碼等)
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 關(guān)聯(lián)整合通道和binder對象
input: # input是我們定義的通道名稱,此處不能亂改
destination: lagouExchange # 要使用的Exchange名稱(消息隊(duì)列主題名稱)
content-type: text/plain # application/json # 消息類型設(shè)置,比如json
binder: lagouRabbitBinder # 關(guān)聯(lián)MQ服務(wù)
group: lagou001 # 分組
eureka:
client:
serviceUrl: # eureka server的路徑
defaultZone: http://lagoucloudeurekaservera:8761/eureka/,http://lagoucloudeurekaserverb:8762/eureka/ #把 eureka 集群中的所有 url 都填寫了進(jìn)來,也可以只寫一臺,因?yàn)楦鱾€ eureka server 可以同步注冊表
instance:
prefer-ip-address: true #使用ip注冊
(2)消息消費(fèi)者監(jiān)聽類
package com.lagou.edu.service; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; @EnableBinding(Sink.class) public class MessageConsumerService { @StreamListener(Sink.INPUT) public void recevieMessages(Message<String> message) { System.out.println("=========接收到的消息:" + message); } }
(3)啟動類
package com.lagou.edu; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; @SpringBootApplication @EnableDiscoveryClient public class StreamConsumerApplication9091 { public static void main(String[] args) { SpringApplication.run(StreamConsumerApplication9091.class,args); } }
至此,消費(fèi)端與服務(wù)端代碼編寫完成。
5、Stream高級之自定義消息通道
Stream 內(nèi)置了兩種接口Source和Sink分別定義了 binding 為 “input” 的輸入流和“output” 的輸出流,我們也可以自定義各種輸入輸出流(通道),但實(shí)際我們可以在我們的服務(wù)中使用多個binder、多個輸入通道和輸出通道,然而默認(rèn)就帶了一個input的輸入通道和一個output的輸出通道,怎么辦?
我們是可以自定義消息通道的,學(xué)著Source和Sink的樣子,給你的通道定義個自己的名字,多個輸入通道和輸出通道是可以寫在一個類中的。
定義接口
interface CustomChannel { String INPUT_LOG = "inputLog"; String OUTPUT_LOG = "outputLog"; @Input(INPUT_LOG) SubscribableChannel inputLog(); @Output(OUTPUT_LOG) MessageChannel outputLog(); }
如何使用?
在 @EnableBinding 注解中,綁定自定義的接口
使用@StreamListener 做監(jiān)聽的時候,需要指定 CustomChannel.INPUT_LOG
bindings:
inputLog:
destination: lagouExchange
outputLog:
destination: eduExchange
6、Stream高級之消息分組
如上我們的情況,消費(fèi)者端有兩個(消費(fèi)同一個MQ的同一個主題),但是呢我們的業(yè)務(wù)場景中希望這個主題的一個Message只能被一個消費(fèi)者端消費(fèi)處理,此時我們就可以使用消息分組。
解決的問題:能解決消息重復(fù)消費(fèi)問題
我們僅僅需要在服務(wù)消費(fèi)者端設(shè)置 spring.cloud.stream.bindings.input.group 屬性,多個消費(fèi)者實(shí)例配置為同一個group名稱(在同一個group中的多個消費(fèi)者只有一個可以獲取到消息并消費(fèi))。
到此這篇關(guān)于Spring Cloud Stream消息驅(qū)動組件使用方法介紹的文章就介紹到這了,更多相關(guān)Spring Cloud Stream消息驅(qū)動內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- spring-integration連接MQTT全過程
- Spring?Integration概述與怎么使用詳解
- 如何使用Spring?integration在Springboot中集成Mqtt詳解
- 源碼解讀Spring-Integration執(zhí)行過程
- 最新SpringCloud?Stream消息驅(qū)動講解
- Springcloud Stream消息驅(qū)動工具使用介紹
- Springcloud整合stream,rabbitmq實(shí)現(xiàn)消息驅(qū)動功能
- SpringCloud Stream消息驅(qū)動實(shí)例詳解
- Spring Integration 實(shí)現(xiàn)消息驅(qū)動的詳細(xì)步驟
相關(guān)文章
java實(shí)現(xiàn)建造者模式(Builder Pattern)
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)建造者模式Builder Pattern,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-10-10Spring cloud oauth2如何搭建認(rèn)證資源中心
這篇文章主要介紹了Spring cloud oauth2如何搭建認(rèn)證資源中心,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-11-11如何在Springboot實(shí)現(xiàn)攔截器功能
其實(shí)spring boot攔截器的配置方式和springMVC差不多,只有一些小的改變需要注意下就ok了,下面這篇文章主要給大家介紹了關(guān)于如何在Springboot實(shí)現(xiàn)攔截器功能的相關(guān)資料,需要的朋友可以參考下2022-06-06JAVA中通過Hibernate-Validation進(jìn)行參數(shù)驗(yàn)證
這篇文章主要介紹了JAVA中通過Hibernate-Validation進(jìn)行參數(shù)驗(yàn)證,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-04-04Java微信公眾平臺開發(fā)(15) 微信JSSDK的使用
這篇文章主要為大家詳細(xì)介紹了Java微信公眾平臺開發(fā)第十五步,微信JSSDK的使用方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-04-04