Spring Cloud Stream如何實(shí)現(xiàn)服務(wù)之間的通訊
Spring Cloud Stream
Srping cloud Bus的底層實(shí)現(xiàn)就是Spring Cloud Stream,Spring Cloud Stream的目的是用于構(gòu)建基于消息驅(qū)動(dòng)(或事件驅(qū)動(dòng))的微服務(wù)架構(gòu)。Spring Cloud Stream本身對(duì)Spring Messaging、Spring Integration、Spring Boot Actuator、Spring Boot Externalized Configuration等模塊進(jìn)行封裝(整合)和擴(kuò)展,下面我們實(shí)現(xiàn)兩個(gè)服務(wù)之間的通訊來(lái)演示Spring Cloud Stream的使用方法。
整體概述

服務(wù)要想與其他服務(wù)通訊要定義通道,一般會(huì)定義輸出通道和輸入通道,輸出通道用于發(fā)送消息,輸入通道用于接收消息,每個(gè)通道都會(huì)有個(gè)名字(輸入和輸出只是通道類(lèi)型,可以用不同的名字定義很多很多通道),不同通道的名字不能相同否則會(huì)報(bào)錯(cuò)(輸入通道和輸出通道不同類(lèi)型的通道名稱(chēng)也不能相同),綁定器是操作RabbitMQ或Kafka的抽象層,為了屏蔽操作這些消息中間件的復(fù)雜性和不一致性,綁定器會(huì)用通道的名字在消息中間件中定義主題,一個(gè)主題內(nèi)的消息生產(chǎn)者來(lái)自多個(gè)服務(wù),一個(gè)主題內(nèi)消息的消費(fèi)者也是多個(gè)服務(wù),也就是說(shuō)消息的發(fā)布和消費(fèi)是通過(guò)主題進(jìn)行定義和組織的,通道的名字就是主題的名字,在RabbitMQ中主題使用Exchanges實(shí)現(xiàn),在Kafka中主題使用Topic實(shí)現(xiàn)。
準(zhǔn)備環(huán)境
創(chuàng)建兩個(gè)項(xiàng)目spring-cloud-stream-a和spring-cloud-stream-b,spring-cloud-stream-a我們用Spring Cloud Stream實(shí)現(xiàn)通訊,spring-cloud-stream-b我們用Spring Cloud Stream的底層模塊Spring Integration實(shí)現(xiàn)通訊。
兩個(gè)項(xiàng)目的POM文件依賴(lài)都是:
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
spring-cloud-stream-binder-rabbit是指綁定器的實(shí)現(xiàn)使用RabbitMQ。
項(xiàng)目配置內(nèi)容application.properties:
spring.application.name=spring-cloud-stream-a server.port=9010 #設(shè)置默認(rèn)綁定器 spring.cloud.stream.defaultBinder = rabbit spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
spring.application.name=spring-cloud-stream-b server.port=9011 #設(shè)置默認(rèn)綁定器 spring.cloud.stream.defaultBinder = rabbit spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
啟動(dòng)一個(gè)rabbitmq:
docker pull rabbitmq:3-management docker run -d --hostname my-rabbit --name rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
編寫(xiě)A項(xiàng)目代碼
在A項(xiàng)目中定義一個(gè)輸入通道一個(gè)輸出通道,定義通道在接口中使用@Input和@Output注解定義,程序啟動(dòng)的時(shí)候Spring Cloud Stream會(huì)根據(jù)接口定義將實(shí)現(xiàn)類(lèi)自動(dòng)注入(Spring Cloud Stream自動(dòng)實(shí)現(xiàn)該接口不需要寫(xiě)代碼)。
A服務(wù)輸入通道,通道名稱(chēng)ChatExchanges.A.Input,接口定義輸入通道必須返回SubscribableChannel:
public interface ChatInput {
String INPUT = "ChatExchanges.A.Input";
@Input(ChatInput.INPUT)
SubscribableChannel input();
}
A服務(wù)輸出通道,通道名稱(chēng)ChatExchanges.A.Output,輸出通道必須返回MessageChannel:
public interface ChatOutput {
String OUTPUT = "ChatExchanges.A.Output";
@Output(ChatOutput.OUTPUT)
MessageChannel output();
}
定義消息實(shí)體類(lèi):
public class ChatMessage implements Serializable {
private String name;
private String message;
private Date chatDate;
//沒(méi)有無(wú)參數(shù)的構(gòu)造函數(shù)并行化會(huì)出錯(cuò)
private ChatMessage(){}
public ChatMessage(String name,String message,Date chatDate){
this.name = name;
this.message = message;
this.chatDate = chatDate;
}
public String getName(){
return this.name;
}
public String getMessage(){
return this.message;
}
public Date getChatDate() { return this.chatDate; }
public String ShowMessage(){
return String.format("聊天消息:%s的時(shí)候,%s說(shuō)%s。",this.chatDate,this.name,this.message);
}
}
在業(yè)務(wù)處理類(lèi)上用@EnableBinding注解綁定輸入通道和輸出通道,這個(gè)綁定動(dòng)作其實(shí)就是創(chuàng)建并注冊(cè)輸入和輸出通道的實(shí)現(xiàn)類(lèi)到Bean中,所以可以直接是使用@Autowired進(jìn)行注入使用,另外消息的串行化默認(rèn)使用application/json格式(com.fastexml.jackson),最后用@StreamListener注解進(jìn)行指定通道消息的監(jiān)聽(tīng):
//ChatInput.class的輸入通道不在這里綁定,監(jiān)聽(tīng)到數(shù)據(jù)會(huì)找不到AClient類(lèi)的引用。
//Input和Output通道定義的名字不能一樣,否則程序啟動(dòng)會(huì)拋異常。
@EnableBinding({ChatOutput.class,ChatInput.class})
public class AClient {
private static Logger logger = LoggerFactory.getLogger(AClient.class);
@Autowired
private ChatOutput chatOutput;
//StreamListener自帶了Json轉(zhuǎn)對(duì)象的能力,收到B的消息打印并回復(fù)B一個(gè)新的消息。
@StreamListener(ChatInput.INPUT)
public void PrintInput(ChatMessage message) {
logger.info(message.ShowMessage());
ChatMessage replyMessage = new ChatMessage("ClientA","A To B Message.", new Date());
chatOutput.output().send(MessageBuilder.withPayload(replyMessage).build());
}
}
到此A項(xiàng)目代碼編寫(xiě)完成。
編寫(xiě)B(tài)項(xiàng)目代碼
B項(xiàng)目使用Spring Integration實(shí)現(xiàn)消息的發(fā)布和消費(fèi),定義通道時(shí)我們要交換輸入通道和輸出通道的名稱(chēng):
public interface ChatProcessor {
String OUTPUT = "ChatExchanges.A.Input";
String INPUT = "ChatExchanges.A.Output";
@Input(ChatProcessor.INPUT)
SubscribableChannel input();
@Output(ChatProcessor.OUTPUT)
MessageChannel output();
}
消息實(shí)體類(lèi):
public class ChatMessage {
private String name;
private String message;
private Date chatDate;
//沒(méi)有無(wú)參數(shù)的構(gòu)造函數(shù)并行化會(huì)出錯(cuò)
private ChatMessage(){}
public ChatMessage(String name,String message,Date chatDate){
this.name = name;
this.message = message;
this.chatDate = chatDate;
}
public String getName(){
return this.name;
}
public String getMessage(){
return this.message;
}
public Date getChatDate() { return this.chatDate; }
public String ShowMessage(){
return String.format("聊天消息:%s的時(shí)候,%s說(shuō)%s。",this.chatDate,this.name,this.message);
}
}
業(yè)務(wù)處理類(lèi)用@ServiceActivator注解代替@StreamListener,用@InboundChannelAdapter注解發(fā)布消息:
@EnableBinding(ChatProcessor.class)
public class BClient {
private static Logger logger = LoggerFactory.getLogger(BClient.class);
//@ServiceActivator沒(méi)有Json轉(zhuǎn)對(duì)象的能力需要借助@Transformer注解
@ServiceActivator(inputChannel=ChatProcessor.INPUT)
public void PrintInput(ChatMessage message) {
logger.info(message.ShowMessage());
}
@Transformer(inputChannel = ChatProcessor.INPUT,outputChannel = ChatProcessor.INPUT)
public ChatMessage transform(String message) throws Exception{
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(message,ChatMessage.class);
}
//每秒發(fā)出一個(gè)消息給A
@Bean
@InboundChannelAdapter(value = ChatProcessor.OUTPUT,poller = @Poller(fixedDelay="1000"))
public GenericMessage<ChatMessage> SendChatMessage(){
ChatMessage message = new ChatMessage("ClientB","B To A Message.", new Date());
GenericMessage<ChatMessage> gm = new GenericMessage<>(message);
return gm;
}
}
運(yùn)行程序
啟動(dòng)A項(xiàng)目和B項(xiàng)目:


