一文詳解SpringBoot使用Kafka如何保證消息不丟失
概述
在 Spring Boot 中使用 Kafka 時(shí),要確保消息不丟失,主要涉及到生產(chǎn)者(Producer)、消費(fèi)者(Consumer)以及 Kafka Broker 的配置和設(shè)計(jì)。
1. Spring Boot 與 Kafka 配置
Spring Boot 中使用 Kafka 時(shí),可以通過 spring-kafka 來簡化配置和操作。以下是如何保證消息不丟
1.1 Producer 配置
Kafka 生產(chǎn)者是消息的發(fā)送方,確保消息的可靠性和不丟失需要配置
配置 Kafka 生產(chǎn)
在 application.yml 或application.properties 文件中
spring: kafka: producer: acks: all # 消息確認(rèn)策略:all表示等待所有副本確認(rèn) retries: 3 # 發(fā)送失敗時(shí)的重試次數(shù) batch-size: 16384 # 每批發(fā)送的消息大小 linger-ms: 1 # 消息發(fā)送的延遲時(shí)間(單位:毫秒) key-serializer: org.apache.kafka.common.serialization.StringSerializer # Key序列化器 value-serializer: org.apache.kafka.common.serialization.StringSerializer # Value序列化器
重要配置
- acks=all:生產(chǎn)leader 副本宕機(jī)或數(shù)據(jù)丟失的
- retries=3:
- linger-ms=1:生產(chǎn)者在發(fā)送消息時(shí)會(huì)有
- batch-size=16384:設(shè)定
1.2 Kafka Producer 配置實(shí)例
通過 KafkaTemplate 發(fā)送消息時(shí),可以通過 Java 配置來確保消息的可靠性。以下是如何
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class KafkaProducerService { @Autowired private KafkaTemplate<String, String> kafkaTemplate; private static final String TOPIC = "test_topic"; public void sendMessage(String message) { kafkaTemplate.send(TOPIC, message); // 發(fā)送消息 } }
KafkaTemplate.send() 會(huì)使用 `acks=acks=all 確保消息寫入 Kafka 集群時(shí)的可靠性
2. 消費(fèi)者配置
消費(fèi)者是 Kafka 的消息接收方。在消費(fèi)過程中,確保消息的可靠性和不丟失需要使用合適的 acknowledgment(確認(rèn)機(jī)制)來保證消息的消費(fèi)狀態(tài)。
2.1 Consumer 配置
Kafka 消費(fèi)者的配置在 Spring Boot 中也可以通過 application.yml 或 application.properties 來進(jìn)行配置,確保消費(fèi)者能夠在接收到消息后正確ack)消息。
spring: kafka: consumer: group-id: test-group # 消費(fèi)者所在的消費(fèi)者組 enable-auto-commit: false # 自動(dòng)提交消費(fèi)位移,設(shè)置為false使用手動(dòng)提交 auto-offset-reset: earliest # 如果沒有偏移量(offset),從最早的位置開始消費(fèi) key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # Key反序列化器 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # Value反序列化器
重要配置項(xiàng)解釋:
enable-auto-commit=false:禁用自動(dòng)提交偏移量,手動(dòng)提交偏移量可以防止消息丟失。自動(dòng)提交可能會(huì)導(dǎo)
auto-offset-reset=earliest:如果消費(fèi)者沒有消費(fèi)過的偏earliest(最早的消息)開始消費(fèi),
group-id=test-group:消費(fèi)者的組 ID,每個(gè)組
2.2 Consumer 手動(dòng)提交偏移量
為了避免消息消費(fèi)丟失,建議手動(dòng)提交消息的偏移量。在 Spring Kafka 中,使用 Acknowledgment 來手動(dòng)確
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.listener.MessageListener; import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.kafka.listener.config.ContainerProperties; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.kafka.listener.MessageListener; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.listener.MessageListener; import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; @Service @EnableKafka public class KafkaConsumerService { @KafkaListener(topics = "test_topic", groupId = "test-group") public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) { // 消費(fèi)消息后手動(dòng)提交偏移量 System.out.println("Received message: " + record.value()); // 手動(dòng)提交偏移量 acknowledgment.acknowledge(); } }
acknowledgment.acknowledge():手動(dòng)確認(rèn)
3. Kafka Broker 配置
Kafka Broker 配置是確保消息不丟失的關(guān)鍵部分。Kafka Broker 是 Kafka 集群的核心,它負(fù)責(zé)接收、存儲(chǔ)、處理和分發(fā)消息。以下是關(guān)于 Kafka Broker 配置的幾個(gè)重要方面:
3.1 副本數(shù)(Replication)與分區(qū)數(shù)(Partition)
Kafka 使用 副本(Replication) 來保證數(shù)據(jù)的高可用性和容錯(cuò)性。每個(gè)主題(Topic)通常有多個(gè) 分區(qū)(Partition),每個(gè)分區(qū)都有一個(gè) 領(lǐng)導(dǎo)副本(Leader) 和多個(gè) 跟隨副本(Follower)。如果某個(gè) Broker 或分區(qū)失敗,副本機(jī)制確保數(shù)據(jù)不會(huì)丟失,系統(tǒng)仍能正常運(yùn)行。
分區(qū)數(shù):每個(gè)主題通常會(huì)有多個(gè)分區(qū),分區(qū)數(shù)的設(shè)置直接影響消息的吞吐量和并行度。更多的分區(qū)可以提高消息的處理速度,但也會(huì)增加系統(tǒng)的管理開銷。
副本數(shù):每個(gè)分區(qū)應(yīng)有多個(gè)副本,確保數(shù)據(jù)的冗余存儲(chǔ)。副本數(shù)設(shè)置得越高,數(shù)據(jù)丟失的可能性越小,但會(huì)占用更多的存儲(chǔ)資源。
關(guān)鍵配置:
# 每個(gè)主題的分區(qū)數(shù)(默認(rèn)3分區(qū)) num.partitions=3 # 默認(rèn)副本因子 default.replication.factor=3 # 最小同步副本數(shù) min.insync.replicas=2
num.partitions:設(shè)置默認(rèn)的分區(qū)數(shù)(可以在創(chuàng)建主題時(shí)指定)。
default.replication.factor:設(shè)置每個(gè)主題的默認(rèn)副本數(shù),建議至少設(shè)置為 3。
min.insync.replicas:設(shè)置最小同步副本數(shù),確保消息只有在至少有該數(shù)量的副本成功寫入后才算成功。通常設(shè)置為 2 或更多,以保證消息的可靠性。
注意:min.insync.replicas 是為了避免某些副本沒有同步就確認(rèn)消息,這可能導(dǎo)致數(shù)據(jù)丟失。
3.2 消息持久化與日志配置
Kafka 的持久化策略和日志配置確保消息在磁盤上長期存儲(chǔ)。Kafka 將每個(gè)分區(qū)的消息存儲(chǔ)在磁盤上的日志文件中。為了避免消息丟失和提高讀寫性能,需要合理配置日志存儲(chǔ)相關(guān)參數(shù)。
關(guān)鍵配置:
# 消息日志的清理策略:可以選擇基于時(shí)間清理或基于大小清理 log.retention.hours=168 # 消息保留 7 天 log.retention.bytes=10737418240 # 設(shè)置日志最大存儲(chǔ)大小為 10 GB # 消息寫入磁盤的刷新頻率 log.flush.interval.messages=10000 # 每 10000 條消息后刷新一次磁盤 log.flush.interval.ms=1000 # 每秒刷新一次磁盤 # 啟用壓縮 log.cleanup.policy=compact # 啟用日志壓縮,保留最新版本的數(shù)據(jù) # 消息寫入到磁盤的時(shí)延 log.segment.bytes=1073741824 # 每個(gè)日志段的大小為 1 GB
log.retention.hours:設(shè)置消息保留的時(shí)間(單位:小時(shí)),默認(rèn) 168 小時(shí)(即 7 天)。過期的消息會(huì)被刪除。
log.retention.bytes:設(shè)置日志保留的最大字節(jié)數(shù),超出大小的日志會(huì)被清理。
log.flush.interval.messages 和 log.flush.interval.ms:這些配置控制 Kafka 將消息刷新到磁盤的頻率,可以根據(jù)實(shí)際需要優(yōu)化,減少 I/O 操作。
log.cleanup.policy:設(shè)置日志清理策略??梢赃x擇 delete(基于時(shí)間或大小清理)或 compact(日志壓縮,僅保留最新版本的消息)。
log.segment.bytes:設(shè)置每個(gè)日志段的大小,當(dāng)日志達(dá)到該大小時(shí)會(huì)創(chuàng)建新的日志段。
3.3 分區(qū)副本同步與數(shù)據(jù)一致性
Kafka 的分區(qū)副本同步策略決定了數(shù)據(jù)的寫入和同步方式。為了確保消息的可靠性,需要配置副本同步機(jī)制,使數(shù)據(jù)被可靠地寫入到多個(gè)副本。
acks=all:確保生產(chǎn)者消息寫入所有副本后才確認(rèn)消息寫入成功??梢耘渲迷谏a(chǎn)者端。
min.insync.replicas:設(shè)置最小同步副本數(shù),保證數(shù)據(jù)在多個(gè)副本同步后再提交。
關(guān)鍵配置:
# 設(shè)置同步副本數(shù),確保寫入成功后至少需要此數(shù)目的副本同步 acks=all # 等待所有副本確認(rèn)寫入 # 設(shè)置最小同步副本數(shù) min.insync.replicas=2 # 確保至少有 2 個(gè)副本同步
acks=all:確保消息在所有副本成功寫入后才返回確認(rèn)。即使部分副本未同步,也不會(huì)確認(rèn)寫入。
min.insync.replicas:確保消息寫入時(shí),至少有 min.insync.replicas 個(gè)副本同步,以避免因?yàn)閱蝹€(gè)副本失效導(dǎo)致的數(shù)據(jù)丟失。
3.4 Kafka Broker 高可用配置
在 Kafka 集群中,為了防止單點(diǎn)故障,Kafka 提供了高可用性配置。配置多個(gè) Kafka Broker 和分布式部署是保證 Kafka 高可用性的基礎(chǔ)。
關(guān)鍵配置:
# Kafka Broker 啟動(dòng)時(shí)的監(jiān)聽地址和端口 listeners=PLAINTEXT://localhost:9092 # 監(jiān)聽的內(nèi)網(wǎng)地址,通常與外網(wǎng)地址分開配置 advertised.listeners=PLAINTEXT://broker1.example.com:9092 # Zookeeper 配置 zookeeper.connect=localhost:2181 # 設(shè)置 Zookeeper 地址,確保 Kafka 集群管理的協(xié)調(diào)和一致性
listeners:配置 Kafka Broker 啟動(dòng)時(shí)監(jiān)聽的地址和端口。
advertised.listeners:配置 Kafka Broker 向外暴露的地址和端口,消費(fèi)者和生產(chǎn)者會(huì)連接到這個(gè)地址。
zookeeper.connect:設(shè)置 Zookeeper 地址,Kafka 依賴 Zookeeper 來管理集群的元數(shù)據(jù)和協(xié)調(diào)。
3.5 日志段與磁盤空間管理
Kafka 使用日志段(Log Segment)來管理消息存儲(chǔ),每個(gè)分區(qū)的消息都會(huì)分布在多個(gè)日志段中,磁盤空間需要合理管理以避免磁盤溢出。
關(guān)鍵配置:
# 設(shè)置日志段的大?。▎挝唬鹤止?jié)) log.segment.bytes=1073741824 # 每個(gè)日志段的大小為 1 GB # 每個(gè)日志段的最大時(shí)間 log.roll.ms=604800000 # 設(shè)置為 7 天
log.segment.bytes:每個(gè)日志段的大小,達(dá)到該大小時(shí)會(huì)創(chuàng)建新的日志段。設(shè)置過大可能影響磁盤操作的效率,設(shè)置過小可能增加管理開銷。
log.roll.ms:設(shè)置日志段的滾動(dòng)周期,控制日志文件分割的時(shí)間粒度。
3.6 性能優(yōu)化與磁盤 I/O 配置
Kafka 作為一個(gè)高吞吐量的分布式消息系統(tǒng),對(duì)于磁盤 I/O 和網(wǎng)絡(luò) I/O 的要求很高。合理配置 Kafka Broker 的磁盤 I/O 能提高消息的寫入和讀取速度,避免系統(tǒng)性能瓶頸。
關(guān)鍵配置:
# 設(shè)置消息的最小寫入延遲 log.min.cleanable.dirty.ratio=0.5 # 控制消息清理的閾值 # 設(shè)置最大允許的線程數(shù) num.io.threads=8 # I/O 線程數(shù)量 # 設(shè)置 Kafka 內(nèi)存緩存大?。▎挝唬篗B) log.buffer.size=10485760 # 默認(rèn)為 10MB
log.min.cleanable.dirty.ratio:設(shè)置日志清理的閾值,避免過多無效日志占用空間。
num.io.threads:配置 I/O 線程數(shù)。根據(jù)機(jī)器性能,適當(dāng)增加線程數(shù)以提高并發(fā)處理能力。
log.buffer.size:Kafka 寫入消息時(shí)會(huì)使用緩沖區(qū),適當(dāng)增加緩沖區(qū)大小可以提高消息的寫入性能。
3.7 Kafka 集群的監(jiān)控與故障診斷
為了確保 Kafka Broker 的高可用性和穩(wěn)定性,需要對(duì) Kafka 集群進(jìn)行實(shí)時(shí)監(jiān)控。Kafka 提供了豐富的監(jiān)控指標(biāo),可以通過 JMX 進(jìn)行監(jiān)控,也可以結(jié)合其他工具(如 Prometheus 和 Grafana)來實(shí)現(xiàn)集群健康檢查和報(bào)警。
JMX 監(jiān)控:Kafka 提供了大量的 JMX 指標(biāo),可以用于監(jiān)控消息的吞吐量、延遲、磁盤使用情況等。
日志監(jiān)控:Kafka 的日志記錄了大量的運(yùn)行時(shí)信息,定期檢查日志有助于提前發(fā)現(xiàn)潛在的問題。
4. 總結(jié)
通過合理配置和設(shè)計(jì) Kafka 消息生產(chǎn)和消費(fèi)的架構(gòu),可以有效地確保消息不丟失,并提高系統(tǒng)的可靠性和容錯(cuò)性。以下是關(guān)鍵的策略總結(jié):
生產(chǎn)者配置:
- 使用 acks=all 確保消息寫入到所有副本。
- 啟用消息冪等性和重試機(jī)制。
- 使用合理的 retries 和 delivery.timeout.ms 設(shè)置。
消費(fèi)者配置:
- 使用手動(dòng)提交消費(fèi)位移,避免消息丟失。
- 使用去重策略和冪等性消費(fèi),確保消息不會(huì)重復(fù)消費(fèi)。
Broker 配置:
- 配置合理的副本數(shù)和分區(qū)數(shù),確保高可用性。
- 使用 Zookeeper 確保 Kafka 集群的協(xié)調(diào)和管理。
- 設(shè)置合理的日志清理策略,避免存儲(chǔ)空間耗盡。
監(jiān)控與報(bào)警:
- 使用 JMX 監(jiān)控 Kafka 的運(yùn)行狀態(tài)。
- 結(jié)合 Spring Boot Actuator 進(jìn)行健康檢查和性能監(jiān)控。
到此這篇關(guān)于一文詳解SpringBoot使用Kafka如何保證消息不丟失的文章就介紹到這了,更多相關(guān)SpringBoot Kafka保證消息不丟失內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
一文帶你深入了解Java的數(shù)據(jù)結(jié)構(gòu)
Java工具包提供了強(qiáng)大的數(shù)據(jù)結(jié)構(gòu)。這篇文章主要為大家詳細(xì)介紹了Java數(shù)據(jù)結(jié)構(gòu)中常用的幾種接口和類,感興趣的小伙伴可以跟隨小編一起了解一下2023-05-05springboot+jsonp解決前端跨域問題小結(jié)
這篇文章主要介紹了springboot+jsonp解決前端跨域問題小結(jié),小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-06-06使用jmeter實(shí)現(xiàn)對(duì)jar包的調(diào)用方式
這篇文章主要介紹了使用jmeter實(shí)現(xiàn)對(duì)jar包的調(diào)用方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-03-03web.xml中servlet, bean, filter, listenr 加載順序_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
這篇文章主要介紹了web.xml中servlet, bean, filter, listenr 加載順序,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-08-08MyBatis綁定錯(cuò)誤提示BindingException:Invalid bound statement (not f
這篇文章主要介紹了MyBatis綁定錯(cuò)誤提示BindingException:Invalid bound statement (not found)的解決辦法,非常不錯(cuò),具有參考借鑒價(jià)值,需要的的朋友參考下吧2017-01-01Java實(shí)現(xiàn)文件或文件夾的復(fù)制到指定目錄實(shí)例
本篇文章主要介紹了Java實(shí)現(xiàn)文件或文件夾的復(fù)制到指定目錄實(shí)例,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下。2017-03-03SpringBoot實(shí)現(xiàn)RAS+AES自動(dòng)接口解密
本文主要介紹了SpringBoot實(shí)現(xiàn)RAS+AES自動(dòng)接口解密,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-03-03IDEA?2021.3?使用及idea2021.3.1激活使用方法
IDEA?全稱?IntelliJ?IDEA,是java語言開發(fā)的集成環(huán)境,IntelliJ在業(yè)界被公認(rèn)為最好的java開發(fā)工具之一,今天通過本文給大家介紹idea2021.3.1激活及使用教程,感興趣的朋友一起看看吧2022-01-01