springboot連接kafka集群的使用示例
一、環(huán)境搭建
1.1 springboot 環(huán)境
- JDK 11+
- Maven 3.8.x+
- springboot 2.5.4 +
1.2 kafka 依賴
springboot的pom文件導(dǎo)入
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>二、 kafka 配置類
2.1 發(fā)布者
2.1.1 配置
發(fā)布者我們使用 KafkaTemplate 來(lái)進(jìn)行消息發(fā)布,所以需要先對(duì)其進(jìn)行一些必要的配置。
@Configuration
@EnableKafka
public class KafkaConfig {
/***** 發(fā)布者 *****/
//生產(chǎn)者工廠
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
//生產(chǎn)者配置
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.83:9092,192.168.2.84:9092,192.168.2.86:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
//生產(chǎn)者模板
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}2.1.2 構(gòu)建發(fā)布者類
配置完發(fā)布者,下來(lái)就是發(fā)布消息,我們需要繼承 ProducerListener<K, V> 接口,該接口完整信息如下:
public interface ProducerListener<K, V> {
void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);
void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
Exception exception);
}實(shí)現(xiàn)該接口的方法,我們可以獲取包含發(fā)送結(jié)果(成功或失?。┑漠惒交卣{(diào),也就是可以在這個(gè)接口的實(shí)現(xiàn)中獲取發(fā)送結(jié)果。
我們簡(jiǎn)單的實(shí)現(xiàn)構(gòu)建一個(gè)發(fā)布者類,接收主題和發(fā)布消息參數(shù),并打印發(fā)布結(jié)果。
@Component
public class KafkaProducer implements ProducerListener<Object,Object> {
private static final Logger producerlog = LoggerFactory.getLogger(KafkaProducer.class);
private final KafkaTemplate<Integer, String> kafkaTemplate;
public KafkaProducer(KafkaTemplate<Integer, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void producer (String msg,String topic){
ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(topic,0, msg);
future.addCallback(new KafkaSendCallback<Integer, String>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
producerlog.info("發(fā)送成功 {}", result);
}
@Override
public void onFailure(KafkaProducerException ex) {
ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
producerlog.info("發(fā)送失敗 {}",failed);
}
});
}
}2.1.3 發(fā)布消息
寫一個(gè)controller類來(lái)測(cè)試我們構(gòu)建的發(fā)布者類,這個(gè)類中打印接收到的消息,來(lái)確保信息接收不出問(wèn)題。
@RestController
public class KafkaTestController {
private static final Logger kafkaTestLog = LoggerFactory.getLogger(KafkaTestController.class);
@Resource
private KafkaProducer kafkaProducer;
@GetMapping("/kafkaTest")
public void kafkaTest(String msg,String topic){
kafkaProducer.producer(msg,topic);
kafkaTestLog.info("接收到消息 {} {}",msg,topic);
}
}一切準(zhǔn)備就緒,我們啟動(dòng)程序利用postman來(lái)進(jìn)行簡(jiǎn)單的測(cè)試。
進(jìn)行消息發(fā)布:

發(fā)布結(jié)果:

可以看到消息發(fā)送成功。
我們?cè)倏纯磌afka消費(fèi)者有沒(méi)有接收到消息:

