欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spring Cloud Stream 高級特性使用詳解

 更新時間:2022年09月07日 14:30:40   作者:Java浮華  
這篇文章主要為大家介紹了Spring Cloud Stream 高級特性使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

重試

Consumer端可以配置重試次數(shù),當消息消費失敗的時候會進行重試。

底層使用Spring Retry去重試,重試次數(shù)可自定義配置。

# 默認重試次數(shù)為3,配置大于1時才會生效
spring.cloud.stream.bindings.<channelName>.consumer.maxAttempte=3 

消息發(fā)送失敗的處理

Producer發(fā)送消息出錯的情況下,可以配置錯誤處理,將錯誤信息發(fā)送給對應ID的MessageChannel

  • 消息發(fā)送失敗的場景下,會將消息發(fā)送到一個MessageChannel。這個MessageChannel會取ApplicationContext中name為topic.errorstopic就是配置的destination)的Bean。
  • 如果找不到就會自動構建一個PublishSubscribeChannel。
  • 然后使用BridgeHandler訂閱這個MessageChannel,同時再設置ApplicationContext中name為errorChannelPublishSubscribeChannel消息通道為BridgeHandleroutputChannel。
    public static final String ERROR_CHANNEL_BEAN_NAME = "errorChannel"
    private SubscribableChannel registerErrorInfrastructure(
            ProducerDestination destination) {
        // destination.getName() + ".errors"
        String errorChannelName = errorsBaseName(destination);
        SubscribableChannel errorChannel;
        if (getApplicationContext().containsBean(errorChannelName)) {
            Object errorChannelObject = getApplicationContext().getBean(errorChannelName);
            if (!(errorChannelObject instanceof SubscribableChannel)) {
                throw new IllegalStateException("Error channel '" + errorChannelName
                        + "' must be a SubscribableChannel");
            }
            errorChannel = (SubscribableChannel) errorChannelObject;
        }
        else {
            errorChannel = new PublishSubscribeChannel();
            ((GenericApplicationContext) getApplicationContext()).registerBean(
                    errorChannelName, SubscribableChannel.class, () -> errorChannel);
        }
        MessageChannel defaultErrorChannel = null;
        if (getApplicationContext()
                .containsBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)) {
            defaultErrorChannel = getApplicationContext().getBean(
                    IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME,
                    MessageChannel.class);
        }
        if (defaultErrorChannel != null) {
            BridgeHandler errorBridge = new BridgeHandler();
            errorBridge.setOutputChannel(defaultErrorChannel);
            errorChannel.subscribe(errorBridge);
            String errorBridgeHandlerName = getErrorBridgeName(destination);
            ((GenericApplicationContext) getApplicationContext()).registerBean(
                    errorBridgeHandlerName, BridgeHandler.class, () -> errorBridge);
        }
        return errorChannel;
    }
  • 示例代碼
spring.cloud.stream.bindings.output.destination=test-output
# 消息發(fā)送失敗的處理邏輯默認是關閉的
spring.cloud.stream.bindings.output.producer.errorChannelEnabled=true
    @Bean("test-output.errors")
    MessageChannel testOutputErrorChannel() {
        return new PublishSubscribeChannel();
    }
    @Service
    class ErrorProduceService {
        @ServiceActivator(inputChannel = "test-output.errors")
        public void receiveProduceError(Message receiveMsg) {
            System.out.println("receive error msg: " + receiveMsg);
        }
    }

消費錯誤處理

Consumer消費消息出錯的情況下,可以配置錯誤處理,將錯誤信息發(fā)給對應ID的MessageChannel

消息錯誤處理與生產(chǎn)錯誤處理大致相同。錯誤的MessageChannel對應的name為topic.group.errors,還會加上多個MessageHandler訂閱的一些判斷,使用ErrorMessageStrategy創(chuàng)建錯誤消息等內容。

  • 示例代碼
spring.cloud.stream.bindings.input.destination=test-input
spring.cloud.stream.bindings.input.group=test-input-group
@StreamListener(Sink.INPUT)
public void receive(String receiveMsg) {
    throw new RuntimeException("Oops");
}
@ServiceActivator(inputChannel = "test-input.test-input-group.errors")
public void receiveConsumeError(Message receiveMsg) {
    System.out.println("receive error msg: " + receiveMsg);
}

建議直接使用topic.group.errors這個消息通道,并設置發(fā)送到單播模式的DirectChannel消息通道中(使用@ServiceActivator注解接收會直接構成DirectChannel),這樣會確保只會被唯一的一個訂閱了topic.group.errorsMessageHandler處理,否則可能會被多個MessageHandler處理,導致出現(xiàn)一些意想不到的結果。

自定義MessageHandler類型

默認情況下,Output Binding對應的MessageChannelInput Binding對應的SubscribeChannel會被構造成DirectChannel

SCS提供了BindingTargetFactory接口進行擴展,比如可以擴展構造PublishSubscribeChannel這種廣播類型的MessageChannel

