欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spring Cloud Stream簡單用法

 更新時間:2021年07月23日 14:50:36   作者:微瞰技術  
Spring cloud stream是為構建微服務消息驅動而產生的一種框架。Spring Cloud Stream基于Spring boot的基礎上,可創(chuàng)建獨立的、生產級別的Spring應用,并采用Spring Integration來連接消息中間件提供消息事件驅動,一起看看吧

Spring Cloud Stream對Spring Cloud體系中的Mq進⾏了很好的上層抽象,可以讓我們與具體消息中間件解耦合,屏蔽掉了底層具體MQ消息中間件的細節(jié)差異,就像Hibernate屏蔽掉了具體數(shù)據(jù)庫(Mysql/Oracle⼀樣)。如此⼀來,我們學習、開發(fā)、維護MQ都會變得輕松。⽬前Spring Cloud Stream原生⽀持RabbitMQ和Kafka,阿里在這個基礎上提供了RocketMQ的支持

簡單使用Spring Cloud Stream 構建基于RocketMQ的生產者和消費者

生產者

pom文件中加入依賴

<dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
            <version>2.1.0.RELEASE</version>
        </dependency>

    </dependencies>

配置文件中增加關于Spring Cloud Stream binder和bindings的配置

spring:
  application:
    name: zhao-cloud-stream-producer
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          output:
            producer:
              group: test
              sync: true
      bindings:
        output:
          destination: stream-test-topic
          content-type: text/plain # 內容格式。這里使用 JSON

其中destination代表生產的數(shù)據(jù)發(fā)送到的topic 然后定義一個channel用于數(shù)據(jù)發(fā)送

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface TestChannel {
    @Output("output")
    MessageChannel output();
}

最后構造數(shù)據(jù)發(fā)送的接口

@Controller
public class SendMessageController {
    @Resource
    private TestChannel testChannel;

    @ResponseBody
    @RequestMapping(value = "send", method = RequestMethod.GET)
    public String sendMessage() {
        String messageId = UUID.randomUUID().toString();
        Message<String> message = MessageBuilder
                .withPayload("this is a test:" + messageId)
                .setHeader(MessageConst.PROPERTY_TAGS, "test")
                .build();
        try {
            testChannel.output().send(message);
            return messageId + "發(fā)送成功";
        } catch (Exception e) {
            return messageId + "發(fā)送失敗,原因:" + e.getMessage();
        }
    }
}

消費者

消費者的pom引入與生產者相同,在此不再贅述,配置時需要將stream的output修改為input并修改對應屬性

spring:
  application:
    name: zhao-cloud-stream-consumer
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          input:
            consumer:
              tags: test
      bindings:
        input:
          destination: stream-test-topic
          content-type: text/plain # 內容格式。這里使用 JSON
          group: test

另外關于channel的構造也要做同樣的修改

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface TestChannel {
    @Input("input")
    SubscribableChannel input();
}

最后我在啟動類中對收到的消息進行了監(jiān)聽

   @StreamListener("input")
    public void receiveInput(@Payload Message message) throws ValidationException {
        System.out.println("input1 receive: " + message.getPayload() + ", foo header: " + message.getHeaders().get("foo"));
    }

測試結果

file

file

Stream其他特性

消息發(fā)送失敗的處理

消息發(fā)送失敗后悔發(fā)送到默認的一個“topic.errors"的channel中(topic是配置的destination)。要配置消息發(fā)送失敗的處理,需要將錯誤消息的channel打開 消費者配置如下

spring:
  application:
    name: zhao-cloud-stream-producer
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          output:
            producer:
              group: test
              sync: true
      bindings:
        output:
          destination: stream-test-topic
          content-type: text/plain # 內容格式。這里使用 JSON
          producer:
            errorChannelEnabled: true

在啟動類中配置錯誤消息的Channel信息

 @Bean("stream-test-topic.errors")
    MessageChannel testoutPutErrorChannel(){
        return new PublishSubscribeChannel();
    }

新建異常處理service

import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

@Service
public class ErrorProducerService {

    @ServiceActivator(inputChannel = "stream-test-topic.errors")
    public void receiveProducerError(Message message){
        System.out.println("receive error msg :"+message);
    }
}

當發(fā)生異常時,由于測試類中已經將異常捕獲,處理發(fā)送異常主要是在這里進行。模擬,應用與rocketMq斷開的場景。可見

file file

消費者錯誤處理

首先增加配置為

spring:
  application:
    name: zhao-cloud-stream-producer
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          output:
            producer:
              group: test
              sync: true
      bindings:
        output:
          destination: stream-test-topic
          content-type: text/plain # 內容格式。這里使用 JSON
          producer:
            errorChannelEnabled: true

