欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

springboot如何配置多kafka

 更新時間:2024年01月30日 16:44:57   作者:是菜菜的小嚴(yán)惜哎  
這篇文章主要介紹了springboot如何配置多kafka問題,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教

springboot配置多kafka

kafka,說起這個玩意,大家應(yīng)該都不陌生,不知道啥是kafka的直接去搜索

就像MySQL的使用一樣,我們在用kafka的時候,也會碰到需要使用多個kafka的情況,比如我從kafkaA的一個topic里消費(fèi)出數(shù)據(jù),進(jìn)行一次處理,然后我要寫入到kafkaB的topic里

從網(wǎng)上找了很多配置多kafka的教程,但是都不大好使,后來還是找到了,加上我自己改了點(diǎn)點(diǎn)東西,貼出來和大家分享一下~

首先是pom文件,kafka的依賴

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

配置文件

spring:
  kafka:
    listener:
      concurrency: 10
    one:
      # kafka地址
      bootstrap-servers: 192.168.217.117:9092
      producer:
        # 生產(chǎn)者每次發(fā)送消息的時間間隔(毫秒)
        linger-ms: 5000
        # 單條消息最大值(字節(jié))
        max-request-size: 16384
        # 每次批量發(fā)送消息的數(shù)量
        batch-size: 16384
        # 緩存區(qū)大小
        buffer-memory: 33554432
        # 隊(duì)列
        topic: test
      consumer:
        # 是否自動提交
        enable-auto-commit: true
        # 隊(duì)列
        topic: topic
        # group ID
        group-id: consumer
    two:
      # kafka地址
      bootstrap-servers: 192.168.217.160:9092
      producer:
        # 生產(chǎn)者每次發(fā)送消息的時間間隔(毫秒)
        linger-ms: 100
        # 單條消息最大值(單位為字節(jié),且大小指的是經(jīng)過序列化后的大?。?
        max-request-size: 16384
        # 每次批量發(fā)送消息的數(shù)量
        batch-size: 16384
        # 緩存區(qū)大小
        buffer-memory: 33554432
        # 隊(duì)列
        topic: test
      consumer:
        # 是否自動提交
        enable-auto-commit: true
        # 隊(duì)列
        topic: test
        # group ID
        group-id: consumer

有了配置文件當(dāng)然就要讀取配置文件了

先讀取第一個kafka

@EnableKafka
@Configuration
public class KafkaConfigOne {
 
    @Value("${spring.kafka.one.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.one.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    @Value("${spring.kafka.one.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.one.producer.linger-ms}")
    private Integer lingerMs;
    @Value("${spring.kafka.one.producer.max-request-size}")
    private Integer maxRequestSize;
    @Value("${spring.kafka.one.producer.batch-size}")
    private Integer batchSize;
    @Value("${spring.kafka.one.producer.buffer-memory}")
    private Integer bufferMemory;
 
 
    @Bean
    public KafkaTemplate<String, String> kafkaOneTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
 
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaOneContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
 
    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
 
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
 
    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.LINGER_MS_CONFIG,lingerMs);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,batchSize);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
 
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}
 

接著讀取第二個kafka

@EnableKafka
@Configuration
public class KafkaConfigTwo {
 
    @Value("${spring.kafka.two.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.two.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    @Value("${spring.kafka.two.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.two.producer.linger-ms}")
    private Integer lingerMs;
    @Value("${spring.kafka.two.producer.max-request-size}")
    private Integer maxRequestSize;
    @Value("${spring.kafka.two.producer.batch-size}")
    private Integer batchSize;
    @Value("${spring.kafka.two.producer.buffer-memory}")
    private Integer bufferMemory;
 
 
    @Bean
    public KafkaTemplate<String, String> kafkaTwoTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
 
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
 
    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
 
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
 
    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.LINGER_MS_CONFIG,lingerMs);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,batchSize);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
 
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}
 

PS:路徑什么的大家可以改哈

使用:

     @Autowired
    @Qualifier("kafkaTwoTemplate")
    private KafkaTemplate kafkaTwoTemplate;
 
kafkaTwoTemplate.send("topic", "message");

