欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot如何正確配置并運(yùn)行Kafka

 更新時間:2023年07月19日 09:55:32   作者:學(xué)弟不想努力了  
這篇文章主要介紹了SpringBoot如何正確配置并運(yùn)行Kafka問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教

一、配置pom.xml,引入maven依賴

<!-- 引入kafka依賴 -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.6</version>
</dependency>

二、application.yml配置文件

這里只提供了kafka有用的相關(guān)配置,其他的配置刪了

spring:
  kafka:
    bootstrap-servers: xx.xx.xx.xx:9092 # kafka集群信息,多個用逗號間隔
    # 生產(chǎn)者
    producer:
      # 重試次數(shù),設(shè)置大于0的值,則客戶端會將發(fā)送失敗的記錄重新發(fā)送
      retries: 3
      batch-size: 16384 #批量處理大小,16K
      buffer-memory: 33554432 #緩沖存儲大,32M
      acks: 1
      # 指定消息key和消息體的編解碼方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    # 消費(fèi)者
    consumer:
      # 消費(fèi)者組
      group-id: TestGroup
      # 是否自動提交
      enable-auto-commit: false
      # 消費(fèi)偏移配置
      # none:如果沒有為消費(fèi)者找到先前的offset的值,即沒有自動維護(hù)偏移量,也沒有手動維護(hù)偏移量,則拋出異常
      # earliest:在各分區(qū)下有提交的offset時:從offset處開始消費(fèi);在各分區(qū)下無提交的offset時:從頭開始消費(fèi)
      # latest:在各分區(qū)下有提交的offset時:從offset處開始消費(fèi);在各分區(qū)下無提交的offset時:從最新的數(shù)據(jù)開始消費(fèi)
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # 監(jiān)聽
    listener:
      # record:當(dāng)每一條記錄被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后提交
      # batch:當(dāng)每一批poll()的數(shù)據(jù)被ListenerConsumer處理之后提交
      # time:當(dāng)每一批poll()的數(shù)據(jù)被ListenerConsumer處理之后,距離上次提交時間大于TIME時提交
      # count:當(dāng)每一批poll()的數(shù)據(jù)被ListenerConsumer處理之后,被處理record數(shù)量大于等于COUNT時提交
      # count_time:TIME或COUNT中有一個條件滿足時提交
      # manual:當(dāng)每一批poll()的數(shù)據(jù)被ListenerConsumer處理之后, 手動調(diào)用Acknowledgment.acknowledge()后提交
      # manual_immediate:手動調(diào)用Acknowledgment.acknowledge()后立即提交,一般推薦使用這種
      ack-mode: manual_immediate

三、消費(fèi)者

消費(fèi)者監(jiān)聽,可以配置多個監(jiān)聽器

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
/**
 * 消費(fèi)者
 * kafka監(jiān)聽器
 */
@Component
public class KafkaConsumer {
    /**
     * kafka的監(jiān)聽器1,topic為"topic_test",消費(fèi)者組為"group_topic_test"
     * @param record
     * @param item
     */
    @KafkaListener(topics = "topic_test", groupId = "group_topic_test")
    public void topicListener1(ConsumerRecord<String, String> record, Acknowledgment item) {
        String value = record.value();
        System.out.println(value);
        System.out.println(record);
        //手動提交
        item.acknowledge();
    }
    /**
     * 配置多個消費(fèi)組
     * kafka的監(jiān)聽器2,topic為"topic_test2",消費(fèi)者組為"group_topic_test"
     * @param record
     * @param item
     */
    @KafkaListener(topics = "topic_test2",groupId = "group_topic_test2")
    public void topicListener2(ConsumerRecord<String, String> record, Acknowledgment item) {
        String value = record.value();
        System.out.println(value);
        System.out.println(record);
        item.acknowledge();
    }
}

四、生產(chǎn)者

生產(chǎn)者作為接口Api作為測試

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * kafka生產(chǎn)者
 */
@RestController
@RequestMapping("/kafka")
public class KafkaController {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @RequestMapping("/send")
    public void send() {
        kafkaTemplate.send("topic_test",  "key", "測試kafka消息");
    }
}

五、調(diào)用測試

啟動Boot項目,使用Postman工具發(fā)送GET請求:

http://localhost:8080/kafka/send

 

總結(jié)

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • El表達(dá)式使用問題javax.el.ELException:Failed to parse the expression的解決方式

    El表達(dá)式使用問題javax.el.ELException:Failed to parse the expression

    今天小編就為大家分享一篇關(guān)于Jsp El表達(dá)式使用問題javax.el.ELException:Failed to parse the expression的解決方式,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧
    2018-12-12
  • Spring學(xué)習(xí)筆記之bean生命周期

    Spring學(xué)習(xí)筆記之bean生命周期

    Spring Bean是Spring應(yīng)用中最最重要的部分了。下面這篇文章主要給大家介紹了關(guān)于Spring學(xué)習(xí)筆記之bean生命周期的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考借鑒,下面來一起看看吧。
    2017-12-12
  • java不通過配置文件初始化logger示例

    java不通過配置文件初始化logger示例

    這篇文章主要介紹了java不通過配置文件初始化logger示例,需要的朋友可以參考下
    2014-05-05
  • Java熱門筆試試題整理

    Java熱門筆試試題整理

    給大家整理了現(xiàn)在很熱門的java程序員面試時候的筆試試題以及答案,希望能幫助到你。
    2017-11-11
  • JDK動態(tài)代理之WeakCache緩存的實現(xiàn)機(jī)制

    JDK動態(tài)代理之WeakCache緩存的實現(xiàn)機(jī)制

    這篇文章主要介紹了JDK動態(tài)代理之WeakCache緩存的實現(xiàn)機(jī)制
    2018-02-02
  • Java Socket實現(xiàn)簡易聊天室

    Java Socket實現(xiàn)簡易聊天室

    這篇文章主要為大家詳細(xì)介紹了Java Socket實現(xiàn)簡易聊天室,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-03-03
  • SpringBoot中間件ORM框架實現(xiàn)案例詳解(Mybatis)

    SpringBoot中間件ORM框架實現(xiàn)案例詳解(Mybatis)

    這篇文章主要介紹了SpringBoot中間件ORM框架實現(xiàn)案例詳解(Mybatis),本篇文章提煉出mybatis最經(jīng)典、最精簡、最核心的代碼設(shè)計,來實現(xiàn)一個mini-mybatis,從而熟悉并掌握ORM框架的涉及實現(xiàn),需要的朋友可以參考下
    2023-07-07
  • 關(guān)于IDEA配置文件字符集的問題

    關(guān)于IDEA配置文件字符集的問題

    這篇文章主要介紹了關(guān)于IDEA配置文件字符集的問題,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-12-12
  • 使用IDEA創(chuàng)建Servlet程序的詳細(xì)步驟

    使用IDEA創(chuàng)建Servlet程序的詳細(xì)步驟

    在學(xué)習(xí)servlet過程中,參考的教程是用eclipse完成的,而我在練習(xí)的過程中是使用IDEA的,在創(chuàng)建servlet程序時遇到了挺多困難,在此記錄一下如何用IDEA完整創(chuàng)建一個servlet程序,感興趣的朋友一起看看吧
    2024-08-08
  • Java中PropertyDescriptor的用法及說明

    Java中PropertyDescriptor的用法及說明

    這篇文章主要介紹了Java中PropertyDescriptor的用法及說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-07-07

最新評論