Springboot使用kafka的兩種方式
1、創(chuàng)建實(shí)驗(yàn)項(xiàng)目
第一步創(chuàng)建一個(gè)Springboot項(xiàng)目,引入spring-kafka依賴,這是后面的基礎(chǔ)。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
kafka配置
spring:
kafka:
bootstrap-servers: kafka.tyjt.com:9092
consumer:
auto-offset-reset: earliest
group-id: sharingan-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
2、自動(dòng)檔
為了方便使用kafka,Springboot提供了spring-kafka 這個(gè)包,在已開(kāi)始我們已經(jīng)導(dǎo)入了,下面直接使用吧
Spring項(xiàng)目里引入Kafka非常方便,使用kafkaTemplate(Producer的模版)+@KafkaListener(Consumer的監(jiān)聽(tīng)器)即可完成生產(chǎn)者-消費(fèi)者的代碼開(kāi)發(fā)
2.1 監(jiān)聽(tīng)listener
為了使創(chuàng)建 kafka 監(jiān)聽(tīng)器更加簡(jiǎn)單,Spring For Kafka 提供了 @KafkaListener 注解,
@KafkaListener 注解配置方法上,凡是此注解的方法就會(huì)被標(biāo)記為是 Kafka 消息監(jiān)聽(tīng)器,所以可以用
@KafkaListener 注解快速創(chuàng)建消息監(jiān)聽(tīng)器。
@Configuration
@EnableKafka
public class ConsumerConfigDemo {
@KafkaListener(topics = {"test"},groupId = "group1")
public void kafkaListener(String topic,String message){
System.out.println("消息:"+message);
}
}
2.2 發(fā)布消息
發(fā)布消息通過(guò)kafkaTemplate,kafkaTemplate是spring-kafka 的封裝
@Slf4j
@Service
public class KafkaProducerService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String key, String message) throws Exception {
kafkaTemplate.send(topic,key,message);
}
}
kafkaTemplate 有很多不同的發(fā)送方法,根據(jù)自己的需求使用,這里只記錄最簡(jiǎn)單的狀況。
3、手動(dòng)檔
3.1 手動(dòng)創(chuàng)建consumer
關(guān)于consumer的主要的封裝在ConcurrentKafkaListenerContainerFactory這個(gè)里頭,
本身的KafkaConsumer是線程不安全的,無(wú)法并發(fā)操作,這里spring又在包裝了一層,根據(jù)配置的spring.kafka.listener.concurrency來(lái)生成多個(gè)并發(fā)的KafkaMessageListenerContainer實(shí)例
package com.tyjt.sharingan.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 啟動(dòng)kafka consumer
*
* @author 種鑫
* @date 2023/10/18 17:26
*/
@EnableKafka
@Component
@Slf4j
public class KafkaConsumerMgr {
@Resource
ConcurrentKafkaListenerContainerFactory<String, byte[]> containerFactory;
Map<String, ConcurrentMessageListenerContainer<?, ?>> containerMap = new ConcurrentHashMap<>();
public void startListener(KafkaProtoConsumer kafkaConsumer) {
// 停止相同的
if (containerMap.containsKey(kafkaConsumer.getTopic())) {
containerMap.get(kafkaConsumer.getTopic()).stop();
}
ConcurrentMessageListenerContainer<String, byte[]> container = createListenerContainer(kafkaConsumer);
container.start();
containerMap.put(kafkaConsumer.getTopic(), container);
}
private ConcurrentMessageListenerContainer<String, byte[]> createListenerContainer(KafkaProtoConsumer consumer) {
ConcurrentMessageListenerContainer<String, byte[]> container = containerFactory.createContainer(consumer.topic());
container.setBeanName(consumer.group() + "-" + consumer.topic());
container.setConcurrency(consumer.getPartitionCount());
consumer.deployContainer(container);
// 防止被修改的配置
ContainerProperties containerProperties = container.getContainerProperties();
containerProperties.setMessageListener(new Listener<>(consumer));
containerProperties.setAsyncAcks(false);
containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
containerProperties.setGroupId(consumer.group());
return container;
}
/**
* 定義監(jiān)聽(tīng)
*/
private static class Listener<T> implements AcknowledgingConsumerAwareMessageListener<String, T> {
private final KafkaConsumer<T> kafkaConsumer;
public Listener(KafkaConsumer<T> consumer) {
this.kafkaConsumer = consumer;
}
@Override
public void onMessage(ConsumerRecord<String, T> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
log.info("group【{}】接收到來(lái)自topic【{}】的消息", kafkaConsumer.group(), data.topic());
// 處理數(shù)據(jù)
kafkaConsumer.process(data.value());
// 提交offset
log.info("group【{}】提交topic【{}】的offset", kafkaConsumer.group(), data.topic());
consumer.commitSync();
}
}
}
這個(gè)可以根據(jù)需要?jiǎng)討B(tài)的啟動(dòng)消費(fèi)者
3.2 手動(dòng)創(chuàng)建KafkaProducer
@Bean
public KafkaProducer<String, byte[]> kafkaProducer() {
Properties props = new Properties();
// 這里可以配置幾臺(tái)broker即可,他會(huì)自動(dòng)從broker去拉取元數(shù)據(jù)進(jìn)行緩存
props.put("bootstrap.servers", bootstrapServers);
// 這個(gè)就是負(fù)責(zé)把發(fā)送的key從字符串序列化為字節(jié)數(shù)組
props.put("key.serializer", keySerializer);
// 這個(gè)就是負(fù)責(zé)把你發(fā)送的實(shí)際的message從字符串序列化為字節(jié)數(shù)組
props.put("value.serializer", valueSerializer);
// 默認(rèn)是32兆=33554432
props.put("buffer.memory", bufferMemory);
// 一般來(lái)說(shuō)是要自己手動(dòng)設(shè)置的,不是純粹依靠默認(rèn)值的,16kb
props.put("batch.size", batchSize);
// 發(fā)送一條消息出去,100ms內(nèi)還沒(méi)有湊成一個(gè)batch發(fā)送,必須立即發(fā)送出去
props.put("linger.ms", lingerMs);
// 這個(gè)是說(shuō)你可以發(fā)送的最大的請(qǐng)求的大小 默認(rèn)是1m=1048576
// props.put("max.request.size", 10485760);
// follower有沒(méi)有同步成功你就不管了
props.put("acks", acks);
// 這個(gè)重試,一般來(lái)說(shuō),給個(gè)3次~5次就足夠了,可以cover住一般的異常場(chǎng)景
props.put("retries", retries);
// 每次重試間隔100ms
props.put("retry.backoff.ms", retryBackOffMs);
props.put("max.in.flight.requests.per.connection", maxInFlightRequestsPerConnection);
return new KafkaProducer<>(props);
}
4、總結(jié)
4.1 區(qū)別
KafkaProducer是Kafka-client提供的原生Java Kafka客戶端發(fā)送消息的API。
KafkaTemplate是Spring Kafka中提供的一個(gè)高級(jí)工具類,用于可以方便地發(fā)送消息到Kafka。它封裝了KafkaProducer,提供了更多的便利方法和更高級(jí)的消息發(fā)送方式。
org.apache.kafka.clients.producer.KafkaProducer
org.springframework.kafka.core.KafkaTemplate

