SpringBoot如何正確配置并運(yùn)行Kafka
一、配置pom.xml,引入maven依賴(lài)
<!-- 引入kafka依賴(lài) -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.6</version>
</dependency>二、application.yml配置文件
這里只提供了kafka有用的相關(guān)配置,其他的配置刪了
spring:
kafka:
bootstrap-servers: xx.xx.xx.xx:9092 # kafka集群信息,多個(gè)用逗號(hào)間隔
# 生產(chǎn)者
producer:
# 重試次數(shù),設(shè)置大于0的值,則客戶(hù)端會(huì)將發(fā)送失敗的記錄重新發(fā)送
retries: 3
batch-size: 16384 #批量處理大小,16K
buffer-memory: 33554432 #緩沖存儲(chǔ)大,32M
acks: 1
# 指定消息key和消息體的編解碼方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 消費(fèi)者
consumer:
# 消費(fèi)者組
group-id: TestGroup
# 是否自動(dòng)提交
enable-auto-commit: false
# 消費(fèi)偏移配置
# none:如果沒(méi)有為消費(fèi)者找到先前的offset的值,即沒(méi)有自動(dòng)維護(hù)偏移量,也沒(méi)有手動(dòng)維護(hù)偏移量,則拋出異常
# earliest:在各分區(qū)下有提交的offset時(shí):從offset處開(kāi)始消費(fèi);在各分區(qū)下無(wú)提交的offset時(shí):從頭開(kāi)始消費(fèi)
# latest:在各分區(qū)下有提交的offset時(shí):從offset處開(kāi)始消費(fèi);在各分區(qū)下無(wú)提交的offset時(shí):從最新的數(shù)據(jù)開(kāi)始消費(fèi)
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 監(jiān)聽(tīng)
listener:
# record:當(dāng)每一條記錄被消費(fèi)者監(jiān)聽(tīng)器(ListenerConsumer)處理之后提交
# batch:當(dāng)每一批poll()的數(shù)據(jù)被ListenerConsumer處理之后提交
# time:當(dāng)每一批poll()的數(shù)據(jù)被ListenerConsumer處理之后,距離上次提交時(shí)間大于TIME時(shí)提交
# count:當(dāng)每一批poll()的數(shù)據(jù)被ListenerConsumer處理之后,被處理record數(shù)量大于等于COUNT時(shí)提交
# count_time:TIME或COUNT中有一個(gè)條件滿足時(shí)提交
# manual:當(dāng)每一批poll()的數(shù)據(jù)被ListenerConsumer處理之后, 手動(dòng)調(diào)用Acknowledgment.acknowledge()后提交
# manual_immediate:手動(dòng)調(diào)用Acknowledgment.acknowledge()后立即提交,一般推薦使用這種
ack-mode: manual_immediate三、消費(fèi)者
消費(fèi)者監(jiān)聽(tīng),可以配置多個(gè)監(jiān)聽(tīng)器
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
/**
* 消費(fèi)者
* kafka監(jiān)聽(tīng)器
*/
@Component
public class KafkaConsumer {
/**
* kafka的監(jiān)聽(tīng)器1,topic為"topic_test",消費(fèi)者組為"group_topic_test"
* @param record
* @param item
*/
@KafkaListener(topics = "topic_test", groupId = "group_topic_test")
public void topicListener1(ConsumerRecord<String, String> record, Acknowledgment item) {
String value = record.value();
System.out.println(value);
System.out.println(record);
//手動(dòng)提交
item.acknowledge();
}
/**
* 配置多個(gè)消費(fèi)組
* kafka的監(jiān)聽(tīng)器2,topic為"topic_test2",消費(fèi)者組為"group_topic_test"
* @param record
* @param item
*/
@KafkaListener(topics = "topic_test2",groupId = "group_topic_test2")
public void topicListener2(ConsumerRecord<String, String> record, Acknowledgment item) {
String value = record.value();
System.out.println(value);
System.out.println(record);
item.acknowledge();
}
}四、生產(chǎn)者
生產(chǎn)者作為接口Api作為測(cè)試
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* kafka生產(chǎn)者
*/
@RestController
@RequestMapping("/kafka")
public class KafkaController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping("/send")
public void send() {
kafkaTemplate.send("topic_test", "key", "測(cè)試kafka消息");
}
}五、調(diào)用測(cè)試
啟動(dòng)Boot項(xiàng)目,使用Postman工具發(fā)送GET請(qǐng)求:
http://localhost:8080/kafka/send
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
- SpringBoot如何集成Kafka低版本和高版本
- springboot如何配置多kafka
- kafka springBoot配置的實(shí)現(xiàn)
- Springboot使用kafka的兩種方式
- springboot連接kafka集群的使用示例
- SpringBoot3集成Kafka的方法詳解
- Springboot系列之kafka操作使用詳解
- springboot項(xiàng)目配置多個(gè)kafka的示例代碼
- springboot+kafka中@KafkaListener動(dòng)態(tài)指定多個(gè)topic問(wèn)題
- springboot使用@KafkaListener監(jiān)聽(tīng)多個(gè)kafka配置實(shí)現(xiàn)
相關(guān)文章
El表達(dá)式使用問(wèn)題javax.el.ELException:Failed to parse the expression
今天小編就為大家分享一篇關(guān)于Jsp El表達(dá)式使用問(wèn)題javax.el.ELException:Failed to parse the expression的解決方式,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2018-12-12
JDK動(dòng)態(tài)代理之WeakCache緩存的實(shí)現(xiàn)機(jī)制
這篇文章主要介紹了JDK動(dòng)態(tài)代理之WeakCache緩存的實(shí)現(xiàn)機(jī)制2018-02-02
Java Socket實(shí)現(xiàn)簡(jiǎn)易聊天室
這篇文章主要為大家詳細(xì)介紹了Java Socket實(shí)現(xiàn)簡(jiǎn)易聊天室,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-03-03
SpringBoot中間件ORM框架實(shí)現(xiàn)案例詳解(Mybatis)
這篇文章主要介紹了SpringBoot中間件ORM框架實(shí)現(xiàn)案例詳解(Mybatis),本篇文章提煉出mybatis最經(jīng)典、最精簡(jiǎn)、最核心的代碼設(shè)計(jì),來(lái)實(shí)現(xiàn)一個(gè)mini-mybatis,從而熟悉并掌握ORM框架的涉及實(shí)現(xiàn),需要的朋友可以參考下2023-07-07
使用IDEA創(chuàng)建Servlet程序的詳細(xì)步驟
在學(xué)習(xí)servlet過(guò)程中,參考的教程是用eclipse完成的,而我在練習(xí)的過(guò)程中是使用IDEA的,在創(chuàng)建servlet程序時(shí)遇到了挺多困難,在此記錄一下如何用IDEA完整創(chuàng)建一個(gè)servlet程序,感興趣的朋友一起看看吧2024-08-08
Java中PropertyDescriptor的用法及說(shuō)明
這篇文章主要介紹了Java中PropertyDescriptor的用法及說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-07-07

