Spring Cloud Stream 高級特性使用詳解
重試
Consumer
端可以配置重試次數(shù),當消息消費失敗的時候會進行重試。
底層使用Spring Retry
去重試,重試次數(shù)可自定義配置。
# 默認重試次數(shù)為3,配置大于1時才會生效 spring.cloud.stream.bindings.<channelName>.consumer.maxAttempte=3
消息發(fā)送失敗的處理
Producer
發(fā)送消息出錯的情況下,可以配置錯誤處理,將錯誤信息發(fā)送給對應ID的MessageChannel
- 消息發(fā)送失敗的場景下,會將消息發(fā)送到一個
MessageChannel
。這個MessageChannel
會取ApplicationContext
中name為topic.errors
(topic
就是配置的destination
)的Bean。 - 如果找不到就會自動構建一個
PublishSubscribeChannel
。 - 然后使用
BridgeHandler
訂閱這個MessageChannel
,同時再設置ApplicationContext
中name為errorChannel
的PublishSubscribeChannel
消息通道為BridgeHandler
的outputChannel
。
public static final String ERROR_CHANNEL_BEAN_NAME = "errorChannel" private SubscribableChannel registerErrorInfrastructure( ProducerDestination destination) { // destination.getName() + ".errors" String errorChannelName = errorsBaseName(destination); SubscribableChannel errorChannel; if (getApplicationContext().containsBean(errorChannelName)) { Object errorChannelObject = getApplicationContext().getBean(errorChannelName); if (!(errorChannelObject instanceof SubscribableChannel)) { throw new IllegalStateException("Error channel '" + errorChannelName + "' must be a SubscribableChannel"); } errorChannel = (SubscribableChannel) errorChannelObject; } else { errorChannel = new PublishSubscribeChannel(); ((GenericApplicationContext) getApplicationContext()).registerBean( errorChannelName, SubscribableChannel.class, () -> errorChannel); } MessageChannel defaultErrorChannel = null; if (getApplicationContext() .containsBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)) { defaultErrorChannel = getApplicationContext().getBean( IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, MessageChannel.class); } if (defaultErrorChannel != null) { BridgeHandler errorBridge = new BridgeHandler(); errorBridge.setOutputChannel(defaultErrorChannel); errorChannel.subscribe(errorBridge); String errorBridgeHandlerName = getErrorBridgeName(destination); ((GenericApplicationContext) getApplicationContext()).registerBean( errorBridgeHandlerName, BridgeHandler.class, () -> errorBridge); } return errorChannel; }
- 示例代碼
spring.cloud.stream.bindings.output.destination=test-output # 消息發(fā)送失敗的處理邏輯默認是關閉的 spring.cloud.stream.bindings.output.producer.errorChannelEnabled=true
@Bean("test-output.errors") MessageChannel testOutputErrorChannel() { return new PublishSubscribeChannel(); } @Service class ErrorProduceService { @ServiceActivator(inputChannel = "test-output.errors") public void receiveProduceError(Message receiveMsg) { System.out.println("receive error msg: " + receiveMsg); } }
消費錯誤處理
Consumer
消費消息出錯的情況下,可以配置錯誤處理,將錯誤信息發(fā)給對應ID的MessageChannel
消息錯誤處理與生產(chǎn)錯誤處理大致相同。錯誤的MessageChannel
對應的name為topic.group.errors
,還會加上多個MessageHandler
訂閱的一些判斷,使用ErrorMessageStrategy
創(chuàng)建錯誤消息等內容。
- 示例代碼
spring.cloud.stream.bindings.input.destination=test-input spring.cloud.stream.bindings.input.group=test-input-group
@StreamListener(Sink.INPUT) public void receive(String receiveMsg) { throw new RuntimeException("Oops"); } @ServiceActivator(inputChannel = "test-input.test-input-group.errors") public void receiveConsumeError(Message receiveMsg) { System.out.println("receive error msg: " + receiveMsg); }
建議直接使用topic.group.errors
這個消息通道,并設置發(fā)送到單播模式的DirectChannel
消息通道中(使用@ServiceActivator
注解接收會直接構成DirectChannel
),這樣會確保只會被唯一的一個訂閱了topic.group.errors
的MessageHandler
處理,否則可能會被多個MessageHandler
處理,導致出現(xiàn)一些意想不到的結果。
自定義MessageHandler類型
默認情況下,Output Binding
對應的MessageChannel
和Input Binding
對應的SubscribeChannel
會被構造成DirectChannel
。
SCS提供了BindingTargetFactory
接口進行擴展,比如可以擴展構造PublishSubscribeChannel
這種廣播類型的MessageChannel
。
BindingTargetFactory
接口只有兩個實現(xiàn)類
SubscribableChannelBindingTargetFactory
:針對Input Binding
和Output Binding
都會構造成DirectWithAttributesChannel
類型的MessageChannel
(一種帶有HashMap
屬性的DirectChannel
)。MessageSourceBindingTargetFactory
:不支持Output Binding
,Input Binding
會構造成DefaultPollableMessageSource
。DefaultPollableMessageSource
內部維護著MessageSource
屬性,該屬性用于拉取消息。
Endpoint端點
SCS提供了BindingsEndpoint
,可以獲取Binding
信息或對Binding
生命周期進行修改,比如start
、stop
、pause
或resume
。
BindingsEndpoint
的ID是bindings,對外暴露了一下3個操作:
- 修改
Binding
狀態(tài),可以改成STARTED
、STOPPED
、PAUSED
和RESUMED
,對應Binding
接口的4個操作。 - 查詢單個
Binding
的狀態(tài)信息。 - 查詢所有
Binding
的狀態(tài)信息。
@Endpoint(id = "bindings") public class BindingsEndpoint { ... @WriteOperation public void changeState(@Selector String name, State state) { Binding<?> binding = BindingsEndpoint.this.locateBinding(name); if (binding != null) { switch (state) { case STARTED: binding.start(); break; case STOPPED: binding.stop(); break; case PAUSED: binding.pause(); break; case RESUMED: binding.resume(); break; default: break; } } } @ReadOperation public List<?> queryStates() { List<Binding<?>> bindings = new ArrayList<>(gatherInputBindings()); bindings.addAll(gatherOutputBindings()); return this.objectMapper.convertValue(bindings, List.class); } @ReadOperation public Binding<?> queryState(@Selector String name) { Assert.notNull(name, "'name' must not be null"); return this.locateBinding(name); } ... }
Metrics指標
該功能自動與micrometer集成進行Metrics統(tǒng)計,可以通過前綴spring.cloud.stream.metrics
進行相關配置,配置項spring.cloud.stream.bindings.applicationMetrics.destination
會構造MetersPublisherBinding
,將相關的metrics發(fā)送到MQ中。
Serverless
默認與Spring Cloud Function
集成。
可以使用Function處理消息。配置文件需要加上function配置。
spring.cloud.stream.function.definition=uppercase | addprefix
@Bean public Function<String, String> uppercase() { return x -> x.toUpperCase(); } @Bean public Function<String, String> addprefix() { return x -> "prefix-" + x; }
Partition統(tǒng)一
SCS統(tǒng)一Partition
相關的設置,可以屏蔽不同MQ Partition的設置。
Producer Binding提供的ProducerProperties提供了一些Partition相關的配置:
partitionKeyExpression
:partition key提取表達式。partitionKeyExtractorName
:是一個實現(xiàn)PartitionKeyExtractorStrategy
接口的Bean name。PartitionKeyExtractorStrategy
是一個根據(jù)Message獲取partition key的接口。如果兩者都配置,優(yōu)先級高于partitionKeyExtractorName
。partitionSelectorName
:是一個實現(xiàn)PartitionSelectorStrategy
接口的Bean name。PartitionSelectorStrategy
是一個根據(jù)partition key決定選擇哪個partition 的接口。partitionSelectorExpression
:partition 選擇表達式,會根據(jù)表達式和partition key得到最終的partition。如果兩者都配置,優(yōu)先partitionSelectorExpression
表達式解析partition。partitionCount
:partition 個數(shù)。該屬性不一定會生效,Kafka Binder 和RocketMQ Binder會使用topic上的partition 個數(shù)覆蓋該屬性。
public final class PartitioningInterceptor implements ChannelInterceptor { ... @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { if (!message.getHeaders().containsKey(BinderHeaders.PARTITION_OVERRIDE)) { int partition = this.partitionHandler.determinePartition(message); return MessageConverterConfigurer.this.messageBuilderFactory .fromMessage(message) .setHeader(BinderHeaders.PARTITION_HEADER, partition).build(); } else { return MessageConverterConfigurer.this.messageBuilderFactory .fromMessage(message) .setHeader(BinderHeaders.PARTITION_HEADER, message.getHeaders() .get(BinderHeaders.PARTITION_OVERRIDE)) .removeHeader(BinderHeaders.PARTITION_OVERRIDE).build(); } } } public class PartitionHandler { ... public int determinePartition(Message<?> message) { Object key = extractKey(message); int partition; if (this.producerProperties.getPartitionSelectorExpression() != null) { partition = this.producerProperties.getPartitionSelectorExpression() .getValue(this.evaluationContext, key, Integer.class); } else { partition = this.partitionSelectorStrategy.selectPartition(key, this.partitionCount); } // protection in case a user selector returns a negative. return Math.abs(partition % this.partitionCount); } private Object extractKey(Message<?> message) { Object key = invokeKeyExtractor(message); if (key == null && this.producerProperties.getPartitionKeyExpression() != null) { key = this.producerProperties.getPartitionKeyExpression() .getValue(this.evaluationContext, message); } Assert.notNull(key, "Partition key cannot be null"); return key; } ... }
Polling Consumer
實現(xiàn)MessageSource
進行polling
操作的Consumer
。
普通的Pub/Sub模式需要定義SubscribeableChannel
類型的返回值,Polling Consumer需要定義PollableMessageSource
類型的返回值。
public interface PollableSink { /** * Input channel name. */ String INPUT = "input"; /** * @return input channel. */ @Input(Sink.INPUT) PollableMessageSource input(); }
支持多個Binder同時使用
支持多個Binder
同時使用,在配置Binding
的時候需要指定對應的Binder
。
配置全局默認的Binder
:spring.cloud.stream.default-binder=rocketmq
。
配置各個Binder內部的配置信息:
spring.cloud.stream.binders.rocketmq.environment.<xx>=xx
spring.cloud.stream.binders.rocketmq.type=rocketmq
配置Binding
對應的Binder
:
spring.cloud.stream.bindings.<channelName>.binder=kafka
spring.cloud.stream.bindings.<channelName>.binder=rocketmq
spring.cloud.stream.bindings.<channelName>.binder=rabbit
建立事件機制
比如,新建BindingCreateEvent
事件,用戶的應用就可以監(jiān)聽該事件在創(chuàng)建Input Binding
或Output Binding
時做業(yè)務相關的處理。
以上就是Spring Cloud Stream 高級特性使用詳解的詳細內容,更多關于Spring Cloud Stream 高級特性的資料請關注腳本之家其它相關文章!
相關文章
springboot集成swagger、knife4j及常用注解的使用
這篇文章主要介紹了springboot集成swagger、knife4j及常用注解的使用,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-07-07java聯(lián)調生成測試數(shù)據(jù)工具類方式
這篇文章主要介紹了java聯(lián)調生成測試數(shù)據(jù)工具類方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-03-03Java實現(xiàn)數(shù)組翻轉的實現(xiàn)代碼
這篇文章主要介紹了Java實現(xiàn)數(shù)組翻轉的實現(xiàn)代碼,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-09-09通過使用Byte?Buddy便捷創(chuàng)建Java?Agent
這篇文章主要為大家介紹了如何通過使用Byte?Buddy便捷創(chuàng)建Java?Agent的使用說明,有需要的朋友可以借鑒參考下希望能夠有所幫助,祝大家多多進步2022-03-03Eclipse創(chuàng)建JavaWeb工程的完整步驟記錄
很多新手不知道Eclipse怎么創(chuàng)建Java Web項目,一起來看看吧,這篇文章主要給大家介紹了關于Eclipse創(chuàng)建JavaWeb工程的完整步驟,需要的朋友可以參考下2023-10-10MyBatis-Plus自定義SQL和復雜查詢的實現(xiàn)
MyBatis-Plus增強了MyBatis的功能,提供注解和XML兩種自定義SQL方式,支持復雜查詢如多表關聯(lián)、動態(tài)分頁等,通過注解如@Select、@Insert、@Update、@Delete實現(xiàn)CRUD操作,本文就來介紹一下,感興趣的可以了解一下2024-10-10