Springboot集成Kafka進(jìn)行批量消費及踩坑點
引入依賴
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.3.11.RELEASE</version>
</dependency>
因為我的項目的 springboot 版本是 1.5.22.RELEASE,所以引的是 1.3.11.RELEASE 的包。讀者可以根據(jù)下圖來自行選擇對應(yīng)的版本。圖片更新可能不及時,詳情可查看spring-kafka 官方網(wǎng)站。

注:這里有個踩坑點,如果引入包版本不對,項目啟動時會拋出org.springframework.core.log.LogAccessor 異常:
java.lang.ClassNotFoundException: org.springframework.core.log.LogAccessor
創(chuàng)建配置類
/**
* kafka 配置類
*/
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerConfig.class);
@Value("${kafka.bootstrap.servers}")
private String kafkaBootstrapServers;
@Value("${kafka.group.id}")
private String kafkaGroupId;
@Value("${kafka.topic}")
private String kafkaTopic;
public static final String CONFIG_PATH = "/home/admin/xxx/BOOT-INF/classes/kafka_client_jaas.conf";
public static final String LOCATION_PATH = "/home/admin/xxx/BOOT-INF/classes/kafka.client.truststore.jks";
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 設(shè)置并發(fā)量,小于或者等于 Topic 的分區(qū)數(shù)
factory.setConcurrency(5);
// 設(shè)置為批量監(jiān)聽
factory.setBatchListener(Boolean.TRUE);
factory.getContainerProperties().setPollTimeout(30000);
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
//設(shè)置接入點,請通過控制臺獲取對應(yīng)Topic的接入點。
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
//設(shè)置SSL根證書的路徑,請記得將XXX修改為自己的路徑。
//與SASL路徑類似,該文件也不能被打包到j(luò)ar中。
System.setProperty("java.security.auth.login.config", CONFIG_PATH);
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, LOCATION_PATH);
//根證書存儲的密碼,保持不變。
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
//接入?yún)f(xié)議,目前支持使用SASL_SSL協(xié)議接入。
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
//SASL鑒權(quán)方式,保持不變。
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
// 自動提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE);
//兩次Poll之間的最大允許間隔。
//消費者超過該值沒有返回心跳,服務(wù)端判斷消費者處于非存活狀態(tài),服務(wù)端將消費者從Consumer Group移除并觸發(fā)Rebalance,默認(rèn)30s。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
//設(shè)置單次拉取的量,走公網(wǎng)訪問時,該參數(shù)會有較大影響。
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
//每次Poll的最大數(shù)量。
//注意該值不要改得太大,如果Poll太多數(shù)據(jù),而不能在下次Poll之前消費完,則會觸發(fā)一次負(fù)載均衡,產(chǎn)生卡頓。
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
//消息的反序列化方式。
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//當(dāng)前消費實例所屬的消費組,請在控制臺申請之后填寫。
//屬于同一個組的消費實例,會負(fù)載消費消息。
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
//Hostname校驗改成空。
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
return props;
}
}
注:此處通過 factory.setConcurrency(5); 配置了并發(fā)量為 5 ,假設(shè)我們線上的 Topic 有 12 個分區(qū)。那么將會是 3 個線程分配到 2 個分區(qū),2 個線程分配到 3 個分區(qū),3 * 2 + 2 * 3 = 12。
Kafka 消費者
/**
* kafka 消息消費類
*/
@Component
public class KafkaMessageListener {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);
@KafkaListener(topics = {"${kafka.topic}"})
public void listen(List<ConsumerRecord<String, String>> recordList) {
for (ConsumerRecord<String,String> record : recordList) {
// 打印消息的分區(qū)以及偏移量
LOGGER.info("Kafka Consume partition:{}, offset:{}", record.partition(), record.offset());
String value = record.value();
System.out.println("value = " + value);
// 處理業(yè)務(wù)邏輯 ...
}
}
}
因為我在配置類中設(shè)置了批量監(jiān)聽,所以此處 listen 方法的入?yún)⑹荓ist:List<ConsumerRecord<String, String>>。
到此這篇關(guān)于Springboot集成Kafka進(jìn)行批量消費及踩坑點的文章就介紹到這了,更多相關(guān)Springboot Kafka批量消費內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java利用Reflect實現(xiàn)封裝Excel導(dǎo)出工具類
這篇文章主要為大家詳細(xì)介紹了Java如何利用Reflect實現(xiàn)封裝Excel導(dǎo)出工具類,文中的實現(xiàn)方法講解詳細(xì),具有一定的借鑒價值,需要的可以參考一下2022-11-11
Java中double數(shù)值保留兩位小數(shù)的4種實現(xiàn)方式舉例
在Java編程中,我們經(jīng)常遇到需要對double類型的浮點數(shù)進(jìn)行精確截斷或四舍五入保留兩位小數(shù)的需求,這篇文章主要給大家介紹了關(guān)于Java中double數(shù)值保留兩位小數(shù)的4種實現(xiàn)方式,需要的朋友可以參考下2024-07-07
Java 數(shù)據(jù)結(jié)構(gòu)與算法系列精講之KMP算法
在很多地方也都經(jīng)??吹街v解KMP算法的文章,看久了好像也知道是怎么一回事,但總感覺有些地方自己還是沒有完全懂明白。這兩天花了點時間總結(jié)一下,有點小體會,我希望可以通過我自己的語言來把這個算法的一些細(xì)節(jié)梳理清楚,也算是考驗一下自己有真正理解這個算法2022-02-02
使用spring.profiles.active來分區(qū)配置的方法示例
這篇文章主要介紹了使用spring.profiles.active來分區(qū)配置的方法示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2019-01-01