看以看到,kakfa的消費(fèi)者也接收到了消息。
2.2 消費(fèi)者
2.2.1 配置
消息的接受有多種方式,我們這里選擇的是使用 @KafkaListener 注解來(lái)進(jìn)行消息接收。它的使用像下面這樣:
public class Listener {
@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
...
}
}看起來(lái)不是太難吧,但使用這個(gè)注解,我們需要配置底層 ConcurrentMessageListenerContainer.kafkaListenerContainerFactor。
我們?cè)谠瓉?lái)的kafka配置類 KafkaConfig 中,繼續(xù)配置消費(fèi)者,大概就像下面這樣
@Configuration
@EnableKafka
public class KafkaConfig {
/***** 發(fā)布者 *****/
//生產(chǎn)者工廠
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
//生產(chǎn)者配置
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.83:9092,192.168.2.84:9092,192.168.2.86:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
//生產(chǎn)者模板
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
/***** 消費(fèi)者 *****/
//容器監(jiān)聽(tīng)工廠
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
//消費(fèi)者工廠
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
//消費(fèi)者配置
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.2.83:9092,192.168.2.84:9092,192.168.2.86:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,3000);
return props;
}
}注意,要設(shè)置容器屬性必須使用getContainerProperties()工廠方法。它用作注入容器的實(shí)際屬性的模板
2.2.2 構(gòu)建消費(fèi)者類
配置好后,我們就可以使用這個(gè)注解了。這個(gè)注解的使用有多種方式:
1、用它來(lái)覆蓋容器工廠的concurrency和屬性
@KafkaListener(id = "myListener", topics = "myTopic",
autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
...
}2、可以使用顯式主題和分區(qū)(以及可選的初始偏移量)
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}3、將初始偏移應(yīng)用于所有已分配的分區(qū)
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" },
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}4、指定以逗號(hào)分隔的分區(qū)列表或分區(qū)范圍
@KafkaListener(id = "pp", autoStartup = "false",
topicPartitions = @TopicPartition(topic = "topic1",
partitions = "0-5, 7, 10-15"))
public void process(String in) {
...
}5、可以向偵聽(tīng)器提供Acknowledgment
@KafkaListener(id = "cat", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}6、添加標(biāo)頭
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}我們這里寫一個(gè)簡(jiǎn)單的,只用它來(lái)接受指定主題的數(shù)據(jù):
@Component
public class KafkaConsumer {
private static final Logger consumerlog = LoggerFactory.getLogger(KafkaConsumer.class);
@KafkaListener(topicPartitions = @TopicPartition(topic = "kafka-topic-test",
partitions = "0"))
public void consumer (String data){
consumerlog.info("消費(fèi)者接收數(shù)據(jù) {}",data);
}
}這里解釋一下,因?yàn)槲覀冞M(jìn)行了手動(dòng)分配主題/分區(qū),所以 注解中g(shù)roup.id 可以為空。若要指定group.id請(qǐng)?jiān)谙M(fèi)者配置中加上props.put(ConsumerConfig.GROUP_ID_CONFIG, “bzt001”); 或在 @TopicPartition 注解后加上 groupId = “組id”
2.2.3 進(jìn)行消息消費(fèi)
繼續(xù)使用postman調(diào)用我們寫好的發(fā)布者發(fā)布消息,觀察控制臺(tái)的消費(fèi)者類是否有相關(guān)日志出現(xiàn)。

到此這篇關(guān)于springboot連接kafka集群的使用示例的文章就介紹到這了,更多相關(guān)springboot連接kafka集群內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Spring?Boot整合Kafka+SSE實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)展示
- Kafka的安裝及接入SpringBoot的詳細(xì)過(guò)程
- springboot使用@KafkaListener監(jiān)聽(tīng)多個(gè)kafka配置實(shí)現(xiàn)
- Spring?Boot中KafkaListener的介紹、原理和使用方法案例詳解
- 基于SpringBoot?使用?Flink?收發(fā)Kafka消息的示例詳解
- springboot+kafka中@KafkaListener動(dòng)態(tài)指定多個(gè)topic問(wèn)題
- Spring Boot 集成 Kafka的詳細(xì)步驟
相關(guān)文章
Java算法之?dāng)?shù)組冒泡排序代碼實(shí)例講解
這篇文章主要介紹了Java算法之?dāng)?shù)組冒泡排序代碼實(shí)例講解,文中用代碼舉例講解的很清晰,有感興趣的同學(xué)可以研究下2021-03-03
IDEA中application.properties的圖標(biāo)顯示不正常的問(wèn)題及解決方法
這篇文章主要介紹了IDEA中application.properties的圖標(biāo)顯示不正常的問(wèn)題及解決方法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-04-04
Java基于NIO實(shí)現(xiàn)群聊系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了Java基于NIO實(shí)現(xiàn)群聊系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-11-11
講解Java設(shè)計(jì)模式編程中的建造者模式與原型模式
這篇文章主要介紹了Java設(shè)計(jì)模式編程中的建造者模式與原型模式,設(shè)計(jì)模式有利于團(tuán)隊(duì)開發(fā)過(guò)程中的代碼維護(hù),需要的朋友可以參考下2016-02-02
Map如何根據(jù)key指定條件進(jìn)行過(guò)濾篩選
這篇文章主要介紹了Map如何根據(jù)key指定條件進(jìn)行過(guò)濾篩選問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-09-09
SpringBoot+Spring Security基于內(nèi)存用戶認(rèn)證的實(shí)現(xiàn)
本文介紹了SpringBoot+Spring Security基于內(nèi)存用戶認(rèn)證的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-11-11
java創(chuàng)建txt文件并寫入內(nèi)容的方法代碼示例
這篇文章主要介紹了java創(chuàng)建txt文件并寫入內(nèi)容的兩種方法,分別是使用java.io.FileWriter和BufferedWriter,以及使用Java7的java.nio.file包中的Files和Path類,需要的朋友可以參考下2025-01-01

