欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

一文詳解SpringBoot使用Kafka如何保證消息不丟失

 更新時(shí)間:2025年01月26日 11:31:15   作者:小信丶  
這篇文章主要為大家詳細(xì)介紹了SpringBoot使用Kafka如何保證消息不丟失的相關(guān)知識(shí),文中的示例代碼講解詳細(xì),有需要的小伙伴可以參考一下

概述

在 Spring Boot 中使用 Kafka 時(shí),要確保消息不丟失,主要涉及到生產(chǎn)者(Producer)、消費(fèi)者(Consumer)以及 Kafka Broker 的配置和設(shè)計(jì)。

1. Spring Boot 與 Kafka 配置

Spring Boot 中使用 Kafka 時(shí),可以通過 spring-kafka 來簡化配置和操作。以下是如何保證消息不丟

1.1 Producer 配置

Kafka 生產(chǎn)者是消息的發(fā)送方,確保消息的可靠性和不丟失需要配置

配置 Kafka 生產(chǎn)

在 application.yml 或application.properties 文件中

spring:
  kafka:
    producer:
      acks: all                  # 消息確認(rèn)策略:all表示等待所有副本確認(rèn)
      retries: 3                  # 發(fā)送失敗時(shí)的重試次數(shù)
      batch-size: 16384           # 每批發(fā)送的消息大小
      linger-ms: 1                # 消息發(fā)送的延遲時(shí)間(單位:毫秒)
      key-serializer: org.apache.kafka.common.serialization.StringSerializer   # Key序列化器
      value-serializer: org.apache.kafka.common.serialization.StringSerializer  # Value序列化器

重要配置

  • acks=all:生產(chǎn)leader 副本宕機(jī)或數(shù)據(jù)丟失的
  • retries=3:
  • linger-ms=1:生產(chǎn)者在發(fā)送消息時(shí)會(huì)有
  • batch-size=16384:設(shè)定

1.2 Kafka Producer 配置實(shí)例

通過 KafkaTemplate 發(fā)送消息時(shí),可以通過 Java 配置來確保消息的可靠性。以下是如何

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
 
@Service
public class KafkaProducerService {
 
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
 
    private static final String TOPIC = "test_topic";
 
    public void sendMessage(String message) {
        kafkaTemplate.send(TOPIC, message);  // 發(fā)送消息
    }
}

