SpringBoot整合Kafka完成生產(chǎn)消費(fèi)的方案
前言
網(wǎng)上找了很多管理kafka整合springboot的教程,但是很多都沒辦法應(yīng)用到生產(chǎn)環(huán)境。很多配置都是缺少,或者不正確的,只能當(dāng)個(gè)demo。遂自行查找資料、驗(yàn)證得到了一套可以完美應(yīng)對(duì)生產(chǎn)環(huán)境的kafka整合springboot的方案。
基礎(chǔ)配置
引入依賴
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
編寫kafka
相關(guān)配置
spring: kafka: producer: bootstrap-servers: localhost:9096,localhost:9097,localhost:9098 # transaction-id-prefix: starlinkRisk- retries: 3 acks: 1 batch-size: 32768 buffer-memory: 33554432 consumer: bootstrap-servers: localhost:9096,localhost:9097,localhost:9098 group-id: slr_connector auto-commit-interval: 2000 auto-offset-reset: latest enable-auto-commit: false max-poll-records: 3 properties: max: poll: interval: ms: 600000 session: timeout: ms: 10000 listener: concurrency: 9 missing-topics-fatal: false poll-timeout: 600000
編寫kafka
生產(chǎn)者配置類
import com.liboshuai.starlink.slr.connector.common.handler.KafkaSendResultHandler; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.support.serializer.JsonSerializer; import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; /** * Kafka生產(chǎn)者配置 */ @Slf4j @Configuration public class KafkaProviderConfig { @Value("${spring.kafka.producer.bootstrap-servers}") private String bootstrapServers; // @Value("${spring.kafka.producer.transaction-id-prefix}") // private String transactionIdPrefix; @Value("${spring.kafka.producer.acks}") private String acks; @Value("${spring.kafka.producer.retries}") private String retries; @Value("${spring.kafka.producer.batch-size}") private String batchSize; @Value("${spring.kafka.producer.buffer-memory}") private String bufferMemory; @Resource private KafkaSendResultHandler kafkaSendResultHandler; @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(16); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); //acks=0 : 生產(chǎn)者在成功寫入消息之前不會(huì)等待任何來自服務(wù)器的響應(yīng)。 //acks=1 : 只要集群的首領(lǐng)節(jié)點(diǎn)收到消息,生產(chǎn)者就會(huì)收到一個(gè)來自服務(wù)器成功響應(yīng)。 //acks=all :只有當(dāng)所有參與復(fù)制的節(jié)點(diǎn)全部收到消息時(shí),生產(chǎn)者才會(huì)收到一個(gè)來自服務(wù)器的成功響應(yīng)。 //開啟事務(wù)必須設(shè)為all props.put(ProducerConfig.ACKS_CONFIG, acks); //發(fā)生錯(cuò)誤后,消息重發(fā)的次數(shù),開啟事務(wù)必須大于0 props.put(ProducerConfig.RETRIES_CONFIG, retries); //當(dāng)多個(gè)消息發(fā)送到相同分區(qū)時(shí),生產(chǎn)者會(huì)將消息打包到一起,以減少請(qǐng)求交互. 而不是一條條發(fā)送 //批次的大小可以通過batch.size 參數(shù)設(shè)置.默認(rèn)是16KB //較小的批次大小有可能降低吞吐量(批次大小為0則完全禁用批處理)。 //比如說,kafka里的消息5秒鐘Batch才湊滿了16KB,才能發(fā)送出去。那這些消息的延遲就是5秒鐘 //實(shí)測(cè)batchSize這個(gè)參數(shù)沒有用 props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); //有的時(shí)刻消息比較少,過了很久,比如5min也沒有湊夠16KB,這樣延時(shí)就很大,所以需要一個(gè)參數(shù). 再設(shè)置一個(gè)時(shí)間,到了這個(gè)時(shí)間, //即使數(shù)據(jù)沒達(dá)到16KB,也將這個(gè)批次發(fā)送出去 props.put(ProducerConfig.LINGER_MS_CONFIG, "5000"); //生產(chǎn)者內(nèi)存緩沖區(qū)的大小 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); //反序列化,和生產(chǎn)者的序列化方式對(duì)應(yīng) props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ProducerFactory<String, Object> producerFactory() { DefaultKafkaProducerFactory<String, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs()); //開啟事務(wù),會(huì)導(dǎo)致 LINGER_MS_CONFIG 配置失效 // factory.setTransactionIdPrefix(transactionIdPrefix); return factory; } // @Bean // public KafkaTransactionManager<String, Object> kafkaTransactionManager(ProducerFactory<String, Object> producerFactory) { // return new KafkaTransactionManager<>(producerFactory); // } @Bean public KafkaTemplate<String, Object> kafkaTemplate() { KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory()); kafkaTemplate.setProducerListener(kafkaSendResultHandler); return kafkaTemplate; } }
編寫kafka
消費(fèi)者配置類
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer; import java.util.HashMap; import java.util.Map; /** * Kafka消費(fèi)者配置 */ @Configuration public class KafkaConsumerConfig { @Value("${spring.kafka.consumer.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.consumer.group-id}") private String groupId; @Value("${spring.kafka.consumer.auto-commit-interval}") private String autoCommitInterval; @Value("${spring.kafka.consumer.enable-auto-commit}") private boolean enableAutoCommit; @Value("${spring.kafka.properties.session.timeout.ms}") private String sessionTimeout; @Value("${spring.kafka.properties.max.poll.interval.ms}") private String maxPollIntervalTime; @Value("${spring.kafka.consumer.max-poll-records}") private String maxPollRecords; @Value("${spring.kafka.consumer.auto-offset-reset}") private String autoOffsetReset; @Value("${spring.kafka.listener.concurrency}") private Integer concurrency; @Value("${spring.kafka.listener.missing-topics-fatal}") private boolean missingTopicsFatal; @Value("${spring.kafka.listener.poll-timeout}") private long pollTimeout; @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(16); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); //是否自動(dòng)提交偏移量,默認(rèn)值是true,為了避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)置為false,然后手動(dòng)提交偏移量 propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); //自動(dòng)提交的時(shí)間間隔,自動(dòng)提交開啟時(shí)生效 propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); //該屬性指定了消費(fèi)者在讀取一個(gè)沒有偏移量的分區(qū)或者偏移量無效的情況下該作何處理: //earliest:當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開始消費(fèi);無提交的offset時(shí),從頭開始消費(fèi)分區(qū)的記錄 //latest:當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開始消費(fèi);無提交的offset時(shí),消費(fèi)新產(chǎn)生的該分區(qū)下的數(shù)據(jù)(在消費(fèi)者啟動(dòng)之后生成的記錄) //none:當(dāng)各分區(qū)都存在已提交的offset時(shí),從提交的offset開始消費(fèi);只要有一個(gè)分區(qū)不存在已提交的offset,則拋出異常 propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); //兩次poll之間的最大間隔,默認(rèn)值為5分鐘。如果超過這個(gè)間隔會(huì)觸發(fā)reBalance propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime); //這個(gè)參數(shù)定義了poll方法最多可以拉取多少條消息,默認(rèn)值為500。如果在拉取消息的時(shí)候新消息不足500條,那有多少返回多少;如果超過500條,每次只返回500。 //這個(gè)默認(rèn)值在有些場(chǎng)景下太大,有些場(chǎng)景很難保證能夠在5min內(nèi)處理完500條消息, //如果消費(fèi)者無法在5分鐘內(nèi)處理完500條消息的話就會(huì)觸發(fā)reBalance, //然后這批消息會(huì)被分配到另一個(gè)消費(fèi)者中,還是會(huì)處理不完,這樣這批消息就永遠(yuǎn)也處理不完。 //要避免出現(xiàn)上述問題,提前評(píng)估好處理一條消息最長(zhǎng)需要多少時(shí)間,然后覆蓋默認(rèn)的max.poll.records參數(shù) //注:需要開啟BatchListener批量監(jiān)聽才會(huì)生效,如果不開啟BatchListener則不會(huì)出現(xiàn)reBalance情況 propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); //當(dāng)broker多久沒有收到consumer的心跳請(qǐng)求后就觸發(fā)reBalance,默認(rèn)值是10s propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); // 設(shè)置反序列化器為 ErrorHandlingDeserializer,防止藥丸信息 propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class); // 配置 ErrorHandlingDeserializer 的委托反序列化器為 StringDeserializer propsMap.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class); propsMap.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class); return propsMap; } @Bean public ConsumerFactory<Object, Object> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); //在偵聽器容器中運(yùn)行的線程數(shù),一般設(shè)置為 機(jī)器數(shù)*分區(qū)數(shù) factory.setConcurrency(concurrency); //消費(fèi)監(jiān)聽接口監(jiān)聽的主題不存在時(shí),默認(rèn)會(huì)報(bào)錯(cuò),所以設(shè)置為false忽略錯(cuò)誤 factory.setMissingTopicsFatal(missingTopicsFatal); //自動(dòng)提交關(guān)閉,需要設(shè)置手動(dòng)消息確認(rèn) factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); factory.getContainerProperties().setPollTimeout(pollTimeout); //設(shè)置為批量監(jiān)聽,需要用List接收(相等于listener.type: batch) factory.setBatchListener(true); return factory; } }
編寫kafka
發(fā)送結(jié)果回調(diào)處理類
import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.kafka.support.ProducerListener; import org.springframework.stereotype.Component; @Slf4j @Component("kafkaSendResultHandler") public class KafkaSendResultHandler implements ProducerListener<String, Object> { @Override public void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) { // 記錄成功發(fā)送的消息信息 if (recordMetadata != null) { log.info("Kafka消息發(fā)送成功 - 主題: {}, 分區(qū): {}, 偏移量: {}, 鍵: {}, 值: {}", producerRecord.topic(), recordMetadata.partition(), recordMetadata.offset(), producerRecord.key(), producerRecord.value()); } else { log.warn("Kafka消息發(fā)送成功,但RecordMetadata為null - 鍵: {}, 值: {}", producerRecord.key(), producerRecord.value()); } } @Override public void onError(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata, Exception exception) { // 記錄發(fā)送失敗的消息信息及異常 if (recordMetadata != null) { log.error("Kafka消息發(fā)送失敗 - 主題: {}, 分區(qū): {}, 偏移量: {}, 鍵: {}, 值: {}, 異常: {}", producerRecord.topic(), recordMetadata.partition(), recordMetadata.offset(), producerRecord.key(), producerRecord.value(), exception.getMessage(), exception); } else { log.error("Kafka消息發(fā)送失敗 - RecordMetadata為null, 鍵: {}, 值: {}, 異常: {}", producerRecord.key(), producerRecord.value(), exception.getMessage(), exception); } } }
編寫kafka
消費(fèi)異常處理類
import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.springframework.kafka.listener.KafkaListenerErrorHandler; import org.springframework.kafka.listener.ListenerExecutionFailedException; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; @Slf4j @Component("kafkaConsumerExceptionHandler") public class KafkaConsumerExceptionHandler implements KafkaListenerErrorHandler { /** * 處理錯(cuò)誤,不帶 Consumer 對(duì)象 */ @Override public Object handleError(Message<?> message, ListenerExecutionFailedException e) { log.error("kafka消費(fèi)消息時(shí)發(fā)生錯(cuò)誤。消息內(nèi)容: {}, 錯(cuò)誤信息: {}", message, e.getMessage(), e); // 可以根據(jù)需要選擇是否拋出異常 // 例如:return null; 表示忽略錯(cuò)誤 // throw e; // 拋出異常以觸發(fā)重試機(jī)制 return null; } /** * 處理錯(cuò)誤,帶 Consumer 對(duì)象 */ @Override public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) { log.error("kafka消費(fèi)消息時(shí)發(fā)生錯(cuò)誤。消息內(nèi)容: {}, 錯(cuò)誤信息: {}", message, exception.getMessage(), exception); // 可以根據(jù)需要選擇處理方式,例如手動(dòng)提交偏移量,或其他操作 // 這里僅記錄日志并返回 null return null; } }
生產(chǎn)者示例
import com.liboshuai.starlink.slr.engine.api.dto.KafkaEventDTO; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import javax.annotation.Resource; import java.util.List; @Slf4j @Component public class KafkaEventProvider { @Resource private KafkaTemplate<String, Object> kafkaTemplate; @Value("${slr-connector.kafka.provider_topic}") private String providerTopic; /** * 批量上送事件信息到kafka */ public void batchSend(List<KafkaEventDTO> kafkaEventDTOList) { if (CollectionUtils.isEmpty(kafkaEventDTOList)) { return; } kafkaEventDTOList.forEach(eventUploadDTO -> kafkaTemplate.send(providerTopic, eventUploadDTO)); } }
消費(fèi)者實(shí)例
package com.liboshuai.starlink.slr.connector.dao.kafka.consumer; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; import java.util.List; @Slf4j @Component public class KafkaEventListener { @KafkaListener( topics = "${slr-connector.kafka.consumer_topic}", groupId = "${spring.kafka.consumer.group-id}", containerFactory = "kafkaListenerContainerFactory", errorHandler = "kafkaConsumerExceptionHandler" ) public void onAlert(List<ConsumerRecord<String, String>> consumerRecordList, Acknowledgment ack) { for (ConsumerRecord<String, String> record : consumerRecordList) { // 打印消費(fèi)的詳細(xì)信息 log.info("Consumed message - Topic: {}, Partition: {}, Offset: {}, Key: {}, Value: {}", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } // 手動(dòng)提交偏移量 ack.acknowledge(); } }
結(jié)語(yǔ)
對(duì)于生產(chǎn)者、消費(fèi)者提供的例子較少,這方面網(wǎng)上倒是很多,我就不多寫了。主要是重要的配置部分解決即可。
以上就是SpringBoot整合Kafka完成生產(chǎn)消費(fèi)的方案的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot Kafka生產(chǎn)消費(fèi)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- Springboot項(xiàng)目消費(fèi)Kafka數(shù)據(jù)的方法
- SpringBoot集成Kafka的實(shí)現(xiàn)示例
- SpringBoot 整合 Avro 與 Kafka的詳細(xì)過程
- springboot使用kafka推送數(shù)據(jù)到服務(wù)端的操作方法帶認(rèn)證
- SpringBoot使用Kafka來優(yōu)化接口請(qǐng)求的并發(fā)方式
- 如何使用SpringBoot集成Kafka實(shí)現(xiàn)用戶數(shù)據(jù)變更后發(fā)送消息
- Spring Boot 集成 Kafka的詳細(xì)步驟
- SpringKafka錯(cuò)誤處理(重試機(jī)制與死信隊(duì)列)
相關(guān)文章
SpringBoot+Ajax+redis實(shí)現(xiàn)隱藏重要接口地址的方法
這篇文章主要介紹了SpringBoot+Ajax+redis實(shí)現(xiàn)隱藏重要接口地址,本篇文章主要講訴使用SpringBoot項(xiàng)目配合Ajax和redis實(shí)現(xiàn)隱藏重要接口地址,這里我以隱藏秒殺地址為例,需要的朋友可以參考下2024-03-03Java 代碼實(shí)例解析設(shè)計(jì)模式之監(jiān)聽者模式
所謂監(jiān)聽者模式,我理解的是構(gòu)建一個(gè)容器存放所有被監(jiān)聽的線程或?qū)ο螅O(jiān)聽每個(gè)線程或?qū)ο蟀l(fā)生的變化,若某個(gè)線程或?qū)ο笥|發(fā)指定規(guī)則,那么則對(duì)所有被監(jiān)聽的線程或?qū)ο蟾鶕?jù)業(yè)務(wù)需要做處理2021-10-10idea 2023.1字體設(shè)置及自動(dòng)調(diào)整大小的圖文教程
這篇文章主要介紹了idea 2023.1字體設(shè)置及自動(dòng)調(diào)整大小的教程,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),需要的朋友可以參考下2023-07-07解決springmvc關(guān)于前臺(tái)日期作為實(shí)體類對(duì)象參數(shù)類型轉(zhuǎn)換錯(cuò)誤的問題
下面小編就為大家?guī)硪黄鉀Qspringmvc關(guān)于前臺(tái)日期作為實(shí)體類對(duì)象參數(shù)類型轉(zhuǎn)換錯(cuò)誤的問題。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-06-06Java基于正則表達(dá)式獲取指定HTML標(biāo)簽指定屬性值的方法
這篇文章主要介紹了Java基于正則表達(dá)式獲取指定HTML標(biāo)簽指定屬性值的方法,涉及java基于正則的HTML元素匹配相關(guān)操作技巧,需要的朋友可以參考下2017-01-01