一文詳解SpringBoot使用Kafka如何保證消息不丟失
概述
在 Spring Boot 中使用 Kafka 時,要確保消息不丟失,主要涉及到生產者(Producer)、消費者(Consumer)以及 Kafka Broker 的配置和設計。
1. Spring Boot 與 Kafka 配置
Spring Boot 中使用 Kafka 時,可以通過 spring-kafka 來簡化配置和操作。以下是如何保證消息不丟
1.1 Producer 配置
Kafka 生產者是消息的發(fā)送方,確保消息的可靠性和不丟失需要配置
配置 Kafka 生產
在 application.yml 或application.properties 文件中
spring: kafka: producer: acks: all # 消息確認策略:all表示等待所有副本確認 retries: 3 # 發(fā)送失敗時的重試次數 batch-size: 16384 # 每批發(fā)送的消息大小 linger-ms: 1 # 消息發(fā)送的延遲時間(單位:毫秒) key-serializer: org.apache.kafka.common.serialization.StringSerializer # Key序列化器 value-serializer: org.apache.kafka.common.serialization.StringSerializer # Value序列化器
重要配置
- acks=all:生產leader 副本宕機或數據丟失的
- retries=3:
- linger-ms=1:生產者在發(fā)送消息時會有
- batch-size=16384:設定
1.2 Kafka Producer 配置實例
通過 KafkaTemplate 發(fā)送消息時,可以通過 Java 配置來確保消息的可靠性。以下是如何
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class KafkaProducerService { @Autowired private KafkaTemplate<String, String> kafkaTemplate; private static final String TOPIC = "test_topic"; public void sendMessage(String message) { kafkaTemplate.send(TOPIC, message); // 發(fā)送消息 } }
KafkaTemplate.send() 會使用 `acks=acks=all 確保消息寫入 Kafka 集群時的可靠性
2. 消費者配置
消費者是 Kafka 的消息接收方。在消費過程中,確保消息的可靠性和不丟失需要使用合適的 acknowledgment(確認機制)來保證消息的消費狀態(tài)。
2.1 Consumer 配置
Kafka 消費者的配置在 Spring Boot 中也可以通過 application.yml 或 application.properties 來進行配置,確保消費者能夠在接收到消息后正確ack)消息。
spring: kafka: consumer: group-id: test-group # 消費者所在的消費者組 enable-auto-commit: false # 自動提交消費位移,設置為false使用手動提交 auto-offset-reset: earliest # 如果沒有偏移量(offset),從最早的位置開始消費 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # Key反序列化器 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # Value反序列化器
重要配置項解釋:
enable-auto-commit=false:禁用自動提交偏移量,手動提交偏移量可以防止消息丟失。自動提交可能會導
auto-offset-reset=earliest:如果消費者沒有消費過的偏earliest(最早的消息)開始消費,
group-id=test-group:消費者的組 ID,每個組
2.2 Consumer 手動提交偏移量
為了避免消息消費丟失,建議手動提交消息的偏移量。在 Spring Kafka 中,使用 Acknowledgment 來手動確
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.listener.MessageListener; import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.kafka.listener.config.ContainerProperties; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.kafka.listener.MessageListener; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.listener.MessageListener; import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; @Service @EnableKafka public class KafkaConsumerService { @KafkaListener(topics = "test_topic", groupId = "test-group") public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) { // 消費消息后手動提交偏移量 System.out.println("Received message: " + record.value()); // 手動提交偏移量 acknowledgment.acknowledge(); } }
acknowledgment.acknowledge():手動確認
3. Kafka Broker 配置
Kafka Broker 配置是確保消息不丟失的關鍵部分。Kafka Broker 是 Kafka 集群的核心,它負責接收、存儲、處理和分發(fā)消息。以下是關于 Kafka Broker 配置的幾個重要方面:
3.1 副本數(Replication)與分區(qū)數(Partition)
Kafka 使用 副本(Replication) 來保證數據的高可用性和容錯性。每個主題(Topic)通常有多個 分區(qū)(Partition),每個分區(qū)都有一個 領導副本(Leader) 和多個 跟隨副本(Follower)。如果某個 Broker 或分區(qū)失敗,副本機制確保數據不會丟失,系統(tǒng)仍能正常運行。
分區(qū)數:每個主題通常會有多個分區(qū),分區(qū)數的設置直接影響消息的吞吐量和并行度。更多的分區(qū)可以提高消息的處理速度,但也會增加系統(tǒng)的管理開銷。
副本數:每個分區(qū)應有多個副本,確保數據的冗余存儲。副本數設置得越高,數據丟失的可能性越小,但會占用更多的存儲資源。
關鍵配置:
# 每個主題的分區(qū)數(默認3分區(qū)) num.partitions=3 # 默認副本因子 default.replication.factor=3 # 最小同步副本數 min.insync.replicas=2
num.partitions:設置默認的分區(qū)數(可以在創(chuàng)建主題時指定)。
default.replication.factor:設置每個主題的默認副本數,建議至少設置為 3。
min.insync.replicas:設置最小同步副本數,確保消息只有在至少有該數量的副本成功寫入后才算成功。通常設置為 2 或更多,以保證消息的可靠性。
注意:min.insync.replicas 是為了避免某些副本沒有同步就確認消息,這可能導致數據丟失。
3.2 消息持久化與日志配置
Kafka 的持久化策略和日志配置確保消息在磁盤上長期存儲。Kafka 將每個分區(qū)的消息存儲在磁盤上的日志文件中。為了避免消息丟失和提高讀寫性能,需要合理配置日志存儲相關參數。
關鍵配置:
# 消息日志的清理策略:可以選擇基于時間清理或基于大小清理 log.retention.hours=168 # 消息保留 7 天 log.retention.bytes=10737418240 # 設置日志最大存儲大小為 10 GB # 消息寫入磁盤的刷新頻率 log.flush.interval.messages=10000 # 每 10000 條消息后刷新一次磁盤 log.flush.interval.ms=1000 # 每秒刷新一次磁盤 # 啟用壓縮 log.cleanup.policy=compact # 啟用日志壓縮,保留最新版本的數據 # 消息寫入到磁盤的時延 log.segment.bytes=1073741824 # 每個日志段的大小為 1 GB
log.retention.hours:設置消息保留的時間(單位:小時),默認 168 小時(即 7 天)。過期的消息會被刪除。
log.retention.bytes:設置日志保留的最大字節(jié)數,超出大小的日志會被清理。
log.flush.interval.messages 和 log.flush.interval.ms:這些配置控制 Kafka 將消息刷新到磁盤的頻率,可以根據實際需要優(yōu)化,減少 I/O 操作。
log.cleanup.policy:設置日志清理策略??梢赃x擇 delete(基于時間或大小清理)或 compact(日志壓縮,僅保留最新版本的消息)。
log.segment.bytes:設置每個日志段的大小,當日志達到該大小時會創(chuàng)建新的日志段。
3.3 分區(qū)副本同步與數據一致性
Kafka 的分區(qū)副本同步策略決定了數據的寫入和同步方式。為了確保消息的可靠性,需要配置副本同步機制,使數據被可靠地寫入到多個副本。
acks=all:確保生產者消息寫入所有副本后才確認消息寫入成功??梢耘渲迷谏a者端。
min.insync.replicas:設置最小同步副本數,保證數據在多個副本同步后再提交。
關鍵配置:
# 設置同步副本數,確保寫入成功后至少需要此數目的副本同步 acks=all # 等待所有副本確認寫入 # 設置最小同步副本數 min.insync.replicas=2 # 確保至少有 2 個副本同步
acks=all:確保消息在所有副本成功寫入后才返回確認。即使部分副本未同步,也不會確認寫入。
min.insync.replicas:確保消息寫入時,至少有 min.insync.replicas 個副本同步,以避免因為單個副本失效導致的數據丟失。
3.4 Kafka Broker 高可用配置
在 Kafka 集群中,為了防止單點故障,Kafka 提供了高可用性配置。配置多個 Kafka Broker 和分布式部署是保證 Kafka 高可用性的基礎。
關鍵配置:
# Kafka Broker 啟動時的監(jiān)聽地址和端口 listeners=PLAINTEXT://localhost:9092 # 監(jiān)聽的內網地址,通常與外網地址分開配置 advertised.listeners=PLAINTEXT://broker1.example.com:9092 # Zookeeper 配置 zookeeper.connect=localhost:2181 # 設置 Zookeeper 地址,確保 Kafka 集群管理的協(xié)調和一致性
listeners:配置 Kafka Broker 啟動時監(jiān)聽的地址和端口。
advertised.listeners:配置 Kafka Broker 向外暴露的地址和端口,消費者和生產者會連接到這個地址。
zookeeper.connect:設置 Zookeeper 地址,Kafka 依賴 Zookeeper 來管理集群的元數據和協(xié)調。
3.5 日志段與磁盤空間管理
Kafka 使用日志段(Log Segment)來管理消息存儲,每個分區(qū)的消息都會分布在多個日志段中,磁盤空間需要合理管理以避免磁盤溢出。
關鍵配置:
# 設置日志段的大?。▎挝唬鹤止?jié)) log.segment.bytes=1073741824 # 每個日志段的大小為 1 GB # 每個日志段的最大時間 log.roll.ms=604800000 # 設置為 7 天
log.segment.bytes:每個日志段的大小,達到該大小時會創(chuàng)建新的日志段。設置過大可能影響磁盤操作的效率,設置過小可能增加管理開銷。
log.roll.ms:設置日志段的滾動周期,控制日志文件分割的時間粒度。
3.6 性能優(yōu)化與磁盤 I/O 配置
Kafka 作為一個高吞吐量的分布式消息系統(tǒng),對于磁盤 I/O 和網絡 I/O 的要求很高。合理配置 Kafka Broker 的磁盤 I/O 能提高消息的寫入和讀取速度,避免系統(tǒng)性能瓶頸。
關鍵配置:
# 設置消息的最小寫入延遲 log.min.cleanable.dirty.ratio=0.5 # 控制消息清理的閾值 # 設置最大允許的線程數 num.io.threads=8 # I/O 線程數量 # 設置 Kafka 內存緩存大?。▎挝唬篗B) log.buffer.size=10485760 # 默認為 10MB
log.min.cleanable.dirty.ratio:設置日志清理的閾值,避免過多無效日志占用空間。
num.io.threads:配置 I/O 線程數。根據機器性能,適當增加線程數以提高并發(fā)處理能力。
log.buffer.size:Kafka 寫入消息時會使用緩沖區(qū),適當增加緩沖區(qū)大小可以提高消息的寫入性能。
3.7 Kafka 集群的監(jiān)控與故障診斷
為了確保 Kafka Broker 的高可用性和穩(wěn)定性,需要對 Kafka 集群進行實時監(jiān)控。Kafka 提供了豐富的監(jiān)控指標,可以通過 JMX 進行監(jiān)控,也可以結合其他工具(如 Prometheus 和 Grafana)來實現集群健康檢查和報警。
JMX 監(jiān)控:Kafka 提供了大量的 JMX 指標,可以用于監(jiān)控消息的吞吐量、延遲、磁盤使用情況等。
日志監(jiān)控:Kafka 的日志記錄了大量的運行時信息,定期檢查日志有助于提前發(fā)現潛在的問題。
4. 總結
通過合理配置和設計 Kafka 消息生產和消費的架構,可以有效地確保消息不丟失,并提高系統(tǒng)的可靠性和容錯性。以下是關鍵的策略總結:
生產者配置:
- 使用 acks=all 確保消息寫入到所有副本。
- 啟用消息冪等性和重試機制。
- 使用合理的 retries 和 delivery.timeout.ms 設置。
消費者配置:
- 使用手動提交消費位移,避免消息丟失。
- 使用去重策略和冪等性消費,確保消息不會重復消費。
Broker 配置:
- 配置合理的副本數和分區(qū)數,確保高可用性。
- 使用 Zookeeper 確保 Kafka 集群的協(xié)調和管理。
- 設置合理的日志清理策略,避免存儲空間耗盡。
監(jiān)控與報警:
- 使用 JMX 監(jiān)控 Kafka 的運行狀態(tài)。
- 結合 Spring Boot Actuator 進行健康檢查和性能監(jiān)控。
到此這篇關于一文詳解SpringBoot使用Kafka如何保證消息不丟失的文章就介紹到這了,更多相關SpringBoot Kafka保證消息不丟失內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
web.xml中servlet, bean, filter, listenr 加載順序_動力節(jié)點Java學院整理
這篇文章主要介紹了web.xml中servlet, bean, filter, listenr 加載順序,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-08-08MyBatis綁定錯誤提示BindingException:Invalid bound statement (not f
這篇文章主要介紹了MyBatis綁定錯誤提示BindingException:Invalid bound statement (not found)的解決辦法,非常不錯,具有參考借鑒價值,需要的的朋友參考下吧2017-01-01IDEA?2021.3?使用及idea2021.3.1激活使用方法
IDEA?全稱?IntelliJ?IDEA,是java語言開發(fā)的集成環(huán)境,IntelliJ在業(yè)界被公認為最好的java開發(fā)工具之一,今天通過本文給大家介紹idea2021.3.1激活及使用教程,感興趣的朋友一起看看吧2022-01-01