欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

springboot使用@KafkaListener監(jiān)聽多個kafka配置實現(xiàn)

 更新時間:2024年04月09日 09:36:45   作者:道不平  
當(dāng)服務(wù)中需要監(jiān)聽多個kafka時,?需要配置多個kafka,本文主要介紹了springboot使用@KafkaListener監(jiān)聽多個kafka配置實現(xiàn),具有一定的參考價值,感興趣的可以了解一下

背景

使用springboot整合kafka時, springboot默認(rèn)讀取配置文件中 spring.kafka...配置初始化kafka, 使用@KafkaListener時指定topic即可, 當(dāng)服務(wù)中需要監(jiān)聽多個kafka時, 需要配置多個kafka, 這種方式不適用

方案

可以手動讀取不同kafka配置信息, 創(chuàng)建不同的Kafka 監(jiān)聽容器工廠, 使用@KafkaListener時指定相應(yīng)的容器工廠, 代碼如下:

1. 導(dǎo)入依賴

        <dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

2. yml配置

kafka:
  # 默認(rèn)消費者配置
  default-consumer:
    # 自動提交已消費offset
    enable-auto-commit: true
    # 自動提交間隔時間
    auto-commit-interval: 1000
    # 消費的超時時間
    poll-timeout: 1500
    # 如果Kafka中沒有初始偏移量,或者服務(wù)器上不再存在當(dāng)前偏移量(例如,因為該數(shù)據(jù)已被刪除)自動將該偏移量重置成最新偏移量
    auto.offset.reset: latest
    # 消費會話超時時間(超過這個時間consumer沒有發(fā)送心跳,就會觸發(fā)rebalance操作)
    session.timeout.ms: 120000
    # 消費請求超時時間
    request.timeout.ms: 180000
  # 1號kafka配置
  test1:
    bootstrap-servers: xxxx:xxxx,xxxx:xxxx,xxxx:xxxx
    consumer:
      group-id: xxx
      sasl.mechanism: xxxx
      security.protocol: xxxx
      sasl.jaas.config: xxxx
  # 2號kafka配置
  test2:
    bootstrap-servers: xxxx:xxxx,xxxx:xxxx,xxxx:xxxx
    consumer:
      group-id: xxx
      sasl.mechanism: xxxx
      security.protocol: xxxx
      sasl.jaas.config: xxxx

3. 容器工廠配置

package com.zhdx.modules.backstage.config;

import com.google.common.collect.Maps;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.Map;

/**
 * kafka監(jiān)聽容器工廠配置
 * <p>
 * 拓展其他消費者配置只需配置指定的屬性和bean即可
 */
@EnableKafka
@Configuration
@RefreshScope
public class KafkaListenerContainerFactoryConfig {

    /**
     *  test1 kafka配置
     */
    @Value("${kafka.test1.bootstrap-servers}")
    private String test1KafkaServerUrls;

    @Value("${kafka.test1.consumer.group-id}")
    private String test1GroupId;

    @Value("${kafka.test1.consumer.sasl.mechanism}")
    private String test1SaslMechanism;

    @Value("${kafka.test1.consumer.security.protocol}")
    private String test1SecurityProtocol;

    @Value("${kafka.test1.consumer.sasl.jaas.config}")
    private String test1SaslJaasConfig;
    /**
     *  test2 kafka配置
     */
    @Value("${kafka.test2.bootstrap-servers}")
    private String test2KafkaServerUrls;

    @Value("${kafka.test2.consumer.group-id}")
    private String test2GroupId;

    @Value("${kafka.test2.consumer.sasl.mechanism}")
    private String test2SaslMechanism;

    @Value("${kafka.test2.consumer.security.protocol}")
    private String test2SecurityProtocol;

    @Value("${kafka.test2.consumer.sasl.jaas.config}")
    private String test2SaslJaasConfig;

