springKafka生產(chǎn)者、消費(fèi)者,數(shù)據(jù)發(fā)送/接收方式
更新時(shí)間:2025年09月19日 08:50:42 作者:小風(fēng)010766
這篇文章主要介紹了springKafka生產(chǎn)者、消費(fèi)者,數(shù)據(jù)發(fā)送/接收方式,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
配置文件
# 指定kafka server的地址,集群配多個(gè),中間,逗號隔開 spring.kafka.bootstrap-servers= # 配置kafka 授權(quán)認(rèn)證 --開始 spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256 spring.kafka.properties.security.protocol=SASL_PLAINTEXT spring.kafka.producer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="xxxx" password="xxxxx"; spring.kafka.consumer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="xxxx" password="xxxx"; # 配置kafka 授權(quán)認(rèn)證 --結(jié)束 #重試次數(shù) spring.kafka.producer.retries=3 # 重試間隔時(shí)間 (毫秒) spring.kafka.producer.retry.backoff.ms=10000 #批量發(fā)送的消息數(shù)量 spring.kafka.producer.batch-size=1000 #32MB的批處理緩沖區(qū) spring.kafka.producer.buffer-memory=335544320 #默認(rèn)消費(fèi)者組 spring.kafka.consumer.group-id=isms-group #最早未被消費(fèi)的offset spring.kafka.consumer.auto-offset-reset=earliest #批量一次最大拉取數(shù)據(jù)量 spring.kafka.consumer.max-poll-records=500 #自動(dòng)提交時(shí)間間隔,單位ms spring.kafka.consumer.auto-commit-interval=1000 #批消費(fèi)并發(fā)量,小于或等于Topic的分區(qū)數(shù) spring.kafka.consumer.batch.concurrency=2
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.producer.retries}")
private Integer retries;
@Value("${spring.kafka.producer.retry.backoff.ms}")
private Integer retryBackoff;
@Value("${spring.kafka.producer.batch-size}")
private Integer batchSize;
@Value("${spring.kafka.producer.buffer-memory}")
private Integer bufferMemory;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${spring.kafka.consumer.max-poll-records}")
private Integer maxPollRecords;
@Value("${spring.kafka.consumer.batch.concurrency}")
private Integer batchConcurrency;
@Value("${spring.kafka.consumer.auto-commit-interval}")
private Integer autoCommitInterval;
@Value("${spring.kafka.properties.security.protocol}")
private String securityProtocol;
@Value("${spring.kafka.properties.sasl.mechanism}")
private String saslMechanism;
@Value("${spring.kafka.producer.properties.sasl.jaas.config}")
private String producerSaslJaasConfig;
@Value("${spring.kafka.consumer.properties.sasl.jaas.config}")
private String consumerSaslJaasConfig;
/**
* 生產(chǎn)者配置信息
*/
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoff);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 900000);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 消息冪等開關(guān),事務(wù)開啟必須打開消息冪等,但是冪等可以單獨(dú)使用
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
props.put(SaslConfigs.SASL_JAAS_CONFIG, producerSaslJaasConfig);
return props;
}
/**
* 生產(chǎn)者工廠
*/
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
/**
* 生產(chǎn)者模板
*/
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public KafkaAdmin kafkaAdmin() {
return new KafkaAdmin(kafkaTemplate().getProducerFactory().getConfigurationProperties());
}
/**
* 消費(fèi)者配置信息
*/
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 600000);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 300000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 設(shè)置自動(dòng)提交改成false
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG,false);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,60000);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
props.put(SaslConfigs.SASL_JAAS_CONFIG, consumerSaslJaasConfig);
return props;
}
/**
* 消費(fèi)者批量工廠
*/
/* @Bean
public KafkaListenerContainerFactory<?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
//設(shè)置并發(fā)量,小于或等于Topic的分區(qū)數(shù)
factory.setConcurrency(batchConcurrency);
factory.getContainerProperties().setPollTimeout(3000);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
//設(shè)置為批量消費(fèi),每個(gè)批次數(shù)量在Kafka配置參數(shù)中設(shè)置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
factory.setBatchListener(true);
return factory;
}*/
/**
* 單個(gè)消費(fèi)者
* @return
*/
@Bean
public KafkaListenerContainerFactory<?> singleConsumerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
factory.setConcurrency(1);
factory.getContainerProperties().setPollTimeout(3000);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
@Bean
public KafkaConsumer<String,String> customKafkaConsumer(){
return new KafkaConsumer<String, String>(consumerConfigs());
}
}生產(chǎn)者
@Component
public class SendMessageToKafka {
private static final Logger log = LoggerFactory.getLogger(SendMessageToKafka.class);
@Resource
private KafkaTemplate<String,String> kafkaTemplate;
public void sendToKafka(String message,String topicName){
try {
String topicDesc = TopicNameEnum.getDescByTopicName(topicName);
SendResult<String, String> sendResult = kafkaTemplate.send(topicName, null, message).get(60, TimeUnit.SECONDS);
if(sendResult.getRecordMetadata() != null){
log.info("topic對象【{}】發(fā)送kafka成功,:{},message sent,topic:{}, partition={}, offset={}",topicDesc,sendResult.getProducerRecord().topic(), sendResult.getRecordMetadata().partition(),
sendResult.getRecordMetadata().offset());
}else {
log.warn("topic對象【{}】消息發(fā)送kafka失敗,topic:{},進(jìn)行重試。。",topicDesc, topicName);
// 當(dāng)kafka發(fā)送失敗后,進(jìn)行重試3次。
FunctionResponse<Boolean> response = FunctionUtil.retryExecute(() -> {
try {
SendResult<String, String> kafkaResult = kafkaTemplate.send(topicName, null, message).get(60, TimeUnit.SECONDS);
if(kafkaResult.getRecordMetadata() != null){
return new FunctionResponse<>(true);
}else {
return new FunctionResponse<>(false);
}
} catch (Exception e) {
return new FunctionResponse<>(false);
}
}, 3, 2000);
if(!response.getResult()) {
log.error("topic對象【{}】 message sent,topic:{},最終重試發(fā)送失敗:{}", topicDesc,topicName, message);
}
}
} catch (Exception e) {
log.error("topic【{}】寫入kafka失敗,{}",topicName,e);
}
}
}消費(fèi)者
@Component
public class CoreCommandListener {
private static final Logger log = LoggerFactory.getLogger(CoreCommandListener.class);
@KafkaListener(topics = {"topic名稱"},containerFactory="singleConsumerFactory")
private void commandConsumer(ConsumerRecord<Object,String> consumerRecord, Acknowledgment ack){
DcpMessage message = JSONObject.parseObject(consumerRecord.value(),DcpMessage.class);
log.info("offset:{},partition:{},消費(fèi)到指令數(shù)據(jù):{}",consumerRecord.offset(),consumerRecord.partition(),message);
//手動(dòng)提交
ack.acknowledge();
}
}總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Java經(jīng)典設(shè)計(jì)模式之適配器模式原理與用法詳解
這篇文章主要介紹了Java經(jīng)典設(shè)計(jì)模式之適配器模式,簡單說明了適配器模式的概念、原理,并結(jié)合實(shí)例形式分析了java適配器模式的用法與相關(guān)注意事項(xiàng),需要的朋友可以參考下2017-08-08
java工具類static靜態(tài)方法讀取yml配置過程
文章介紹了在工具類中獲取YAML配置時(shí)遇到的問題,由于變量是靜態(tài)的,而Spring加載靜態(tài)方法比IOC容器早,導(dǎo)致無法直接使用@Value注解讀取YAML配置,從而讀取結(jié)果為null2024-11-11
Java Web實(shí)現(xiàn)自動(dòng)登陸功能
這篇文章主要為大家詳細(xì)介紹了Java Web實(shí)現(xiàn)自動(dòng)登陸功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-08-08
idea下載svn的項(xiàng)目并且運(yùn)行操作
這篇文章主要介紹了idea下載svn的項(xiàng)目并且運(yùn)行操作,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-09-09
手?jǐn)]一個(gè) spring-boot-starter的全過程
這篇文章主要介紹了手?jǐn)]一個(gè) spring-boot-starter的全過程,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-01-01
java拷貝指定目錄下所有內(nèi)容到minIO代碼實(shí)例
這篇文章主要介紹了java拷貝指定目錄下所有內(nèi)容到minIO代碼實(shí)例,創(chuàng)建桶 直接使用工具類先判斷,再創(chuàng)建即可,創(chuàng)建文件夾,需要注意以"/"結(jié)尾,實(shí)際也是在minIO上創(chuàng)建文件,只是作為目錄的表現(xiàn)形式展示,需要的朋友可以參考下2024-01-01
關(guān)于maven配置項(xiàng)目一直提示程序包不存在以及scope的坑
這篇文章主要介紹了關(guān)于maven配置項(xiàng)目一直提示程序包不存在以及scope的坑,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-11-11
SpringBoot構(gòu)建企業(yè)級RESTful API項(xiàng)目的完整指南
在現(xiàn)代軟件開發(fā)中,RESTful API已成為構(gòu)建分布式系統(tǒng)和微服務(wù)架構(gòu)的標(biāo)準(zhǔn)方式,本指南將帶大家從零開始,使用Spring Boot構(gòu)建一個(gè)完整的企業(yè)級RESTful API項(xiàng)目2025-07-07

