SpringCloudStream原理和深入使用小結
簡單概述
Spring Cloud Stream是一個用于構建與共享消息傳遞系統(tǒng)連接的高度可擴展的事件驅動型微服務的框架。
應用程序通過inputs或outputs來與Spring Cloud Stream中binder對象交互,binder對象負責與消息中間件交互。也就是說:Spring Cloud Stream能夠屏蔽底層消息中間件【RabbitMQ,kafka等】的差異,降低切換成本,統(tǒng)一消息的編程模型。
相關概念
Channel(通道):Channel是消息的傳輸管道,用于在生產者和消費者之間傳遞消息。生產者通過輸出通道將消息發(fā)送到Destination,消費者通過輸入通道從Destination接收消息。
在Spring Cloud Stream中,有兩種類型的通道:輸入(input)和輸出(output)。這兩種通道分別用于消費者接收消息和生產者發(fā)送消息。
- Input(輸入):Input通道用于消費者從消息代理接收消息。消費者可以通過監(jiān)聽Input通道來實時接收傳入的消息
- Output(輸出):Output通道用于生產者向消息代理發(fā)送消息。生產者可以通過向Output通道發(fā)送消息來發(fā)布新的消息
Destination(目標):Destination是消息的目的地,通常對應于消息代理中的Topic或Queue。生產者將消息發(fā)送到特定的Destination,消費者從其中接收消息。
Binder(綁定器):Binder是Spring Cloud Stream的核心組件之一。它作為消息代理與外部消息中間件進行交互,并負責將消息發(fā)送到消息總線或從消息總線接收消息。Binder負責處理消息傳遞、序列化、反序列化、消息路由等底層細節(jié),使得開發(fā)者能夠以統(tǒng)一的方式與不同的消息中間件進行交互。Spring Cloud Stream提供了多個可用的Binder實現,包括RabbitMQ、Kafka等。
**消費者組:**在Spring Cloud Stream中,消費組(Consumer Group)是一組具有相同功能的消費者實例。當多個消費者實例屬于同一個消費組時,消息代理會將消息均勻地分發(fā)給消費者實例,以實現負載均衡。如果其中一個消費者實例失效,消息代理會自動將消息重新分配給其他可用的消費者實例,以實現高可用性。(對于一個消息來說,每個消費者組只會有一個消費者消費消息)
分區(qū):Spring Cloud Stream支持在多個消費者實例之間創(chuàng)建分區(qū),這樣我們通過某些特征量做消息分發(fā),保證相同標識的消息總是能被同一個消費者處理
Spring Message
Spring Message是Spring Framework的一個模塊,其作用就是統(tǒng)一消息的編程模型。
package org.springframework.messaging; public interface Message<T> { T getPayload(); MessageHeaders getHeaders(); }
消息通道 MessageChannel 用于接收消息,調用send方法可以將消息發(fā)送至該消息通道中:
@FunctionalInterface public interface MessageChannel { long INDEFINITE_TIMEOUT = -1; default boolean send(Message<?> message) { return send(message, INDEFINITE_TIMEOUT); } boolean send(Message<?> message, long timeout); }
消息通道里的消息由消息通道的子接口可訂閱的消息通道SubscribableChannel
實現,被MessageHandler
消息處理器所訂閱
public interface SubscribableChannel extends MessageChannel { boolean subscribe(MessageHandler handler); boolean unsubscribe(MessageHandler handler); }
由MessageHandler
真正地消費/處理消息
@FunctionalInterface public interface MessageHandler { void handleMessage(Message<?> message) throws MessagingException; }
Spring Integration
Spring Integration 提供了 Spring 編程模型的擴展用來支持企業(yè)集成模式(Enterprise Integration Patterns),是對 Spring Messaging 的擴展。
它提出了不少新的概念,包括消息路由MessageRoute、消息分發(fā)MessageDispatcher、消息過濾Filter、消息轉換Transformer、消息聚合Aggregator、消息分割Splitter等等。同時還提供了MessageChannel和MessageHandler的實現,分別包括 DirectChannel、ExecutorChannel、PublishSubscribeChannel和MessageFilter、ServiceActivatingHandler、MethodInvokingSplitter 等內容。
Spring-Cloud-Stream的架構
快速入門
引入依賴
<!--stream--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
增加配置文件
spring: cloud: stream: # 定義消息中間件 binders: MyRabbit: type: rabbit environment: spring: rabbitmq: host: localhost port: 5672 username: root password: root vhost: / bindings: # 生產者中定義,定義發(fā)布對象 myInput: destination: myStreamExchange group: myStreamGroup binder: MyRabbit # 消費者中定義,定義訂閱的對象 myOutput-in-0: destination: myStreamExchange group: myStreamGroup binder: MyRabbit # 消費者中定義,定義輸出的函數 function: definition: myOutput
生產者
@Resource private StreamBridge streamBridge; public void sendNormal() { streamBridge.send("myInput", "hello world"); }
消費者
@Bean("myOutput") public Consumer<Message<String>> myOutput() { return (message) -> { MessageHeaders headers = message.getHeaders(); System.out.println("myOutput head is : " + headers); String payload = message.getPayload(); System.out.println("myOutput payload is : " + payload); }; }
如何自定義Binder
- 添加spring-cloud-stream依賴
- 提供
ProvisioningProvider
的實現提供 MessageProducer
的實現提供MessageHandler
的實現提供Binder
的實現創(chuàng)建Binder的配置- 在
META-INF/spring.binders
中定義綁定器
添加spring-cloud-stream依賴
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> <version>${spring.cloud.stream.version}</version> </dependency>
提供ProvisioningProvider的實現
ProvisioningProvider
負責提供消費者和生產者目的地,并需要將 application.yml 或 application.properties 文件中包含的邏輯目的地轉換為物理目的地引用。
public class FileProvisioningProvider implements ProvisioningProvider< ExtendedConsumerProperties<FileConsumerProperties>, ExtendedProducerProperties<FileProducerProperties>> { public FileProvisioningProvider() { super(); } @Override public ProducerDestination provisionProducerDestination(String name, ExtendedProducerProperties<FileProducerProperties> properties) throws ProvisioningException { return new FileMessageDestination(name); } @Override public ConsumerDestination provisionConsumerDestination(String name, String group, ExtendedConsumerProperties<FileConsumerProperties> properties) throws ProvisioningException { return new FileMessageDestination(name); } private static class FileMessageDestination implements ProducerDestination, ConsumerDestination { private final String destination; private FileMessageDestination(final String destination) { this.destination = destination; } @Override public String getName() { return destination.trim(); } @Override public String getNameForPartition(int partition) { throw new UnsupportedOperationException("Partitioning is not implemented for file messaging."); } } }
提供MessageProducer的實現
MessageProducer負責使用事件并將其作為消息處理,發(fā)送給配置為使用此類事件的客戶端應用程序。
super.onInit(); executorService = Executors.newScheduledThreadPool(1); } @Override public void doStart() { executorService.scheduleWithFixedDelay(() -> { String payload = getPayload(); if (payload != null) { Message<String> receivedMessage = MessageBuilder.withPayload(payload).build(); sendMessage(receivedMessage); } }, 0, 50, TimeUnit.MILLISECONDS); } @Override protected void doStop() { executorService.shutdownNow(); } private String getPayload() { try { List<String> allLines = Files.readAllLines(Paths.get(fileExtendedBindingProperties.getPath() + File.separator + destination.getName() + ".txt")); String currentPayload = allLines.get(allLines.size() - 1); if (!currentPayload.equals(previousPayload)) { previousPayload = currentPayload; return currentPayload; } } catch (IOException e) { FileUtil.touch(new File(fileExtendedBindingProperties.getPath() + File.separator + destination.getName() + ".txt")); } return null; } }
提供MessageHandler的實現
MessageHandler
提供產生事件所需的邏輯。
public class FileMessageHandler extends AbstractMessageHandler { FileExtendedBindingProperties fileExtendedBindingProperties; ProducerDestination destination; public FileMessageHandler(ProducerDestination destination, FileExtendedBindingProperties fileExtendedBindingProperties) { this.destination = destination; this.fileExtendedBindingProperties = fileExtendedBindingProperties; } @Override protected void handleMessageInternal(Message<?> message) { try { if (message.getPayload() instanceof byte[]) { Files.write(Paths.get(fileExtendedBindingProperties.getPath() + File.separator + destination.getName() + ".txt"), (byte[]) message.getPayload()); } else { throw new RuntimeException("處理消息失敗"); } } catch (IOException e) { throw new RuntimeException(e); } } }
提供Binder的實現
提供自己的Binder
抽象實現:
- 擴展
AbstractMessageChannelBinder
類 - 將自定義的 ProvisioningProvider 指定為 AbstractMessageChannelBinder 的通用參數
- 重寫
createProducerMessageHandler
和createConsumerEndpoint
方法
public class FileMessageChannelBinder extends AbstractMessageChannelBinder <ExtendedConsumerProperties<FileConsumerProperties>, ExtendedProducerProperties<FileProducerProperties>, FileProvisioningProvider> implements ExtendedPropertiesBinder<MessageChannel, FileConsumerProperties, FileProducerProperties> { FileExtendedBindingProperties fileExtendedBindingProperties; public FileMessageChannelBinder(String[] headersToEmbed, FileProvisioningProvider provisioningProvider, FileExtendedBindingProperties fileExtendedBindingProperties) { super(headersToEmbed, provisioningProvider); this.fileExtendedBindingProperties = fileExtendedBindingProperties; } @Override protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<FileProducerProperties> producerProperties, MessageChannel errorChannel) throws Exception { FileMessageHandler fileMessageHandler = new FileMessageHandler(destination, fileExtendedBindingProperties); return fileMessageHandler; } @Override protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<FileConsumerProperties> properties) throws Exception { FileMessageProducerAdapter fileMessageProducerAdapter = new FileMessageProducerAdapter(destination, fileExtendedBindingProperties); return fileMessageProducerAdapter; } @Override public FileConsumerProperties getExtendedConsumerProperties(String channelName) { return fileExtendedBindingProperties.getExtendedConsumerProperties(channelName); } @Override public FileProducerProperties getExtendedProducerProperties(String channelName) { return fileExtendedBindingProperties.getExtendedProducerProperties(channelName); } @Override public String getDefaultsPrefix() { return fileExtendedBindingProperties.getDefaultsPrefix(); } @Override public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() { return fileExtendedBindingProperties.getExtendedPropertiesEntryClass(); } }
創(chuàng)建Binder的配置
嚴格要求創(chuàng)建一個 Spring 配置來初始化你的綁定器實現的 bean
@EnableConfigurationProperties(FileExtendedBindingProperties.class) @Configuration public class FileMessageBinderConfiguration { @Bean @ConditionalOnMissingBean public FileProvisioningProvider fileMessageBinderProvisioner() { return new FileProvisioningProvider(); } @Bean @ConditionalOnMissingBean public FileMessageChannelBinder fileMessageBinder(FileProvisioningProvider fileMessageBinderProvisioner, FileExtendedBindingProperties fileExtendedBindingProperties) { return new FileMessageChannelBinder(null, fileMessageBinderProvisioner, fileExtendedBindingProperties); } @Bean public FileProducerProperties fileConsumerProperties() { return new FileProducerProperties(); } }
詳細的代碼見https://gitee.com/xiaovcloud/spring-cloud-stream
到此這篇關于SpringCloudStream原理和深入使用的文章就介紹到這了,更多相關SpringCloudStream原理內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Spring @Retryable注解輕松搞定循環(huán)重試功能
spring系列的spring-retry是另一個實用程序模塊,可以幫助我們以標準方式處理任何特定操作的重試。在spring-retry中,所有配置都是基于簡單注釋的。本文主要介紹了Spring@Retryable注解如何輕松搞定循環(huán)重試功能,有需要的朋友可以參考一下2023-04-04SpringBoot項目application.yml文件數據庫配置密碼加密的方法
這篇文章主要介紹了SpringBoot項目application.yml文件數據庫配置密碼加密的方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-03-03Spring Security基于散列加密方案實現自動登錄功能
為了提高項目的用戶體驗,我們可以在項目中添加自動登錄功能,當然也要給用戶提供退出登錄的功能。接下來學習下Spring Security基于散列加密方案實現自動登錄功能,一起看看吧2021-09-09Java中ArrayList在foreach里remove的問題詳析
這篇文章主要給大家介紹了關于Java中ArrayList在foreach里remove問題的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面來一起看看吧2018-09-09