4.2 場(chǎng)景選擇
在spring應(yīng)用中如果需要訂閱kafka消息,通常情況下我們不會(huì)直接使用kafka-client, 而是使用更方便的一層封裝spring-kafka。
不需要?jiǎng)討B(tài)的選擇時(shí)候可以使用Spring-kafka,在需要?jiǎng)討B(tài)創(chuàng)建時(shí)可以使用kafka-client的api進(jìn)行處理
4.3 ConsumerRecord和ProducerRecord
兩者都是kafka-client的類,在Spring-kafka中依然可以使用,可以發(fā)送和接受
以上就是Springboot使用kafka的兩種方式的詳細(xì)內(nèi)容,更多關(guān)于Springboot使用kafka的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- SpringBoot如何集成Kafka低版本和高版本
- springboot如何配置多kafka
- kafka springBoot配置的實(shí)現(xiàn)
- springboot連接kafka集群的使用示例
- SpringBoot3集成Kafka的方法詳解
- Springboot系列之kafka操作使用詳解
- SpringBoot如何正確配置并運(yùn)行Kafka
- springboot項(xiàng)目配置多個(gè)kafka的示例代碼
- springboot+kafka中@KafkaListener動(dòng)態(tài)指定多個(gè)topic問(wèn)題
- springboot使用@KafkaListener監(jiān)聽(tīng)多個(gè)kafka配置實(shí)現(xiàn)
相關(guān)文章
詳解SpringBoot中添加@ResponseBody注解會(huì)發(fā)生什么
這篇文章主要介紹了詳解SpringBoot中添加@ResponseBody注解會(huì)發(fā)生什么,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11
使用BeanFactory實(shí)現(xiàn)創(chuàng)建對(duì)象
這篇文章主要為大家詳細(xì)介紹了使用BeanFactory實(shí)現(xiàn)創(chuàng)建對(duì)象,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-08-08
Mybatisplus更新某個(gè)字段為null問(wèn)題
Mybatisplus更新某個(gè)字段為null時(shí),可以使用@TableField(updateStrategy=FieldStrategy.IGNORED)注解,該注解提供了字段映射、忽略非表字段、自動(dòng)填充策略、字段條件處理等功能2025-02-02
java調(diào)用微信現(xiàn)金紅包接口的心得與體會(huì)總結(jié)
這篇文章主要介紹了java調(diào)用微信現(xiàn)金紅包接口的心得與體會(huì)總結(jié),有需要的朋友可以了解一下。2016-11-11
Springboot使用maven打包指定mainClass問(wèn)題
這篇文章主要介紹了Springboot使用maven打包指定mainClass問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-04-04
Spring @Valid和@Validated區(qū)別和用法實(shí)例
這篇文章主要介紹了Spring @Valid和@Validated區(qū)別和用法實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-04-04
Mybatis-Plus中的MetaObjectHandler組件的使用
MetaObjectHandler是Mybatis-Plus中一個(gè)實(shí)用組件,專門(mén)用于自動(dòng)處理實(shí)體對(duì)象中的特定字段,如創(chuàng)建時(shí)間、更新時(shí)間、創(chuàng)建人和修改人等,該接口允許開(kāi)發(fā)者在不修改業(yè)務(wù)代碼的情況下,實(shí)現(xiàn)自動(dòng)填充功能,極大地簡(jiǎn)化了代碼的復(fù)雜性,感興趣的可以了解一下2024-10-10
Java中Lombok @Value注解導(dǎo)致的variable not been initialized問(wèn)題
本文主要介紹了Java中Lombok @Value注解導(dǎo)致的variable not been initialized問(wèn)題,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-07-07

