SpringBoot整合Kafka完成生產(chǎn)消費的方案
前言
網(wǎng)上找了很多管理kafka整合springboot的教程,但是很多都沒辦法應(yīng)用到生產(chǎn)環(huán)境。很多配置都是缺少,或者不正確的,只能當(dāng)個demo。遂自行查找資料、驗證得到了一套可以完美應(yīng)對生產(chǎn)環(huán)境的kafka整合springboot的方案。
基礎(chǔ)配置
引入依賴
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
編寫kafka相關(guān)配置
spring:
kafka:
producer:
bootstrap-servers: localhost:9096,localhost:9097,localhost:9098
# transaction-id-prefix: starlinkRisk-
retries: 3
acks: 1
batch-size: 32768
buffer-memory: 33554432
consumer:
bootstrap-servers: localhost:9096,localhost:9097,localhost:9098
group-id: slr_connector
auto-commit-interval: 2000
auto-offset-reset: latest
enable-auto-commit: false
max-poll-records: 3
properties:
max:
poll:
interval:
ms: 600000
session:
timeout:
ms: 10000
listener:
concurrency: 9
missing-topics-fatal: false
poll-timeout: 600000
編寫kafka生產(chǎn)者配置類
import com.liboshuai.starlink.slr.connector.common.handler.KafkaSendResultHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
/**
* Kafka生產(chǎn)者配置
*/
@Slf4j
@Configuration
public class KafkaProviderConfig {
@Value("${spring.kafka.producer.bootstrap-servers}")
private String bootstrapServers;
// @Value("${spring.kafka.producer.transaction-id-prefix}")
// private String transactionIdPrefix;
@Value("${spring.kafka.producer.acks}")
private String acks;
@Value("${spring.kafka.producer.retries}")
private String retries;
@Value("${spring.kafka.producer.batch-size}")
private String batchSize;
@Value("${spring.kafka.producer.buffer-memory}")
private String bufferMemory;
@Resource
private KafkaSendResultHandler kafkaSendResultHandler;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>(16);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
//acks=0 : 生產(chǎn)者在成功寫入消息之前不會等待任何來自服務(wù)器的響應(yīng)。
//acks=1 : 只要集群的首領(lǐng)節(jié)點收到消息,生產(chǎn)者就會收到一個來自服務(wù)器成功響應(yīng)。
//acks=all :只有當(dāng)所有參與復(fù)制的節(jié)點全部收到消息時,生產(chǎn)者才會收到一個來自服務(wù)器的成功響應(yīng)。
//開啟事務(wù)必須設(shè)為all
props.put(ProducerConfig.ACKS_CONFIG, acks);
//發(fā)生錯誤后,消息重發(fā)的次數(shù),開啟事務(wù)必須大于0
props.put(ProducerConfig.RETRIES_CONFIG, retries);
//當(dāng)多個消息發(fā)送到相同分區(qū)時,生產(chǎn)者會將消息打包到一起,以減少請求交互. 而不是一條條發(fā)送
//批次的大小可以通過batch.size 參數(shù)設(shè)置.默認(rèn)是16KB
//較小的批次大小有可能降低吞吐量(批次大小為0則完全禁用批處理)。
//比如說,kafka里的消息5秒鐘Batch才湊滿了16KB,才能發(fā)送出去。那這些消息的延遲就是5秒鐘
//實測batchSize這個參數(shù)沒有用
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
//有的時刻消息比較少,過了很久,比如5min也沒有湊夠16KB,這樣延時就很大,所以需要一個參數(shù). 再設(shè)置一個時間,到了這個時間,
//即使數(shù)據(jù)沒達(dá)到16KB,也將這個批次發(fā)送出去
props.put(ProducerConfig.LINGER_MS_CONFIG, "5000");
//生產(chǎn)者內(nèi)存緩沖區(qū)的大小
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
//反序列化,和生產(chǎn)者的序列化方式對應(yīng)
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
DefaultKafkaProducerFactory<String, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
//開啟事務(wù),會導(dǎo)致 LINGER_MS_CONFIG 配置失效
// factory.setTransactionIdPrefix(transactionIdPrefix);
return factory;
}
// @Bean
// public KafkaTransactionManager<String, Object> kafkaTransactionManager(ProducerFactory<String, Object> producerFactory) {
// return new KafkaTransactionManager<>(producerFactory);
// }
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
kafkaTemplate.setProducerListener(kafkaSendResultHandler);
return kafkaTemplate;
}
}
編寫kafka消費者配置類
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import java.util.HashMap;
import java.util.Map;
/**
* Kafka消費者配置
*/
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.consumer.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-commit-interval}")
private String autoCommitInterval;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private boolean enableAutoCommit;
@Value("${spring.kafka.properties.session.timeout.ms}")
private String sessionTimeout;
@Value("${spring.kafka.properties.max.poll.interval.ms}")
private String maxPollIntervalTime;
@Value("${spring.kafka.consumer.max-poll-records}")
private String maxPollRecords;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${spring.kafka.listener.concurrency}")
private Integer concurrency;
@Value("${spring.kafka.listener.missing-topics-fatal}")
private boolean missingTopicsFatal;
@Value("${spring.kafka.listener.poll-timeout}")
private long pollTimeout;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>(16);
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
//是否自動提交偏移量,默認(rèn)值是true,為了避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)置為false,然后手動提交偏移量
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
//自動提交的時間間隔,自動提交開啟時生效
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
//該屬性指定了消費者在讀取一個沒有偏移量的分區(qū)或者偏移量無效的情況下該作何處理:
//earliest:當(dāng)各分區(qū)下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費分區(qū)的記錄
//latest:當(dāng)各分區(qū)下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產(chǎn)生的該分區(qū)下的數(shù)據(jù)(在消費者啟動之后生成的記錄)
//none:當(dāng)各分區(qū)都存在已提交的offset時,從提交的offset開始消費;只要有一個分區(qū)不存在已提交的offset,則拋出異常
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
//兩次poll之間的最大間隔,默認(rèn)值為5分鐘。如果超過這個間隔會觸發(fā)reBalance
propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime);
//這個參數(shù)定義了poll方法最多可以拉取多少條消息,默認(rèn)值為500。如果在拉取消息的時候新消息不足500條,那有多少返回多少;如果超過500條,每次只返回500。
//這個默認(rèn)值在有些場景下太大,有些場景很難保證能夠在5min內(nèi)處理完500條消息,
//如果消費者無法在5分鐘內(nèi)處理完500條消息的話就會觸發(fā)reBalance,
//然后這批消息會被分配到另一個消費者中,還是會處理不完,這樣這批消息就永遠(yuǎn)也處理不完。
//要避免出現(xiàn)上述問題,提前評估好處理一條消息最長需要多少時間,然后覆蓋默認(rèn)的max.poll.records參數(shù)
//注:需要開啟BatchListener批量監(jiān)聽才會生效,如果不開啟BatchListener則不會出現(xiàn)reBalance情況
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
//當(dāng)broker多久沒有收到consumer的心跳請求后就觸發(fā)reBalance,默認(rèn)值是10s
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
// 設(shè)置反序列化器為 ErrorHandlingDeserializer,防止藥丸信息
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
// 配置 ErrorHandlingDeserializer 的委托反序列化器為 StringDeserializer
propsMap.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
propsMap.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
return propsMap;
}
@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
//在偵聽器容器中運行的線程數(shù),一般設(shè)置為 機(jī)器數(shù)*分區(qū)數(shù)
factory.setConcurrency(concurrency);
//消費監(jiān)聽接口監(jiān)聽的主題不存在時,默認(rèn)會報錯,所以設(shè)置為false忽略錯誤
factory.setMissingTopicsFatal(missingTopicsFatal);
//自動提交關(guān)閉,需要設(shè)置手動消息確認(rèn)
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setPollTimeout(pollTimeout);
//設(shè)置為批量監(jiān)聽,需要用List接收(相等于listener.type: batch)
factory.setBatchListener(true);
return factory;
}
}
編寫kafka發(fā)送結(jié)果回調(diào)處理類
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component("kafkaSendResultHandler")
public class KafkaSendResultHandler implements ProducerListener<String, Object> {
@Override
public void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {
// 記錄成功發(fā)送的消息信息
if (recordMetadata != null) {
log.info("Kafka消息發(fā)送成功 - 主題: {}, 分區(qū): {}, 偏移量: {}, 鍵: {}, 值: {}",
producerRecord.topic(),
recordMetadata.partition(),
recordMetadata.offset(),
producerRecord.key(),
producerRecord.value());
} else {
log.warn("Kafka消息發(fā)送成功,但RecordMetadata為null - 鍵: {}, 值: {}",
producerRecord.key(),
producerRecord.value());
}
}
@Override
public void onError(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata, Exception exception) {
// 記錄發(fā)送失敗的消息信息及異常
if (recordMetadata != null) {
log.error("Kafka消息發(fā)送失敗 - 主題: {}, 分區(qū): {}, 偏移量: {}, 鍵: {}, 值: {}, 異常: {}",
producerRecord.topic(),
recordMetadata.partition(),
recordMetadata.offset(),
producerRecord.key(),
producerRecord.value(),
exception.getMessage(), exception);
} else {
log.error("Kafka消息發(fā)送失敗 - RecordMetadata為null, 鍵: {}, 值: {}, 異常: {}",
producerRecord.key(),
producerRecord.value(),
exception.getMessage(), exception);
}
}
}
編寫kafka消費異常處理類
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Slf4j
@Component("kafkaConsumerExceptionHandler")
public class KafkaConsumerExceptionHandler implements KafkaListenerErrorHandler {
/**
* 處理錯誤,不帶 Consumer 對象
*/
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException e) {
log.error("kafka消費消息時發(fā)生錯誤。消息內(nèi)容: {}, 錯誤信息: {}", message, e.getMessage(), e);
// 可以根據(jù)需要選擇是否拋出異常
// 例如:return null; 表示忽略錯誤
// throw e; // 拋出異常以觸發(fā)重試機(jī)制
return null;
}
/**
* 處理錯誤,帶 Consumer 對象
*/
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
log.error("kafka消費消息時發(fā)生錯誤。消息內(nèi)容: {}, 錯誤信息: {}", message, exception.getMessage(), exception);
// 可以根據(jù)需要選擇處理方式,例如手動提交偏移量,或其他操作
// 這里僅記錄日志并返回 null
return null;
}
}
生產(chǎn)者示例
import com.liboshuai.starlink.slr.engine.api.dto.KafkaEventDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.List;
@Slf4j
@Component
public class KafkaEventProvider {
@Resource
private KafkaTemplate<String, Object> kafkaTemplate;
@Value("${slr-connector.kafka.provider_topic}")
private String providerTopic;
/**
* 批量上送事件信息到kafka
*/
public void batchSend(List<KafkaEventDTO> kafkaEventDTOList) {
if (CollectionUtils.isEmpty(kafkaEventDTOList)) {
return;
}
kafkaEventDTOList.forEach(eventUploadDTO -> kafkaTemplate.send(providerTopic, eventUploadDTO));
}
}
消費者實例
package com.liboshuai.starlink.slr.connector.dao.kafka.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
public class KafkaEventListener {
@KafkaListener(
topics = "${slr-connector.kafka.consumer_topic}",
groupId = "${spring.kafka.consumer.group-id}",
containerFactory = "kafkaListenerContainerFactory",
errorHandler = "kafkaConsumerExceptionHandler"
)
public void onAlert(List<ConsumerRecord<String, String>> consumerRecordList, Acknowledgment ack) {
for (ConsumerRecord<String, String> record : consumerRecordList) {
// 打印消費的詳細(xì)信息
log.info("Consumed message - Topic: {}, Partition: {}, Offset: {}, Key: {}, Value: {}",
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value());
}
// 手動提交偏移量
ack.acknowledge();
}
}
結(jié)語
對于生產(chǎn)者、消費者提供的例子較少,這方面網(wǎng)上倒是很多,我就不多寫了。主要是重要的配置部分解決即可。
以上就是SpringBoot整合Kafka完成生產(chǎn)消費的方案的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot Kafka生產(chǎn)消費的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
JavaWeb實現(xiàn)學(xué)生信息管理系統(tǒng)(2)
這篇文章主要介紹了JavaWeb實現(xiàn)學(xué)生信息管理系統(tǒng)的第二篇,實現(xiàn)學(xué)生管理系統(tǒng)的查找和添加功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-08-08
Java?CompletableFuture實現(xiàn)多線程異步編排
這篇文章主要為大家介紹了Java?CompletableFuture實現(xiàn)多線程異步編排,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-09-09
解析ConcurrentHashMap: 紅黑樹的代理類(TreeBin)
ConcurrentHashMap是由Segment數(shù)組結(jié)構(gòu)和HashEntry數(shù)組結(jié)構(gòu)組成。Segment的結(jié)構(gòu)和HashMap類似,是一種數(shù)組和鏈表結(jié)構(gòu),今天給大家普及java面試常見問題---ConcurrentHashMap知識,一起看看吧2021-06-06