直接注入使用就可以,但是,在這兒注意,因?yàn)榕渲昧硕鄠€kafka,所以需要進(jìn)行區(qū)分,此處我使用@Autowired和@Qualifier連用,大家也可以使用@DataSource,這個就是多數(shù)據(jù)源的注解而已,無所謂,按照個人習(xí)慣進(jìn)行使用就OK

當(dāng)然,還有消費(fèi)者

 @KafkaListener(topics = {"#{'${spring.kafka.two.consumer.topic}'}"}, containerFactory = "kafkaTwoContainerFactory")
    public void listenerTwo (String data) {
        System.out.println(data);
    }

總結(jié)

以上為個人經(jīng)驗(yàn),希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • 使用JSON.toJSONString格式化成json字符串時保留null屬性

    使用JSON.toJSONString格式化成json字符串時保留null屬性

    這篇文章主要介紹了使用JSON.toJSONString格式化成json字符串時保留null屬性,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-06-06
  • Java獲取七牛云存儲空間中圖片外鏈

    Java獲取七牛云存儲空間中圖片外鏈

    本文主要介紹了Java獲取七牛云存儲空間中圖片外鏈,需要獲取在七牛云中存儲的所有圖片,并返回外鏈地址,具有一定的參考價(jià)值,感興趣的可以了解一下
    2023-10-10
  • Java多線程之Semaphore實(shí)現(xiàn)信號燈

    Java多線程之Semaphore實(shí)現(xiàn)信號燈

    這篇文章主要給大家分享的是Java多線程之Semaphore實(shí)現(xiàn)信號燈的練習(xí),emaphore是計(jì)數(shù)信號量。Semaphore管理一系列許可證。每個acquire方法阻塞,直到有一個許可證可以獲得然后拿走一個許可證;下面一起進(jìn)入文章學(xué)習(xí)Semaphore的具體內(nèi)容
    2021-10-10
  • java?NIO實(shí)現(xiàn)簡單聊天程序

    java?NIO實(shí)現(xiàn)簡單聊天程序

    這篇文章主要為大家詳細(xì)介紹了java?NIO實(shí)現(xiàn)簡單聊天程序,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-11-11
  • RxJava中多種場景的實(shí)現(xiàn)總結(jié)

    RxJava中多種場景的實(shí)現(xiàn)總結(jié)

    這篇文章給大家詳細(xì)介紹了RxJava中多種場景的實(shí)現(xiàn),對大家學(xué)習(xí)使用RxJava具有一定的參考借鑒價(jià)值,有需要的朋友們可以參考學(xué)習(xí),下面來一起看看吧。
    2016-10-10
  • Java BigDecimal案例詳解

    Java BigDecimal案例詳解

    這篇文章主要介紹了Java BigDecimal案例詳解,本篇文章通過簡要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下
    2021-08-08
  • spring boot實(shí)戰(zhàn)之使用JSP的示例

    spring boot實(shí)戰(zhàn)之使用JSP的示例

    本篇文章主要介紹了spring boot實(shí)戰(zhàn)之使用JSP的示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-10-10
  • 理解Java當(dāng)中的回調(diào)機(jī)制(翻譯)

    理解Java當(dāng)中的回調(diào)機(jī)制(翻譯)

    今天我要和大家分享一些東西,舉例來說這個在JavaScript中用的很多。我要講講回調(diào)(callbacks)。你知道什么時候用,怎么用這個嗎?你真的理解了它在java環(huán)境中的用法了嗎?當(dāng)我也問我自己這些問題,這也是我開始研究這些的原因
    2014-10-10
  • 詳細(xì)全面解析Java泛型

    詳細(xì)全面解析Java泛型

    這篇文章主要介紹了詳細(xì)全面解析Java泛型,java泛型主要提高了Java 程序的類型安全,通過知道使用泛型定義的變量的類型限制,編譯器可以驗(yàn)證類型假設(shè),消除源代碼中的許多強(qiáng)制類型轉(zhuǎn)換等多個有點(diǎn),下面我們進(jìn)入文章了解更多的詳細(xì)內(nèi)容吧
    2022-02-02
  • Spring?Security權(quán)限想要細(xì)化到按鈕實(shí)現(xiàn)示例

    Spring?Security權(quán)限想要細(xì)化到按鈕實(shí)現(xiàn)示例

    這篇文章主要為大家介紹了Spring?Security權(quán)限想要細(xì)化到按鈕實(shí)現(xiàn)示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-07-07

最新評論