BindingTargetFactory接口只有兩個實現(xiàn)類

  • SubscribableChannelBindingTargetFactory:針對Input BindingOutput Binding都會構造成DirectWithAttributesChannel類型的MessageChannel(一種帶有HashMap屬性的DirectChannel)。
  • MessageSourceBindingTargetFactory:不支持Output BindingInput Binding會構造成DefaultPollableMessageSource。DefaultPollableMessageSource內部維護著MessageSource屬性,該屬性用于拉取消息。

Endpoint端點

SCS提供了BindingsEndpoint,可以獲取Binding信息或對Binding生命周期進行修改,比如start、stoppauseresume。

BindingsEndpoint的ID是bindings,對外暴露了一下3個操作:

  • 修改Binding狀態(tài),可以改成STARTEDSTOPPED、PAUSEDRESUMED,對應Binding接口的4個操作。
  • 查詢單個Binding的狀態(tài)信息。
  • 查詢所有Binding的狀態(tài)信息。
@Endpoint(id = "bindings")
public class BindingsEndpoint {
  ...
  @WriteOperation
    public void changeState(@Selector String name, State state) {
        Binding<?> binding = BindingsEndpoint.this.locateBinding(name);
        if (binding != null) {
            switch (state) {
            case STARTED:
                binding.start();
                break;
            case STOPPED:
                binding.stop();
                break;
            case PAUSED:
                binding.pause();
                break;
            case RESUMED:
                binding.resume();
                break;
            default:
                break;
            }
        }
    }
    @ReadOperation
    public List<?> queryStates() {
        List<Binding<?>> bindings = new ArrayList<>(gatherInputBindings());
        bindings.addAll(gatherOutputBindings());
        return this.objectMapper.convertValue(bindings, List.class);
    }
    @ReadOperation
    public Binding<?> queryState(@Selector String name) {
        Assert.notNull(name, "'name' must not be null");
        return this.locateBinding(name);
    }
  ...
}

Metrics指標

該功能自動與micrometer集成進行Metrics統(tǒng)計,可以通過前綴spring.cloud.stream.metrics進行相關配置,配置項spring.cloud.stream.bindings.applicationMetrics.destination會構造MetersPublisherBinding,將相關的metrics發(fā)送到MQ中。

Serverless

默認與Spring Cloud Function集成。

可以使用Function處理消息。配置文件需要加上function配置。

spring.cloud.stream.function.definition=uppercase | addprefix

  @Bean
  public Function<String, String> uppercase() {
      return x -> x.toUpperCase();
  }
  @Bean
  public Function<String, String> addprefix() {
      return x -> "prefix-" + x;
  }

Partition統(tǒng)一

SCS統(tǒng)一Partition相關的設置,可以屏蔽不同MQ Partition的設置。

Producer Binding提供的ProducerProperties提供了一些Partition相關的配置:

  • partitionKeyExpression:partition key提取表達式。
  • partitionKeyExtractorName:是一個實現(xiàn)PartitionKeyExtractorStrategy接口的Bean name。PartitionKeyExtractorStrategy是一個根據(jù)Message獲取partition key的接口。如果兩者都配置,優(yōu)先級高于partitionKeyExtractorName
  • partitionSelectorName:是一個實現(xiàn)PartitionSelectorStrategy接口的Bean name。PartitionSelectorStrategy是一個根據(jù)partition key決定選擇哪個partition 的接口。
  • partitionSelectorExpression:partition 選擇表達式,會根據(jù)表達式和partition key得到最終的partition。如果兩者都配置,優(yōu)先partitionSelectorExpression表達式解析partition。
  • partitionCount:partition 個數(shù)。該屬性不一定會生效,Kafka Binder 和RocketMQ Binder會使用topic上的partition 個數(shù)覆蓋該屬性。
public final class PartitioningInterceptor implements ChannelInterceptor {
      ...
      @Override
      public Message<?> preSend(Message<?> message, MessageChannel channel) {
      if (!message.getHeaders().containsKey(BinderHeaders.PARTITION_OVERRIDE)) {
        int partition = this.partitionHandler.determinePartition(message);
        return MessageConverterConfigurer.this.messageBuilderFactory
          .fromMessage(message)
          .setHeader(BinderHeaders.PARTITION_HEADER, partition).build();
      }
      else {
        return MessageConverterConfigurer.this.messageBuilderFactory
          .fromMessage(message)
          .setHeader(BinderHeaders.PARTITION_HEADER,
                     message.getHeaders()
                     .get(BinderHeaders.PARTITION_OVERRIDE))
          .removeHeader(BinderHeaders.PARTITION_OVERRIDE).build();
      }
    }
}
public class PartitionHandler {
      ...
      public int determinePartition(Message<?> message) {
        Object key = extractKey(message);
        int partition;
        if (this.producerProperties.getPartitionSelectorExpression() != null) {
            partition = this.producerProperties.getPartitionSelectorExpression()
                    .getValue(this.evaluationContext, key, Integer.class);
        }
        else {
            partition = this.partitionSelectorStrategy.selectPartition(key,
                    this.partitionCount);
        }
        // protection in case a user selector returns a negative.
        return Math.abs(partition % this.partitionCount);
    }
    private Object extractKey(Message<?> message) {
        Object key = invokeKeyExtractor(message);
        if (key == null && this.producerProperties.getPartitionKeyExpression() != null) {
            key = this.producerProperties.getPartitionKeyExpression()
                    .getValue(this.evaluationContext, message);
        }
        Assert.notNull(key, "Partition key cannot be null");
        return key;
    }
      ...
}

