SpringBoot如何正確配置并運(yùn)行Kafka
一、配置pom.xml,引入maven依賴
<!-- 引入kafka依賴 --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.8.6</version> </dependency>
二、application.yml配置文件
這里只提供了kafka有用的相關(guān)配置,其他的配置刪了
spring: kafka: bootstrap-servers: xx.xx.xx.xx:9092 # kafka集群信息,多個用逗號間隔 # 生產(chǎn)者 producer: # 重試次數(shù),設(shè)置大于0的值,則客戶端會將發(fā)送失敗的記錄重新發(fā)送 retries: 3 batch-size: 16384 #批量處理大小,16K buffer-memory: 33554432 #緩沖存儲大,32M acks: 1 # 指定消息key和消息體的編解碼方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer # 消費(fèi)者 consumer: # 消費(fèi)者組 group-id: TestGroup # 是否自動提交 enable-auto-commit: false # 消費(fèi)偏移配置 # none:如果沒有為消費(fèi)者找到先前的offset的值,即沒有自動維護(hù)偏移量,也沒有手動維護(hù)偏移量,則拋出異常 # earliest:在各分區(qū)下有提交的offset時:從offset處開始消費(fèi);在各分區(qū)下無提交的offset時:從頭開始消費(fèi) # latest:在各分區(qū)下有提交的offset時:從offset處開始消費(fèi);在各分區(qū)下無提交的offset時:從最新的數(shù)據(jù)開始消費(fèi) auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 監(jiān)聽 listener: # record:當(dāng)每一條記錄被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后提交 # batch:當(dāng)每一批poll()的數(shù)據(jù)被ListenerConsumer處理之后提交 # time:當(dāng)每一批poll()的數(shù)據(jù)被ListenerConsumer處理之后,距離上次提交時間大于TIME時提交 # count:當(dāng)每一批poll()的數(shù)據(jù)被ListenerConsumer處理之后,被處理record數(shù)量大于等于COUNT時提交 # count_time:TIME或COUNT中有一個條件滿足時提交 # manual:當(dāng)每一批poll()的數(shù)據(jù)被ListenerConsumer處理之后, 手動調(diào)用Acknowledgment.acknowledge()后提交 # manual_immediate:手動調(diào)用Acknowledgment.acknowledge()后立即提交,一般推薦使用這種 ack-mode: manual_immediate
三、消費(fèi)者
消費(fèi)者監(jiān)聽,可以配置多個監(jiān)聽器
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; /** * 消費(fèi)者 * kafka監(jiān)聽器 */ @Component public class KafkaConsumer { /** * kafka的監(jiān)聽器1,topic為"topic_test",消費(fèi)者組為"group_topic_test" * @param record * @param item */ @KafkaListener(topics = "topic_test", groupId = "group_topic_test") public void topicListener1(ConsumerRecord<String, String> record, Acknowledgment item) { String value = record.value(); System.out.println(value); System.out.println(record); //手動提交 item.acknowledge(); } /** * 配置多個消費(fèi)組 * kafka的監(jiān)聽器2,topic為"topic_test2",消費(fèi)者組為"group_topic_test" * @param record * @param item */ @KafkaListener(topics = "topic_test2",groupId = "group_topic_test2") public void topicListener2(ConsumerRecord<String, String> record, Acknowledgment item) { String value = record.value(); System.out.println(value); System.out.println(record); item.acknowledge(); } }
四、生產(chǎn)者
生產(chǎn)者作為接口Api作為測試
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * kafka生產(chǎn)者 */ @RestController @RequestMapping("/kafka") public class KafkaController { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @RequestMapping("/send") public void send() { kafkaTemplate.send("topic_test", "key", "測試kafka消息"); } }
五、調(diào)用測試
啟動Boot項目,使用Postman工具發(fā)送GET請求:
http://localhost:8080/kafka/send
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
El表達(dá)式使用問題javax.el.ELException:Failed to parse the expression
今天小編就為大家分享一篇關(guān)于Jsp El表達(dá)式使用問題javax.el.ELException:Failed to parse the expression的解決方式,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2018-12-12JDK動態(tài)代理之WeakCache緩存的實現(xiàn)機(jī)制
這篇文章主要介紹了JDK動態(tài)代理之WeakCache緩存的實現(xiàn)機(jī)制2018-02-02SpringBoot中間件ORM框架實現(xiàn)案例詳解(Mybatis)
這篇文章主要介紹了SpringBoot中間件ORM框架實現(xiàn)案例詳解(Mybatis),本篇文章提煉出mybatis最經(jīng)典、最精簡、最核心的代碼設(shè)計,來實現(xiàn)一個mini-mybatis,從而熟悉并掌握ORM框架的涉及實現(xiàn),需要的朋友可以參考下2023-07-07使用IDEA創(chuàng)建Servlet程序的詳細(xì)步驟
在學(xué)習(xí)servlet過程中,參考的教程是用eclipse完成的,而我在練習(xí)的過程中是使用IDEA的,在創(chuàng)建servlet程序時遇到了挺多困難,在此記錄一下如何用IDEA完整創(chuàng)建一個servlet程序,感興趣的朋友一起看看吧2024-08-08