增加相應的模擬異常的操作

 @StreamListener("input")
    public void receiveInput(@Payload Message message) throws ValidationException {
        //System.out.println("input1 receive: " + message.getPayload() + ", foo header: " + message.getHeaders().get("foo"));
        throw new RuntimeException("oops");
    }
    @ServiceActivator(inputChannel = "stream-test-topic.test.errors")
    public void receiveConsumeError(Message message){
        System.out.println("receive error msg"+message.getPayload());
    }

file

代碼地址https://github.com/zhendiao/deme-code/tree/main/zp

到此這篇關于Spring Cloud Stream簡單用法的文章就介紹到這了,更多相關Spring Cloud Stream使用內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • 鄰接表無向圖的Java語言實現(xiàn)完整源碼

    鄰接表無向圖的Java語言實現(xiàn)完整源碼

    這篇文章主要介紹了鄰接表無向圖的Java語言實現(xiàn)完整源碼,具有一定借鑒價值,需要的朋友可以參考下。
    2017-12-12
  • SpringBoot3解決跨域請求的方案小結

    SpringBoot3解決跨域請求的方案小結

    解決跨域請求,主要有JSONP,iframe,window.name,CORS等方式,其中CORS方式是最常用的跨域實現(xiàn)方式,而且是對各種請求方法、各種數(shù)據(jù)請求類型都是完美支持的,本文介紹了SpringBoot3解決跨域請求的方案小結,需要的朋友可以參考下
    2024-07-07
  • Java實現(xiàn)文件的加密解密功能示例

    Java實現(xiàn)文件的加密解密功能示例

    這篇文章主要介紹了Java實現(xiàn)文件的加密解密功能,結合具體實例形式詳細分析了java針對文件的讀取、判斷、加密、解密等相關步驟與操作技巧,需要的朋友可以參考下
    2017-10-10
  • 一文深入理解Java中的深拷貝機制

    一文深入理解Java中的深拷貝機制

    在Java編程中,我們經常需要處理對象的復制問題,深拷貝和淺拷貝是兩種常見的復制方式,它們在內存管理和對象引用方面存在不同特點,本文將帶大家深入探究Java中的深拷貝機制,需要的朋友可以參考下
    2023-09-09
  • Java swing 圖像處理多種效果實現(xiàn)教程

    Java swing 圖像處理多種效果實現(xiàn)教程

    這篇文章主要介紹了Java swing 圖像處理多種效果實現(xiàn)教程,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-09-09
  • Java使用DateUtils對日期進行數(shù)學運算經典應用示例【附DateUtils相關包文件下載】

    Java使用DateUtils對日期進行數(shù)學運算經典應用示例【附DateUtils相關包文件下載】

    這篇文章主要介紹了Java使用DateUtils對日期進行數(shù)學運算的方法,可實現(xiàn)針對日期時間的各種常見運算功能,并附帶DateUtils的相關包文件供讀者下載使用,需要的朋友可以參考下
    2017-11-11
  • 教你利用SpringBoot寫一個屬于自己的Starter

    教你利用SpringBoot寫一個屬于自己的Starter

    如果我們將可獨立于業(yè)務代碼之外的功配置模塊封裝成一個個starter,復用的時候只需要將其在pom中引用依賴即可,SpringBoot為我們完成自動裝配,簡直不要太爽,這篇文章主要給大家介紹了關于如何利用SpringBoot寫一個屬于自己的Starter,需要的朋友可以參考下
    2022-03-03
  • 將springboot項目生成可依賴的jar并引入到項目中的方法

    將springboot項目生成可依賴的jar并引入到項目中的方法

    SpringBoot項目默認打包的是可運行jar包,也可以打包成不可運行的jar包,本文給大家介紹將springboot項目生成可依賴的jar并引入到項目中的方法,感興趣的朋友一起看看吧
    2023-11-11
  • java?-jar命令及SpringBoot通過java?-jav啟動項目的過程

    java?-jar命令及SpringBoot通過java?-jav啟動項目的過程

    本篇文章將為大家講述關于 SpringBoot 項目工程完成后,是如何通過 java-jar 命令來啟動的,以及介紹 java-jar 命令的詳細內容,對SpringBoot java?-jav啟動過程感興趣的朋友跟隨小編一起看看吧
    2023-05-05
  • springBoot server.port=-1的含義說明

    springBoot server.port=-1的含義說明

    這篇文章主要介紹了springBoot server.port=-1的含義說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-08-08

最新評論