欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Springboot集成Kafka進(jìn)行批量消費(fèi)及踩坑點(diǎn)

 更新時(shí)間:2021年12月14日 11:44:53   作者:秀強(qiáng)  
本文主要介紹了Springboot集成Kafka進(jìn)行批量消費(fèi)及踩坑點(diǎn),文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下

引入依賴

            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>1.3.11.RELEASE</version>
            </dependency>

因?yàn)槲业捻?xiàng)目的 springboot 版本是 1.5.22.RELEASE,所以引的是 1.3.11.RELEASE 的包。讀者可以根據(jù)下圖來(lái)自行選擇對(duì)應(yīng)的版本。圖片更新可能不及時(shí),詳情可查看spring-kafka 官方網(wǎng)站。

在這里插入圖片描述

注:這里有個(gè)踩坑點(diǎn),如果引入包版本不對(duì),項(xiàng)目啟動(dòng)時(shí)會(huì)拋出org.springframework.core.log.LogAccessor 異常:

java.lang.ClassNotFoundException: org.springframework.core.log.LogAccessor

創(chuàng)建配置類

    /**
     * kafka 配置類
     */
    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {

        private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerConfig.class);

        @Value("${kafka.bootstrap.servers}")
        private String kafkaBootstrapServers;

        @Value("${kafka.group.id}")
        private String kafkaGroupId;

        @Value("${kafka.topic}")
        private String kafkaTopic;

        public static final String CONFIG_PATH = "/home/admin/xxx/BOOT-INF/classes/kafka_client_jaas.conf";

        public static final String LOCATION_PATH = "/home/admin/xxx/BOOT-INF/classes/kafka.client.truststore.jks";


        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            // 設(shè)置并發(fā)量,小于或者等于 Topic 的分區(qū)數(shù)
            factory.setConcurrency(5);
            // 設(shè)置為批量監(jiān)聽(tīng)
            factory.setBatchListener(Boolean.TRUE);
            factory.getContainerProperties().setPollTimeout(30000);
            return factory;
        }

        public ConsumerFactory<String, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }

        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            //設(shè)置接入點(diǎn),請(qǐng)通過(guò)控制臺(tái)獲取對(duì)應(yīng)Topic的接入點(diǎn)。
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
            //設(shè)置SSL根證書(shū)的路徑,請(qǐng)記得將XXX修改為自己的路徑。
            //與SASL路徑類似,該文件也不能被打包到j(luò)ar中。
            System.setProperty("java.security.auth.login.config", CONFIG_PATH);
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, LOCATION_PATH);

            //根證書(shū)存儲(chǔ)的密碼,保持不變。
            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
            //接入?yún)f(xié)議,目前支持使用SASL_SSL協(xié)議接入。
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            //SASL鑒權(quán)方式,保持不變。
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            // 自動(dòng)提交
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE);
            //兩次Poll之間的最大允許間隔。
            //消費(fèi)者超過(guò)該值沒(méi)有返回心跳,服務(wù)端判斷消費(fèi)者處于非存活狀態(tài),服務(wù)端將消費(fèi)者從Consumer Group移除并觸發(fā)Rebalance,默認(rèn)30s。
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
            //設(shè)置單次拉取的量,走公網(wǎng)訪問(wèn)時(shí),該參數(shù)會(huì)有較大影響。
            props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
            props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
            //每次Poll的最大數(shù)量。
            //注意該值不要改得太大,如果Poll太多數(shù)據(jù),而不能在下次Poll之前消費(fèi)完,則會(huì)觸發(fā)一次負(fù)載均衡,產(chǎn)生卡頓。
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
            //消息的反序列化方式。
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            //當(dāng)前消費(fèi)實(shí)例所屬的消費(fèi)組,請(qǐng)?jiān)诳刂婆_(tái)申請(qǐng)之后填寫。
            //屬于同一個(gè)組的消費(fèi)實(shí)例,會(huì)負(fù)載消費(fèi)消息。
            props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
            //Hostname校驗(yàn)改成空。
            props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
            return props;
        }
    }

注:此處通過(guò) factory.setConcurrency(5); 配置了并發(fā)量為 5 ,假設(shè)我們線上的 Topic 有 12 個(gè)分區(qū)。那么將會(huì)是 3 個(gè)線程分配到 2 個(gè)分區(qū),2 個(gè)線程分配到 3 個(gè)分區(qū),3 * 2 + 2 * 3 = 12。

