Springboot使用kafka的兩種方式
1、創(chuàng)建實(shí)驗(yàn)項(xiàng)目
第一步創(chuàng)建一個(gè)Springboot項(xiàng)目,引入spring-kafka依賴,這是后面的基礎(chǔ)。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
kafka配置
spring: kafka: bootstrap-servers: kafka.tyjt.com:9092 consumer: auto-offset-reset: earliest group-id: sharingan-group key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
2、自動(dòng)檔
為了方便使用kafka,Springboot提供了spring-kafka 這個(gè)包,在已開始我們已經(jīng)導(dǎo)入了,下面直接使用吧
Spring項(xiàng)目里引入Kafka非常方便,使用kafkaTemplate(Producer的模版)+@KafkaListener(Consumer的監(jiān)聽器)即可完成生產(chǎn)者-消費(fèi)者的代碼開發(fā)
2.1 監(jiān)聽listener
為了使創(chuàng)建 kafka 監(jiān)聽器更加簡(jiǎn)單,Spring For Kafka 提供了 @KafkaListener 注解,
@KafkaListener 注解配置方法上,凡是此注解的方法就會(huì)被標(biāo)記為是 Kafka 消息監(jiān)聽器,所以可以用
@KafkaListener 注解快速創(chuàng)建消息監(jiān)聽器。
@Configuration @EnableKafka public class ConsumerConfigDemo { @KafkaListener(topics = {"test"},groupId = "group1") public void kafkaListener(String topic,String message){ System.out.println("消息:"+message); } }
2.2 發(fā)布消息
發(fā)布消息通過kafkaTemplate,kafkaTemplate是spring-kafka 的封裝
@Slf4j @Service public class KafkaProducerService { @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String topic, String key, String message) throws Exception { kafkaTemplate.send(topic,key,message); } }
kafkaTemplate 有很多不同的發(fā)送方法,根據(jù)自己的需求使用,這里只記錄最簡(jiǎn)單的狀況。
3、手動(dòng)檔
3.1 手動(dòng)創(chuàng)建consumer
關(guān)于consumer的主要的封裝在ConcurrentKafkaListenerContainerFactory這個(gè)里頭,
本身的KafkaConsumer是線程不安全的,無法并發(fā)操作,這里spring又在包裝了一層,根據(jù)配置的spring.kafka.listener.concurrency來生成多個(gè)并發(fā)的KafkaMessageListenerContainer實(shí)例
package com.tyjt.sharingan.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * 啟動(dòng)kafka consumer * * @author 種鑫 * @date 2023/10/18 17:26 */ @EnableKafka @Component @Slf4j public class KafkaConsumerMgr { @Resource ConcurrentKafkaListenerContainerFactory<String, byte[]> containerFactory; Map<String, ConcurrentMessageListenerContainer<?, ?>> containerMap = new ConcurrentHashMap<>(); public void startListener(KafkaProtoConsumer kafkaConsumer) { // 停止相同的 if (containerMap.containsKey(kafkaConsumer.getTopic())) { containerMap.get(kafkaConsumer.getTopic()).stop(); } ConcurrentMessageListenerContainer<String, byte[]> container = createListenerContainer(kafkaConsumer); container.start(); containerMap.put(kafkaConsumer.getTopic(), container); } private ConcurrentMessageListenerContainer<String, byte[]> createListenerContainer(KafkaProtoConsumer consumer) { ConcurrentMessageListenerContainer<String, byte[]> container = containerFactory.createContainer(consumer.topic()); container.setBeanName(consumer.group() + "-" + consumer.topic()); container.setConcurrency(consumer.getPartitionCount()); consumer.deployContainer(container); // 防止被修改的配置 ContainerProperties containerProperties = container.getContainerProperties(); containerProperties.setMessageListener(new Listener<>(consumer)); containerProperties.setAsyncAcks(false); containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); containerProperties.setGroupId(consumer.group()); return container; } /** * 定義監(jiān)聽 */ private static class Listener<T> implements AcknowledgingConsumerAwareMessageListener<String, T> { private final KafkaConsumer<T> kafkaConsumer; public Listener(KafkaConsumer<T> consumer) { this.kafkaConsumer = consumer; } @Override public void onMessage(ConsumerRecord<String, T> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer) { log.info("group【{}】接收到來自topic【{}】的消息", kafkaConsumer.group(), data.topic()); // 處理數(shù)據(jù) kafkaConsumer.process(data.value()); // 提交offset log.info("group【{}】提交topic【{}】的offset", kafkaConsumer.group(), data.topic()); consumer.commitSync(); } } }
這個(gè)可以根據(jù)需要?jiǎng)討B(tài)的啟動(dòng)消費(fèi)者
3.2 手動(dòng)創(chuàng)建KafkaProducer
@Bean public KafkaProducer<String, byte[]> kafkaProducer() { Properties props = new Properties(); // 這里可以配置幾臺(tái)broker即可,他會(huì)自動(dòng)從broker去拉取元數(shù)據(jù)進(jìn)行緩存 props.put("bootstrap.servers", bootstrapServers); // 這個(gè)就是負(fù)責(zé)把發(fā)送的key從字符串序列化為字節(jié)數(shù)組 props.put("key.serializer", keySerializer); // 這個(gè)就是負(fù)責(zé)把你發(fā)送的實(shí)際的message從字符串序列化為字節(jié)數(shù)組 props.put("value.serializer", valueSerializer); // 默認(rèn)是32兆=33554432 props.put("buffer.memory", bufferMemory); // 一般來說是要自己手動(dòng)設(shè)置的,不是純粹依靠默認(rèn)值的,16kb props.put("batch.size", batchSize); // 發(fā)送一條消息出去,100ms內(nèi)還沒有湊成一個(gè)batch發(fā)送,必須立即發(fā)送出去 props.put("linger.ms", lingerMs); // 這個(gè)是說你可以發(fā)送的最大的請(qǐng)求的大小 默認(rèn)是1m=1048576 // props.put("max.request.size", 10485760); // follower有沒有同步成功你就不管了 props.put("acks", acks); // 這個(gè)重試,一般來說,給個(gè)3次~5次就足夠了,可以cover住一般的異常場(chǎng)景 props.put("retries", retries); // 每次重試間隔100ms props.put("retry.backoff.ms", retryBackOffMs); props.put("max.in.flight.requests.per.connection", maxInFlightRequestsPerConnection); return new KafkaProducer<>(props); }
4、總結(jié)
4.1 區(qū)別
KafkaProducer是Kafka-client提供的原生Java Kafka客戶端發(fā)送消息的API。
KafkaTemplate是Spring Kafka中提供的一個(gè)高級(jí)工具類,用于可以方便地發(fā)送消息到Kafka。它封裝了KafkaProducer,提供了更多的便利方法和更高級(jí)的消息發(fā)送方式。
org.apache.kafka.clients.producer.KafkaProducer
org.springframework.kafka.core.KafkaTemplate
4.2 場(chǎng)景選擇
在spring應(yīng)用中如果需要訂閱kafka消息,通常情況下我們不會(huì)直接使用kafka-client, 而是使用更方便的一層封裝spring-kafka。
不需要?jiǎng)討B(tài)的選擇時(shí)候可以使用Spring-kafka,在需要?jiǎng)討B(tài)創(chuàng)建時(shí)可以使用kafka-client的api進(jìn)行處理
4.3 ConsumerRecord和ProducerRecord
兩者都是kafka-client的類,在Spring-kafka中依然可以使用,可以發(fā)送和接受
以上就是Springboot使用kafka的兩種方式的詳細(xì)內(nèi)容,更多關(guān)于Springboot使用kafka的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- SpringBoot如何集成Kafka低版本和高版本
- springboot如何配置多kafka
- kafka springBoot配置的實(shí)現(xiàn)
- springboot連接kafka集群的使用示例
- SpringBoot3集成Kafka的方法詳解
- Springboot系列之kafka操作使用詳解
- SpringBoot如何正確配置并運(yùn)行Kafka
- springboot項(xiàng)目配置多個(gè)kafka的示例代碼
- springboot+kafka中@KafkaListener動(dòng)態(tài)指定多個(gè)topic問題
- springboot使用@KafkaListener監(jiān)聽多個(gè)kafka配置實(shí)現(xiàn)
相關(guān)文章
詳解SpringBoot中添加@ResponseBody注解會(huì)發(fā)生什么
這篇文章主要介紹了詳解SpringBoot中添加@ResponseBody注解會(huì)發(fā)生什么,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11使用BeanFactory實(shí)現(xiàn)創(chuàng)建對(duì)象
這篇文章主要為大家詳細(xì)介紹了使用BeanFactory實(shí)現(xiàn)創(chuàng)建對(duì)象,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-08-08java調(diào)用微信現(xiàn)金紅包接口的心得與體會(huì)總結(jié)
這篇文章主要介紹了java調(diào)用微信現(xiàn)金紅包接口的心得與體會(huì)總結(jié),有需要的朋友可以了解一下。2016-11-11Springboot使用maven打包指定mainClass問題
這篇文章主要介紹了Springboot使用maven打包指定mainClass問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-04-04Spring @Valid和@Validated區(qū)別和用法實(shí)例
這篇文章主要介紹了Spring @Valid和@Validated區(qū)別和用法實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-04-04Mybatis-Plus中的MetaObjectHandler組件的使用
MetaObjectHandler是Mybatis-Plus中一個(gè)實(shí)用組件,專門用于自動(dòng)處理實(shí)體對(duì)象中的特定字段,如創(chuàng)建時(shí)間、更新時(shí)間、創(chuàng)建人和修改人等,該接口允許開發(fā)者在不修改業(yè)務(wù)代碼的情況下,實(shí)現(xiàn)自動(dòng)填充功能,極大地簡(jiǎn)化了代碼的復(fù)雜性,感興趣的可以了解一下2024-10-10Java中Lombok @Value注解導(dǎo)致的variable not been initialized問題
本文主要介紹了Java中Lombok @Value注解導(dǎo)致的variable not been initialized問題,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-07-07