Spring?Cloud?Stream消息驅動組件使用方法介紹
MQ:消息隊列/消息中間件/消息代理,產品有很多,ActiveMQ RabbitMQ RocketMQ Kafka
1、Stream解決的痛點問題
MQ消息中間件廣泛應用在應用解耦合、異步消息處理、流量削峰等場景中。
不同的MQ消息中間件內部機制包括使用方式都會有所不同,比如RabbitMQ中有Exchange(交換機/交換器)這一概念,kafka有Topic、Partition分區(qū)這些概念,MQ消息中間件的差異性不利于我們上層的開發(fā)應用,當我們的系統(tǒng)希望從原有的RabbitMQ切換到Kafka時,我們會發(fā)現(xiàn)比較困難,很多要操作可能重來(因為應用程序和具體的某?款MQ消息中間件耦合在?起了)。
Spring Cloud Stream進行了很好的上層抽象,可以讓我們與具體消息中間件解耦合,屏蔽掉了底層具體MQ消息中間件的細節(jié)差異,就像Hibernate屏蔽掉了具體數(shù)據(jù)庫(Mysql/Oracle一樣)。如此?來,我們學習、開發(fā)、維護MQ都會變得輕松。目前Spring Cloud Stream支持RabbitMQ和Kafka。
本質:屏蔽掉了底層不同MQ消息中間件之間的差異,統(tǒng)一了MQ的編程模型,降低了學習、開發(fā)、維護MQ的成本
2、Stream重要概念
Spring Cloud Stream 是?個構建消息驅動微服務的框架。應用程序通過inputs(相當于消息消費者consumer)或者outputs(相當于消息生產者producer)來與Spring Cloud Stream中的binder對象交互,而Binder對象是用來屏蔽底層MQ細節(jié)的,它負責與具體的消息中間件交互。
說白了:對于我們來說,只需要知道如何使?Spring Cloud Stream與Binder對象交互即可


Binder綁定器
Binder綁定器是Spring Cloud Stream 中非常核心的概念,就是通過它來屏蔽底層不同MQ消息中間件的細節(jié)差異,當需要更換為其他消息中間件時,我們需要做的就是更換對應的Binder綁定器而不需要修改任何應用邏輯(Binder綁定器的實現(xiàn)是框架內置的,Spring Cloud Stream目前支持Rabbit、Kafka兩種消息隊列)
3、傳統(tǒng)MQ模型與Stream消息驅動模型