KafkaTemplate.send() 會(huì)使用 `acks=acks=all 確保消息寫入 Kafka 集群時(shí)的可靠性

2. 消費(fèi)者配置

消費(fèi)者是 Kafka 的消息接收方。在消費(fèi)過程中,確保消息的可靠性和不丟失需要使用合適的 acknowledgment(確認(rèn)機(jī)制)來保證消息的消費(fèi)狀態(tài)。

2.1 Consumer 配置

Kafka 消費(fèi)者的配置在 Spring Boot 中也可以通過 application.yml 或 application.properties 來進(jìn)行配置,確保消費(fèi)者能夠在接收到消息后正確ack)消息。

spring:
  kafka:
    consumer:
      group-id: test-group        # 消費(fèi)者所在的消費(fèi)者組
      enable-auto-commit: false   # 自動(dòng)提交消費(fèi)位移,設(shè)置為false使用手動(dòng)提交
      auto-offset-reset: earliest # 如果沒有偏移量(offset),從最早的位置開始消費(fèi)
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # Key反序列化器
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # Value反序列化器

重要配置項(xiàng)解釋:

enable-auto-commit=false:禁用自動(dòng)提交偏移量,手動(dòng)提交偏移量可以防止消息丟失。自動(dòng)提交可能會(huì)導(dǎo)

auto-offset-reset=earliest:如果消費(fèi)者沒有消費(fèi)過的偏earliest(最早的消息)開始消費(fèi),

group-id=test-group:消費(fèi)者的組 ID,每個(gè)組

2.2 Consumer 手動(dòng)提交偏移量

為了避免消息消費(fèi)丟失,建議手動(dòng)提交消息的偏移量。在 Spring Kafka 中,使用 Acknowledgment 來手動(dòng)確

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
 
@Service
@EnableKafka
public class KafkaConsumerService {
 
    @KafkaListener(topics = "test_topic", groupId = "test-group")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        // 消費(fèi)消息后手動(dòng)提交偏移量
        System.out.println("Received message: " + record.value());
        
        // 手動(dòng)提交偏移量
        acknowledgment.acknowledge();
    }
}

acknowledgment.acknowledge():手動(dòng)確認(rèn)

3. Kafka Broker 配置

Kafka Broker 配置是確保消息不丟失的關(guān)鍵部分。Kafka Broker 是 Kafka 集群的核心,它負(fù)責(zé)接收、存儲(chǔ)、處理和分發(fā)消息。以下是關(guān)于 Kafka Broker 配置的幾個(gè)重要方面:

3.1 副本數(shù)(Replication)與分區(qū)數(shù)(Partition)

Kafka 使用 副本(Replication) 來保證數(shù)據(jù)的高可用性和容錯(cuò)性。每個(gè)主題(Topic)通常有多個(gè) 分區(qū)(Partition),每個(gè)分區(qū)都有一個(gè) 領(lǐng)導(dǎo)副本(Leader) 和多個(gè) 跟隨副本(Follower)。如果某個(gè) Broker 或分區(qū)失敗,副本機(jī)制確保數(shù)據(jù)不會(huì)丟失,系統(tǒng)仍能正常運(yùn)行。

分區(qū)數(shù):每個(gè)主題通常會(huì)有多個(gè)分區(qū),分區(qū)數(shù)的設(shè)置直接影響消息的吞吐量和并行度。更多的分區(qū)可以提高消息的處理速度,但也會(huì)增加系統(tǒng)的管理開銷。

副本數(shù):每個(gè)分區(qū)應(yīng)有多個(gè)副本,確保數(shù)據(jù)的冗余存儲(chǔ)。副本數(shù)設(shè)置得越高,數(shù)據(jù)丟失的可能性越小,但會(huì)占用更多的存儲(chǔ)資源。

關(guān)鍵配置:

# 每個(gè)主題的分區(qū)數(shù)(默認(rèn)3分區(qū))
num.partitions=3
 
# 默認(rèn)副本因子
default.replication.factor=3
 
# 最小同步副本數(shù)
min.insync.replicas=2

num.partitions:設(shè)置默認(rèn)的分區(qū)數(shù)(可以在創(chuàng)建主題時(shí)指定)。

default.replication.factor:設(shè)置每個(gè)主題的默認(rèn)副本數(shù),建議至少設(shè)置為 3。

min.insync.replicas:設(shè)置最小同步副本數(shù),確保消息只有在至少有該數(shù)量的副本成功寫入后才算成功。通常設(shè)置為 2 或更多,以保證消息的可靠性。

注意:min.insync.replicas 是為了避免某些副本沒有同步就確認(rèn)消息,這可能導(dǎo)致數(shù)據(jù)丟失。

3.2 消息持久化與日志配置

Kafka 的持久化策略和日志配置確保消息在磁盤上長期存儲(chǔ)。Kafka 將每個(gè)分區(qū)的消息存儲(chǔ)在磁盤上的日志文件中。為了避免消息丟失和提高讀寫性能,需要合理配置日志存儲(chǔ)相關(guān)參數(shù)。

關(guān)鍵配置:

# 消息日志的清理策略:可以選擇基于時(shí)間清理或基于大小清理
log.retention.hours=168      # 消息保留 7 天
log.retention.bytes=10737418240  # 設(shè)置日志最大存儲(chǔ)大小為 10 GB
 
# 消息寫入磁盤的刷新頻率
log.flush.interval.messages=10000   # 每 10000 條消息后刷新一次磁盤
log.flush.interval.ms=1000           # 每秒刷新一次磁盤
 
# 啟用壓縮
log.cleanup.policy=compact   # 啟用日志壓縮,保留最新版本的數(shù)據(jù)
 
# 消息寫入到磁盤的時(shí)延
log.segment.bytes=1073741824  # 每個(gè)日志段的大小為 1 GB

log.retention.hours:設(shè)置消息保留的時(shí)間(單位:小時(shí)),默認(rèn) 168 小時(shí)(即 7 天)。過期的消息會(huì)被刪除。

log.retention.bytes:設(shè)置日志保留的最大字節(jié)數(shù),超出大小的日志會(huì)被清理。

log.flush.interval.messages 和 log.flush.interval.ms:這些配置控制 Kafka 將消息刷新到磁盤的頻率,可以根據(jù)實(shí)際需要優(yōu)化,減少 I/O 操作。

log.cleanup.policy:設(shè)置日志清理策略??梢赃x擇 delete(基于時(shí)間或大小清理)或 compact(日志壓縮,僅保留最新版本的消息)。

log.segment.bytes:設(shè)置每個(gè)日志段的大小,當(dāng)日志達(dá)到該大小時(shí)會(huì)創(chuàng)建新的日志段。

3.3 分區(qū)副本同步與數(shù)據(jù)一致性

Kafka 的分區(qū)副本同步策略決定了數(shù)據(jù)的寫入和同步方式。為了確保消息的可靠性,需要配置副本同步機(jī)制,使數(shù)據(jù)被可靠地寫入到多個(gè)副本。

acks=all:確保生產(chǎn)者消息寫入所有副本后才確認(rèn)消息寫入成功??梢耘渲迷谏a(chǎn)者端。

min.insync.replicas:設(shè)置最小同步副本數(shù),保證數(shù)據(jù)在多個(gè)副本同步后再提交。

關(guān)鍵配置:

# 設(shè)置同步副本數(shù),確保寫入成功后至少需要此數(shù)目的副本同步
acks=all   # 等待所有副本確認(rèn)寫入
 
# 設(shè)置最小同步副本數(shù)
min.insync.replicas=2  # 確保至少有 2 個(gè)副本同步

acks=all:確保消息在所有副本成功寫入后才返回確認(rèn)。即使部分副本未同步,也不會(huì)確認(rèn)寫入。

min.insync.replicas:確保消息寫入時(shí),至少有 min.insync.replicas 個(gè)副本同步,以避免因?yàn)閱蝹€(gè)副本失效導(dǎo)致的數(shù)據(jù)丟失。

3.4 Kafka Broker 高可用配置

在 Kafka 集群中,為了防止單點(diǎn)故障,Kafka 提供了高可用性配置。配置多個(gè) Kafka Broker 和分布式部署是保證 Kafka 高可用性的基礎(chǔ)。

關(guān)鍵配置:

# Kafka Broker 啟動(dòng)時(shí)的監(jiān)聽地址和端口
listeners=PLAINTEXT://localhost:9092
 
# 監(jiān)聽的內(nèi)網(wǎng)地址,通常與外網(wǎng)地址分開配置
advertised.listeners=PLAINTEXT://broker1.example.com:9092
 
# Zookeeper 配置
zookeeper.connect=localhost:2181  # 設(shè)置 Zookeeper 地址,確保 Kafka 集群管理的協(xié)調(diào)和一致性

listeners:配置 Kafka Broker 啟動(dòng)時(shí)監(jiān)聽的地址和端口。

advertised.listeners:配置 Kafka Broker 向外暴露的地址和端口,消費(fèi)者和生產(chǎn)者會(huì)連接到這個(gè)地址。

zookeeper.connect:設(shè)置 Zookeeper 地址,Kafka 依賴 Zookeeper 來管理集群的元數(shù)據(jù)和協(xié)調(diào)。

3.5 日志段與磁盤空間管理

Kafka 使用日志段(Log Segment)來管理消息存儲(chǔ),每個(gè)分區(qū)的消息都會(huì)分布在多個(gè)日志段中,磁盤空間需要合理管理以避免磁盤溢出。

關(guān)鍵配置:

# 設(shè)置日志段的大?。▎挝唬鹤止?jié))
log.segment.bytes=1073741824  # 每個(gè)日志段的大小為 1 GB
 
# 每個(gè)日志段的最大時(shí)間
log.roll.ms=604800000  # 設(shè)置為 7 天

log.segment.bytes:每個(gè)日志段的大小,達(dá)到該大小時(shí)會(huì)創(chuàng)建新的日志段。設(shè)置過大可能影響磁盤操作的效率,設(shè)置過小可能增加管理開銷。

log.roll.ms:設(shè)置日志段的滾動(dòng)周期,控制日志文件分割的時(shí)間粒度。

3.6 性能優(yōu)化與磁盤 I/O 配置

Kafka 作為一個(gè)高吞吐量的分布式消息系統(tǒng),對(duì)于磁盤 I/O 和網(wǎng)絡(luò) I/O 的要求很高。合理配置 Kafka Broker 的磁盤 I/O 能提高消息的寫入和讀取速度,避免系統(tǒng)性能瓶頸。

關(guān)鍵配置:

# 設(shè)置消息的最小寫入延遲
log.min.cleanable.dirty.ratio=0.5  # 控制消息清理的閾值
 
# 設(shè)置最大允許的線程數(shù)
num.io.threads=8     # I/O 線程數(shù)量
 
# 設(shè)置 Kafka 內(nèi)存緩存大?。▎挝唬篗B)
log.buffer.size=10485760  # 默認(rèn)為 10MB

log.min.cleanable.dirty.ratio:設(shè)置日志清理的閾值,避免過多無效日志占用空間。

num.io.threads:配置 I/O 線程數(shù)。根據(jù)機(jī)器性能,適當(dāng)增加線程數(shù)以提高并發(fā)處理能力。

log.buffer.size:Kafka 寫入消息時(shí)會(huì)使用緩沖區(qū),適當(dāng)增加緩沖區(qū)大小可以提高消息的寫入性能。

3.7 Kafka 集群的監(jiān)控與故障診斷

為了確保 Kafka Broker 的高可用性和穩(wěn)定性,需要對(duì) Kafka 集群進(jìn)行實(shí)時(shí)監(jiān)控。Kafka 提供了豐富的監(jiān)控指標(biāo),可以通過 JMX 進(jìn)行監(jiān)控,也可以結(jié)合其他工具(如 Prometheus 和 Grafana)來實(shí)現(xiàn)集群健康檢查和報(bào)警。

JMX 監(jiān)控:Kafka 提供了大量的 JMX 指標(biāo),可以用于監(jiān)控消息的吞吐量、延遲、磁盤使用情況等。

日志監(jiān)控:Kafka 的日志記錄了大量的運(yùn)行時(shí)信息,定期檢查日志有助于提前發(fā)現(xiàn)潛在的問題。

4. 總結(jié)

通過合理配置和設(shè)計(jì) Kafka 消息生產(chǎn)和消費(fèi)的架構(gòu),可以有效地確保消息不丟失,并提高系統(tǒng)的可靠性和容錯(cuò)性。以下是關(guān)鍵的策略總結(jié):

生產(chǎn)者配置:

  • 使用 acks=all 確保消息寫入到所有副本。
  • 啟用消息冪等性和重試機(jī)制。
  • 使用合理的 retries 和 delivery.timeout.ms 設(shè)置。

消費(fèi)者配置:

  • 使用手動(dòng)提交消費(fèi)位移,避免消息丟失。
  • 使用去重策略和冪等性消費(fèi),確保消息不會(huì)重復(fù)消費(fèi)。

Broker 配置:

  • 配置合理的副本數(shù)和分區(qū)數(shù),確保高可用性。
  • 使用 Zookeeper 確保 Kafka 集群的協(xié)調(diào)和管理。
  • 設(shè)置合理的日志清理策略,避免存儲(chǔ)空間耗盡。

監(jiān)控與報(bào)警:

  • 使用 JMX 監(jiān)控 Kafka 的運(yùn)行狀態(tài)。
  • 結(jié)合 Spring Boot Actuator 進(jìn)行健康檢查和性能監(jiān)控。

到此這篇關(guān)于一文詳解SpringBoot使用Kafka如何保證消息不丟失的文章就介紹到這了,更多相關(guān)SpringBoot Kafka保證消息不丟失內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論