    /**
     * 默認(rèn)消費者配置
     */
    @Value("${kafka.default-consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Value("${kafka.default-consumer.poll-timeout}")
    private int pollTimeout;

    @Value("${kafka.default-consumer.auto.offset.reset}")
    private String autoOffsetReset;

    @Value("${kafka.default-consumer.session.timeout.ms}")
    private int sessionTimeoutMs;

    @Value("${kafka.default-consumer.request.timeout.ms}")
    private int requestTimeoutMs;

    /**
     * test1消費者配置
     */
    public Map<String, Object> test1ConsumerConfigs() {
        Map<String, Object> props = getDefaultConsumerConfigs();
        // broker server地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, test1KafkaServerUrls);
        // 消費者組
        props.put(ConsumerConfig.GROUP_ID_CONFIG, test1GroupId);
        // 加密
        props.put(SaslConfigs.SASL_MECHANISM, test1SaslMechanism);
        props.put("security.protocol", test1SecurityProtocol);
        // 賬號密碼
        props.put(SaslConfigs.SASL_JAAS_CONFIG, test1SaslJaasConfig);
        return props;
    }
    
    /**
     * test2消費者配置
     */
    public Map<String, Object> test2ConsumerConfigs() {
        Map<String, Object> props = getDefaultConsumerConfigs();
        // broker server地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, test2KafkaServerUrls);
        // 消費者組
        props.put(ConsumerConfig.GROUP_ID_CONFIG, test2GroupId);
        // 加密
        props.put(SaslConfigs.SASL_MECHANISM, test2SaslMechanism);
        props.put("security.protocol", test2SecurityProtocol);
        // 賬號密碼
        props.put(SaslConfigs.SASL_JAAS_CONFIG, test2SaslJaasConfig);
        return props;
    }

    /**
     * 默認(rèn)消費者配置
     */
    private Map<String, Object> getDefaultConsumerConfigs() {
        Map<String, Object> props = Maps.newHashMap();
        // 自動提交(按周期)已消費offset 批量消費下設(shè)置false
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        // 消費會話超時時間(超過這個時間consumer沒有發(fā)送心跳,就會觸發(fā)rebalance操作)
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        // 消費請求超時時間
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);
        // 序列化
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 如果Kafka中沒有初始偏移量,或者服務(wù)器上不再存在當(dāng)前偏移量(例如,因為該數(shù)據(jù)已被刪除)自動將該偏移量重置成最新偏移量
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return props;
    }

    /**
     * 消費者工廠類
     */
    public ConsumerFactory<String, String> initConsumerFactory(Map<String, Object> consumerConfigs) {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs);
    }

    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> initKafkaListenerContainerFactory(
        Map<String, Object> consumerConfigs) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(initConsumerFactory(consumerConfigs));
        // 是否開啟批量消費
        factory.setBatchListener(false);
        // 消費的超時時間
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        return factory;
    }

    /**
     * 創(chuàng)建test1 Kafka 監(jiān)聽容器工廠。
     *
     * @return KafkaListenerContainerFactory<ConcurrentMessageListenerContainer < String, String>> 返回的 KafkaListenerContainerFactory 對象
     */
    @Bean(name = "test1KafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> test1KafkaListenerContainerFactory() {
        Map<String, Object> consumerConfigs = this.test1ConsumerConfigs();
        return initKafkaListenerContainerFactory(consumerConfigs);
    }
    

    /**
     * 創(chuàng)建test2 Kafka 監(jiān)聽容器工廠。
     *
     * @return KafkaListenerContainerFactory<ConcurrentMessageListenerContainer < String, String>> 返回的 KafkaListenerContainerFactory 對象
     */
    @Bean(name = "test2KafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> test2KafkaListenerContainerFactory() {
        Map<String, Object> consumerConfigs = this.test2ConsumerConfigs();
        return initKafkaListenerContainerFactory(consumerConfigs);
    }
}

4. @KafkaListener使用