4、Stream消息通信方式及編程模型
4.1、Stream消息通信方式
Stream中的消息通信方式遵循了發(fā)布—訂閱模式。
在Spring Cloud Stream中的消息通信方式遵循了發(fā)布-訂閱模式,當一條消息被投遞到消息中間件之 后,它會通過共享的 Topic 主題進行廣播,消息消費者在訂閱的主題中收到它并觸發(fā)自身的業(yè)務邏輯處理。這里所提到的 Topic 主題是SpringCloud Stream中的一個抽象概念,用來代表發(fā)布共享消息給消 費者的地方。在不同的消息中間件中, Topic 可能對應著不同的概念,比如:在RabbitMQ中的它對應了Exchange、在Kakfa中則對應了Kafka中的Topic。
4.2、Stream編程注解
如下的注解無非在做一件事,把我們結構圖中那些組成部分上下關聯(lián)起來,打通通道(這樣的話生產者的message數(shù)據(jù)才能進入mq,mq中數(shù)據(jù)才能進入消費者工程)。
| 注解 | 描述 |
| @Input(在消費者工程中使用) | 注解標識輸入通道,通過該輸入通道接收到的消息進入應用程序 |
| @Output(在生產者工程中使用) | 注解標識輸出通道,發(fā)布的消息將通過該通道離開應用程序 |
| @StreamListener(在消費者工程中使用,監(jiān)聽message的到來) | 監(jiān)聽隊列,用于消費者的隊列的消息的接收(有消息監(jiān)聽.....) |
| @EnableBinding | 把Channel和Exchange(對于RabbitMQ)綁定在一起 |
4.3、Stream消息驅動之開發(fā)生產者端
(1)在lagou_parent下新建子module:lagou-cloud-stream-producer-9090
(2)pom.xml中添加依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>lagou-parent</artifactId>
<groupId>com.lagou.edu</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>lagou-cloud-stream-producer-9090</artifactId>
<dependencies>
<!--eureka client 客戶端依賴引入-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<!--spring cloud stream 依賴(rabbit)-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
</project>(3)application.yml添加配置
server:
port: 9090
spring:
application:
name: lagou-cloud-stream-producer
cloud:
stream:
binders: # 綁定MQ服務信息(此處我們是RabbitMQ)
lagouRabbitBinder: # 給Binder定義的名稱,用于后面的關聯(lián)
type: rabbit # MQ類型,如果是Kafka的話,此處配置kafka
environment: # MQ環(huán)境配置(用戶名、密碼等)
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 關聯(lián)整合通道和binder對象
output: # output是我們定義的通道名稱,此處不能亂改
destination: lagouExchange # 要使用的Exchange名稱(消息隊列主題名稱)
content-type: text/plain # application/json # 消息類型設置,比如json
binder: lagouRabbitBinder # 關聯(lián)MQ服務
eureka:
client:
serviceUrl: # eureka server的路徑
defaultZone: http://lagoucloudeurekaservera:8761/eureka/,http://lagoucloudeurekaserverb:8762/eureka/ #把 eureka 集群中的所有 url 都填寫了進來,也可以只寫一臺,因為各個 eureka server 可以同步注冊表
instance:
prefer-ip-address: true #使用ip注冊
(4)啟動類
package com.lagou.edu;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
@SpringBootApplication
@EnableDiscoveryClient
public class StreamProducerApplication9090 {
public static void main(String[] args) {
SpringApplication.run(StreamProducerApplication9090.class, args);
}
}(5)業(yè)務類開發(fā)(發(fā)送消息接口、接口實現(xiàn)類)
接口
package com.lagou.edu.service;
public interface IMessageProducer {
public void sendMessage(String content);
}實現(xiàn)類
package com.lagou.edu.service.impl;
import com.lagou.edu.service.IMessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
// Source.class里面就是對輸出通道的定義(這是Spring Cloud Stream內置的通道封裝)
@EnableBinding(Source.class)
public class MessageProducerImpl implements IMessageProducer {
// 將MessageChannel的封裝對象Source注入到這里使用
@Autowired
private Source source;
@Override
public void sendMessage(String content) {
// 向mq中發(fā)送消息(并不是直接操作mq,應該操作的是spring cloud stream)
// 使用通道向外發(fā)出消息(指的是Source里面的output通道)
source.output().send(MessageBuilder.withPayload(content).build());
}
}測試類
import com.lagou.edu.StreamProducerApplication9090;
import com.lagou.edu.service.IMessageProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@SpringBootTest(classes = {StreamProducerApplication9090.class})
@RunWith(SpringJUnit4ClassRunner.class)
public class MessageProducerTest {
@Autowired
private IMessageProducer iMessageProducer;
@Test
public void testSendMessage() {
iMessageProducer.sendMessage("hello world-lagou101");
}
}4.4、Stream消息驅動之開發(fā)消費者端
注:消費端引入的jar與服務端引入的jar包相同,故省略。
(1)application.yml
server:
port: 9091
spring:
application:
name: lagou-cloud-stream-consumer
cloud:
stream:
binders: # 綁定MQ服務信息(此處我們是RabbitMQ)
lagouRabbitBinder: # 給Binder定義的名稱,用于后面的關聯(lián)
type: rabbit # MQ類型,如果是Kafka的話,此處配置kafka
environment: # MQ環(huán)境配置(用戶名、密碼等)
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 關聯(lián)整合通道和binder對象
input: # input是我們定義的通道名稱,此處不能亂改
destination: lagouExchange # 要使用的Exchange名稱(消息隊列主題名稱)
content-type: text/plain # application/json # 消息類型設置,比如json
binder: lagouRabbitBinder # 關聯(lián)MQ服務
group: lagou001 # 分組
eureka:
client:
serviceUrl: # eureka server的路徑
defaultZone: http://lagoucloudeurekaservera:8761/eureka/,http://lagoucloudeurekaserverb:8762/eureka/ #把 eureka 集群中的所有 url 都填寫了進來,也可以只寫一臺,因為各個 eureka server 可以同步注冊表
instance:
prefer-ip-address: true #使用ip注冊
(2)消息消費者監(jiān)聽類
package com.lagou.edu.service;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
@EnableBinding(Sink.class)
public class MessageConsumerService {
@StreamListener(Sink.INPUT)
public void recevieMessages(Message<String> message) {
System.out.println("=========接收到的消息:" + message);
}
}(3)啟動類
package com.lagou.edu;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
@SpringBootApplication
@EnableDiscoveryClient
public class StreamConsumerApplication9091 {
public static void main(String[] args) {
SpringApplication.run(StreamConsumerApplication9091.class,args);
}
}至此,消費端與服務端代碼編寫完成。
5、Stream高級之自定義消息通道
Stream 內置了兩種接口Source和Sink分別定義了 binding 為 “input” 的輸入流和“output” 的輸出流,我們也可以自定義各種輸入輸出流(通道),但實際我們可以在我們的服務中使用多個binder、多個輸入通道和輸出通道,然而默認就帶了一個input的輸入通道和一個output的輸出通道,怎么辦?
我們是可以自定義消息通道的,學著Source和Sink的樣子,給你的通道定義個自己的名字,多個輸入通道和輸出通道是可以寫在一個類中的。
定義接口
interface CustomChannel {
String INPUT_LOG = "inputLog";
String OUTPUT_LOG = "outputLog";
@Input(INPUT_LOG)
SubscribableChannel inputLog();
@Output(OUTPUT_LOG)
MessageChannel outputLog();
}如何使用?
在 @EnableBinding 注解中,綁定自定義的接口
使用@StreamListener 做監(jiān)聽的時候,需要指定 CustomChannel.INPUT_LOG
bindings:
inputLog:
destination: lagouExchange
outputLog:
destination: eduExchange
6、Stream高級之消息分組
如上我們的情況,消費者端有兩個(消費同一個MQ的同一個主題),但是呢我們的業(yè)務場景中希望這個主題的一個Message只能被一個消費者端消費處理,此時我們就可以使用消息分組。
解決的問題:能解決消息重復消費問題
我們僅僅需要在服務消費者端設置 spring.cloud.stream.bindings.input.group 屬性,多個消費者實例配置為同一個group名稱(在同一個group中的多個消費者只有一個可以獲取到消息并消費)。

到此這篇關于Spring Cloud Stream消息驅動組件使用方法介紹的文章就介紹到這了,更多相關Spring Cloud Stream消息驅動內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
java實現(xiàn)建造者模式(Builder Pattern)
這篇文章主要為大家詳細介紹了java實現(xiàn)建造者模式Builder Pattern,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-10-10
JAVA中通過Hibernate-Validation進行參數(shù)驗證
這篇文章主要介紹了JAVA中通過Hibernate-Validation進行參數(shù)驗證,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-04-04
Java微信公眾平臺開發(fā)(15) 微信JSSDK的使用
這篇文章主要為大家詳細介紹了Java微信公眾平臺開發(fā)第十五步,微信JSSDK的使用方法,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-04-04

