欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

spring-cloud-stream的手動(dòng)消息確認(rèn)問題

 更新時(shí)間:2023年05月25日 14:38:27   作者:l1161558158  
這篇文章主要介紹了spring-cloud-stream的手動(dòng)消息確認(rèn)問題,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教

spring-cloud-stream的手動(dòng)消息確認(rèn)

對于kafka-binder來說,設(shè)置autoCommitOffset為false.然后在listen中手動(dòng)確認(rèn)

@StreamListener(Sink.INPUT)
void listen(@Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment){
? ? //...業(yè)務(wù)代碼
? ? acknowledgment.acknowledge();
}

需要注意的是autoCommitOffset的設(shè)置位置.

spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false#應(yīng)該在這里設(shè)置
spring.cloud.stream.bindings.input.consumer.autoCommitOffset=false#這里設(shè)置是無效的,獲取Acknowledgment時(shí)會(huì)是null

springcloud的stream消息組件的使用@StreamListener

常見問題(使用rabbitmq)

消息分組防止多實(shí)例重復(fù)消費(fèi)

在一個(gè)服務(wù)多實(shí)例場景下使用默認(rèn)使用@StreamListener監(jiān)聽消息消費(fèi),yml中沒有特殊配置的話是會(huì)導(dǎo)致消息重復(fù)消費(fèi)的,原因是此時(shí)每個(gè)實(shí)例都是匿名在rabbitmq上注冊的隊(duì)列,需要給消費(fèi)者指定一個(gè)消費(fèi)組,讓消息在組里只被消費(fèi)一次;

spring.cloud.stream.bindings.xxx(消費(fèi)者隊(duì)列名).group=xxx(組名)

在springboot下在同一個(gè)服務(wù)(項(xiàng)目中)使用@input和@outPut時(shí)指定的隊(duì)列名是不可以重復(fù)的.會(huì)在啟動(dòng)編譯的時(shí)候報(bào)bean定義重復(fù)。需要在yml給生產(chǎn)者和消費(fèi)者指定同一個(gè)交換機(jī)。

spring:
  rabbitmq:
    host: xxx.xxx.xxx.xx
    port: 35672
    username: xxx
    password: xxx
    virtual-host: /xxx
  cloud:
    stream:
      bindings:
        in:
          #若消息系統(tǒng)是RabbitMQ,目的地(destination)就是指exchange,消息系統(tǒng)是Kafka,那么就是指topic
          destination: test
          #在多實(shí)例的時(shí)候需要制定一個(gè)消息分組,不然每個(gè)實(shí)例都是匿名方式把隊(duì)列注冊到rabbitmq上去,導(dǎo)致一個(gè)交換機(jī)下有多個(gè)隊(duì)列
          #并且默認(rèn)生成的交換機(jī)是topic類型的,會(huì)導(dǎo)致重復(fù)消費(fèi)
          group: myIn
        out:
          destination: test

先上依賴

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.8.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.fchan</groupId>
    <artifactId>springcloudstream</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springcloudstream</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<!--            <version>2.0.1.RELEASE</version>-->
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-dependencies</artifactId>
                <version>Ditmars.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

再上yml配置

spring:
  rabbitmq:
    host: xxx.xxx.xxx.xx
    port: 35672
    username: xxx
    password: xxx
    virtual-host: /xxx
  cloud:
    stream:
      bindings:
        in:
          #若消息系統(tǒng)是RabbitMQ,目的地(destination)就是指exchange,消息系統(tǒng)是Kafka,那么就是指topic
          destination: test
          #在多實(shí)例的時(shí)候需要制定一個(gè)消息分組,不然每個(gè)實(shí)例都是匿名方式把隊(duì)列注冊到rabbitmq上去,導(dǎo)致一個(gè)交換機(jī)下有多個(gè)隊(duì)列
          #并且默認(rèn)生成的交換機(jī)是topic類型的,會(huì)導(dǎo)致重復(fù)消費(fèi)
          group: myIn
        out:
          destination: test

消息生產(chǎn)者

package com.fchan.springcloudstream.service;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface MyMessageChannel {
    String out = "out";
    String in = "in";
    @Output(out)
    MessageChannel out();
    @Input(in)
    SubscribableChannel in();
}

發(fā)送消息

package com.fchan.springcloudstream.controller;
import com.fchan.springcloudstream.service.MyMessageChannel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
@RestController
public class MessageController {
    @Resource
    private MyMessageChannel myMessageChannel;
    @RequestMapping("test")
    public String testMessage(){
        Map<String,Object> map = new HashMap<>();
        map.put("shopId", "123");
        myMessageChannel.out().send(MessageBuilder.withPayload(map).build());
        return "success";
    }
}

消息消費(fèi)者

package com.fchan.springcloudstream.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@EnableBinding({MyMessageChannel.class})
public class MyConsumer {
    Logger log = LoggerFactory.getLogger(MyConsumer.class);
    @StreamListener(MyMessageChannel.in)
    public void input(Message<Map<String,Object>> message){
        log.info("收到消息:{}", message.getPayload());
    }
}

總結(jié)

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

最新評論