package com.zhdx.modules.backstage.kafka;

import com.alibaba.fastjson.JSON;

import lombok.extern.slf4j.Slf4j;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * kafka監(jiān)聽器
 */
@Slf4j
@Component
public class test1KafkaListener {
    @KafkaListener(containerFactory = "test1KafkaListenerContainerFactory", topics = "xxx")
    public void handleHyPm(ConsumerRecord<String, String> record) {
        log.info("消費到topic xxx消息:{}", JSON.toJSONString(record.value()));
    }
}

到此這篇關(guān)于springboot使用@KafkaListener監(jiān)聽多個kafka配置實現(xiàn)的文章就介紹到這了,更多相關(guān)springboot 監(jiān)聽多個kafka內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家! 

相關(guān)文章

  • JAVA Frame 窗體背景圖片,首位相接滾動代碼實例

    JAVA Frame 窗體背景圖片,首位相接滾動代碼實例

    這篇文章主要介紹了JAVA Frame 窗體背景圖片,首位相接滾動代碼示例,需要的朋友可以參考下復(fù)制代碼
    2017-04-04
  • Java數(shù)據(jù)庫連接池c3p0過程解析

    Java數(shù)據(jù)庫連接池c3p0過程解析

    這篇文章主要介紹了Java數(shù)據(jù)庫連接池c3p0過程解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-07-07
  • Java怎么獲取多網(wǎng)卡本地ip

    Java怎么獲取多網(wǎng)卡本地ip

    java獲取本地ip,獲取多網(wǎng)卡本地ip在項目中經(jīng)常會用到,下面小編把代碼分享到腳本之家平臺,供大家參考
    2016-03-03
  • Java 中的內(nèi)存映射 mmap

    Java 中的內(nèi)存映射 mmap

    這篇文章主要介紹了Java 中的內(nèi)存映射,mmap 是一種內(nèi)存映射文件的方法,即將一個文件映射到進(jìn)程的地址空間,實現(xiàn)文件磁盤地址和一段進(jìn)程虛擬地址的映射,下面來看看詳細(xì)內(nèi)容,需要的朋友可以參考一下
    2021-11-11
  • Springboot項目快速實現(xiàn)攔截器功能

    Springboot項目快速實現(xiàn)攔截器功能

    上一篇文章介紹了Springboot項目如何快速實現(xiàn)過濾器功能,本篇文章接著來盤一盤攔截器,仔細(xì)研究后會發(fā)現(xiàn),其實攔截器和過濾器的功能非常類似,可以理解為面向切面編程的一種具體實現(xiàn)。感興趣的小伙伴可以參考閱讀
    2023-03-03
  • Spring Aop 如何獲取參數(shù)名參數(shù)值

    Spring Aop 如何獲取參數(shù)名參數(shù)值

    這篇文章主要介紹了Spring Aop 如何獲取參數(shù)名參數(shù)值的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-07-07
  • Java String字符串內(nèi)容實現(xiàn)添加雙引號

    Java String字符串內(nèi)容實現(xiàn)添加雙引號

    這篇文章主要介紹了Java String字符串內(nèi)容實現(xiàn)添加雙引號,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-09-09
  • 關(guān)于JpaRepository的關(guān)聯(lián)查詢和@Query查詢

    關(guān)于JpaRepository的關(guān)聯(lián)查詢和@Query查詢

    這篇文章主要介紹了JpaRepository的關(guān)聯(lián)查詢和@Query查詢,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • MyBatis之關(guān)于動態(tài)SQL解讀

    MyBatis之關(guān)于動態(tài)SQL解讀

    這篇文章主要介紹了MyBatis之關(guān)于動態(tài)SQL解讀,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-06-06
  • MyBatis執(zhí)行Sql的流程實例解析

    MyBatis執(zhí)行Sql的流程實例解析

    這篇文章主要介紹了MyBatis執(zhí)行Sql的流程實例解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2019-12-12

最新評論