Kafka 消費(fèi)者

    /**
     * kafka 消息消費(fèi)類
     */
    @Component
    public class KafkaMessageListener {

        private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);

        @KafkaListener(topics = {"${kafka.topic}"})
        public void listen(List<ConsumerRecord<String, String>> recordList) {
            for (ConsumerRecord<String,String> record : recordList) {
                // 打印消息的分區(qū)以及偏移量
                LOGGER.info("Kafka Consume partition:{}, offset:{}", record.partition(), record.offset());
                String value = record.value();
                System.out.println("value = " + value);
                // 處理業(yè)務(wù)邏輯 ...
            }
        }
    }

因?yàn)槲以谂渲妙愔性O(shè)置了批量監(jiān)聽(tīng),所以此處 listen 方法的入?yún)⑹荓ist:List<ConsumerRecord<String, String>>。

到此這篇關(guān)于Springboot集成Kafka進(jìn)行批量消費(fèi)及踩坑點(diǎn)的文章就介紹到這了,更多相關(guān)Springboot Kafka批量消費(fèi)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Spring監(jiān)聽(tīng)器及定時(shí)任務(wù)實(shí)現(xiàn)方法詳解

    Spring監(jiān)聽(tīng)器及定時(shí)任務(wù)實(shí)現(xiàn)方法詳解

    這篇文章主要介紹了Spring監(jiān)聽(tīng)器及定時(shí)任務(wù)實(shí)現(xiàn)方法詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-07-07
  • Java程序中使用JavaMail發(fā)送帶圖片和附件的郵件

    Java程序中使用JavaMail發(fā)送帶圖片和附件的郵件

    這篇文章主要介紹了Java程序中使用JavaMail發(fā)送帶圖片和附件的郵件,JavaMail是專門用來(lái)處理郵件的Java API,需要的朋友可以參考下
    2015-11-11
  • Java實(shí)現(xiàn)Word/Pdf/TXT轉(zhuǎn)html的示例

    Java實(shí)現(xiàn)Word/Pdf/TXT轉(zhuǎn)html的示例

    這篇文章主要介紹了Java實(shí)現(xiàn)Word/Pdf/TXT轉(zhuǎn)html的示例,幫助大家方便的進(jìn)行文件格式轉(zhuǎn)換,完成需求,感興趣的朋友可以了解下
    2020-11-11
  • SpringBoot如何通過(guò)webjars管理靜態(tài)資源文件夾

    SpringBoot如何通過(guò)webjars管理靜態(tài)資源文件夾

    這篇文章主要介紹了SpringBoot如何通過(guò)webjars管理靜態(tài)資源文件夾,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-10-10
  • Java數(shù)據(jù)結(jié)構(gòu)之稀疏矩陣定義與用法示例

    Java數(shù)據(jù)結(jié)構(gòu)之稀疏矩陣定義與用法示例

    這篇文章主要介紹了Java數(shù)據(jù)結(jié)構(gòu)之稀疏矩陣定義與用法,結(jié)合實(shí)例形式分析了java稀疏矩陣的定義、運(yùn)算、轉(zhuǎn)換等相關(guān)操作技巧,需要的朋友可以參考下
    2018-01-01
  • hibernate 三種狀態(tài)的轉(zhuǎn)換

    hibernate 三種狀態(tài)的轉(zhuǎn)換

    本文主要介紹了hibernate三種狀態(tài)的轉(zhuǎn)換。具有很好的參考價(jià)值。下面跟著小編一起來(lái)看下吧
    2017-03-03
  • 詳解spring security安全防護(hù)

    詳解spring security安全防護(hù)

    這篇文章主要介紹了詳解spring security安全防護(hù),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2018-07-07
  • Java過(guò)濾器模式原理及用法實(shí)例

    Java過(guò)濾器模式原理及用法實(shí)例

    這篇文章主要介紹了Java過(guò)濾器模式原理及用法實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-05-05
  • java同步開(kāi)篇入門簡(jiǎn)單介紹

    java同步開(kāi)篇入門簡(jiǎn)單介紹

    java中的CountDownLatch、Semaphore、CyclicBarrier這些類又不屬于鎖,它們和鎖又有很多共同點(diǎn),都是為了協(xié)同多線程的執(zhí)行,都是一種同步器,所以這里就借用同步來(lái)取名字了,也就是“同步系列”的來(lái)源。下面小編來(lái)簡(jiǎn)單介紹下
    2019-05-05
  • Intellij IDEA插件開(kāi)發(fā)入門詳解

    Intellij IDEA插件開(kāi)發(fā)入門詳解

    這篇文章主要介紹了Intellij IDEA插件開(kāi)發(fā)入門詳解,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2018-02-02

最新評(píng)論