Polling Consumer

實現(xiàn)MessageSource進行polling操作的Consumer

普通的Pub/Sub模式需要定義SubscribeableChannel類型的返回值,Polling Consumer需要定義PollableMessageSource類型的返回值。

public interface PollableSink {
    /**
     * Input channel name.
     */
    String INPUT = "input";
    /**
     * @return input channel.
     */
    @Input(Sink.INPUT)
    PollableMessageSource input();
}

支持多個Binder同時使用

支持多個Binder同時使用,在配置Binding的時候需要指定對應的Binder。

配置全局默認的Binderspring.cloud.stream.default-binder=rocketmq。

配置各個Binder內部的配置信息:

spring.cloud.stream.binders.rocketmq.environment.<xx>=xx

spring.cloud.stream.binders.rocketmq.type=rocketmq

配置Binding對應的Binder

spring.cloud.stream.bindings.<channelName>.binder=kafka

spring.cloud.stream.bindings.<channelName>.binder=rocketmq

spring.cloud.stream.bindings.<channelName>.binder=rabbit

建立事件機制

比如,新建BindingCreateEvent事件,用戶的應用就可以監(jiān)聽該事件在創(chuàng)建Input BindingOutput Binding 時做業(yè)務相關的處理。

以上就是Spring Cloud Stream 高級特性使用詳解的詳細內容,更多關于Spring Cloud Stream 高級特性的資料請關注腳本之家其它相關文章!

相關文章

  • springboot集成swagger、knife4j及常用注解的使用

    springboot集成swagger、knife4j及常用注解的使用

    這篇文章主要介紹了springboot集成swagger、knife4j及常用注解的使用,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-07-07
  • java聯(lián)調生成測試數(shù)據(jù)工具類方式

    java聯(lián)調生成測試數(shù)據(jù)工具類方式

    這篇文章主要介紹了java聯(lián)調生成測試數(shù)據(jù)工具類方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-03-03
  • 普通對象使用spring容器中的對象的實現(xiàn)方法

    普通對象使用spring容器中的對象的實現(xiàn)方法

    這篇文章主要介紹了普通對象使用spring容器中的對象的實現(xiàn)方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2019-06-06
  • 詳解簡單基于spring的redis配置(單機和集群模式)

    詳解簡單基于spring的redis配置(單機和集群模式)

    這篇文章主要介紹了詳解簡單基于spring的redis配置(單機和集群模式),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2019-02-02
  • Java實現(xiàn)數(shù)組翻轉的實現(xiàn)代碼

    Java實現(xiàn)數(shù)組翻轉的實現(xiàn)代碼

    這篇文章主要介紹了Java實現(xiàn)數(shù)組翻轉的實現(xiàn)代碼,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2019-09-09
  • Java整合Jackson實現(xiàn)反序列化器流程

    Java整合Jackson實現(xiàn)反序列化器流程

    Jackson是一個開源的Java序列化和反序列化工具,可以將Java對象序列化為XML或JSON格式的字符串,以及將XML或JSON格式的字符串反序列化為Java對象。由于其使用簡單,速度較快,且不依靠除JDK外的其他庫,被眾多用戶所使用
    2023-01-01
  • 通過使用Byte?Buddy便捷創(chuàng)建Java?Agent

    通過使用Byte?Buddy便捷創(chuàng)建Java?Agent

    這篇文章主要為大家介紹了如何通過使用Byte?Buddy便捷創(chuàng)建Java?Agent的使用說明,有需要的朋友可以借鑒參考下希望能夠有所幫助,祝大家多多進步
    2022-03-03
  • Java基礎篇之分布式版本控制工具Git

    Java基礎篇之分布式版本控制工具Git

    Git是一個開源的分布式版本控制系統(tǒng),可以有效、高速地處理從很小到非常大的項目版本管理。 也是Linus Torvalds為了幫助管理Linux內核開發(fā)而開發(fā)的一個開放源碼的版本控制軟件
    2021-10-10
  • Eclipse創(chuàng)建JavaWeb工程的完整步驟記錄

    Eclipse創(chuàng)建JavaWeb工程的完整步驟記錄

    很多新手不知道Eclipse怎么創(chuàng)建Java Web項目,一起來看看吧,這篇文章主要給大家介紹了關于Eclipse創(chuàng)建JavaWeb工程的完整步驟,需要的朋友可以參考下
    2023-10-10
  • MyBatis-Plus自定義SQL和復雜查詢的實現(xiàn)

    MyBatis-Plus自定義SQL和復雜查詢的實現(xiàn)

    MyBatis-Plus增強了MyBatis的功能,提供注解和XML兩種自定義SQL方式,支持復雜查詢如多表關聯(lián)、動態(tài)分頁等,通過注解如@Select、@Insert、@Update、@Delete實現(xiàn)CRUD操作,本文就來介紹一下,感興趣的可以了解一下
    2024-10-10

最新評論