源碼
Github倉(cāng)庫(kù):https://github.com/sunweisheng/spring-cloud-example
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Java 運(yùn)算符 動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
這篇文章主要介紹了Java 運(yùn)算符 動(dòng)力節(jié)點(diǎn)Java學(xué)院整理,需要的朋友可以參考下2017-04-04
MybatisPlus的MetaObjectHandler與@TableLogic使用
這篇文章主要介紹了MybatisPlus的MetaObjectHandler與@TableLogic使用方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-04-04
詳解Spring Boot 項(xiàng)目啟動(dòng)時(shí)執(zhí)行特定方法
這篇文章主要介紹了詳解Spring Boot 項(xiàng)目啟動(dòng)時(shí)執(zhí)行特定方法,Springboot給我們提供了兩種“開(kāi)機(jī)啟動(dòng)”某些方法的方式:ApplicationRunner和CommandLineRunner。感興趣的小伙伴們可以參考一下2018-06-06
SpringBoot獲取配置文件內(nèi)容的幾種方式總結(jié)
大家都知道SpringBoot獲取配置文件的方法有很多,下面這篇文章主要給大家介紹了關(guān)于SpringBoot獲取配置文件內(nèi)容的幾種方式,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-02-02
SpringCloud微服務(wù)開(kāi)發(fā)基于RocketMQ實(shí)現(xiàn)分布式事務(wù)管理詳解
分布式事務(wù)是在微服務(wù)開(kāi)發(fā)中經(jīng)常會(huì)遇到的一個(gè)問(wèn)題,之前的文章中我們已經(jīng)實(shí)現(xiàn)了利用Seata來(lái)實(shí)現(xiàn)強(qiáng)一致性事務(wù),其實(shí)還有一種廣為人知的方案就是利用消息隊(duì)列來(lái)實(shí)現(xiàn)分布式事務(wù),保證數(shù)據(jù)的最終一致性,也就是我們常說(shuō)的柔性事務(wù)2022-09-09
java?jar包后臺(tái)運(yùn)行的兩種方式詳解
后臺(tái)運(yùn)行jar的方法有多種方法可以實(shí)現(xiàn)Java后臺(tái)運(yùn)行jar文件,下面介紹其中兩種常見(jiàn)的方法,下面這篇文章主要給大家介紹了關(guān)于java?jar包后臺(tái)運(yùn)行的兩種方式,文中通過(guò)代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-07-07
java String[]字符串?dāng)?shù)組自動(dòng)排序的簡(jiǎn)單實(shí)現(xiàn)
下面小編就為大家?guī)?lái)一篇java String[]字符串?dāng)?shù)組自動(dòng)排序的簡(jiǎn)單實(shí)現(xiàn)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2016-09-09

