欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

一文詳解SpringBoot使用Kafka如何保證消息不丟失

 更新時間:2025年01月26日 11:31:15   作者:小信丶  
這篇文章主要為大家詳細介紹了SpringBoot使用Kafka如何保證消息不丟失的相關知識,文中的示例代碼講解詳細,有需要的小伙伴可以參考一下

概述

在 Spring Boot 中使用 Kafka 時,要確保消息不丟失,主要涉及到生產者(Producer)、消費者(Consumer)以及 Kafka Broker 的配置和設計。

1. Spring Boot 與 Kafka 配置

Spring Boot 中使用 Kafka 時,可以通過 spring-kafka 來簡化配置和操作。以下是如何保證消息不丟

1.1 Producer 配置

Kafka 生產者是消息的發(fā)送方,確保消息的可靠性和不丟失需要配置

配置 Kafka 生產

在 application.yml 或application.properties 文件中

spring:
  kafka:
    producer:
      acks: all                  # 消息確認策略:all表示等待所有副本確認
      retries: 3                  # 發(fā)送失敗時的重試次數
      batch-size: 16384           # 每批發(fā)送的消息大小
      linger-ms: 1                # 消息發(fā)送的延遲時間(單位:毫秒)
      key-serializer: org.apache.kafka.common.serialization.StringSerializer   # Key序列化器
      value-serializer: org.apache.kafka.common.serialization.StringSerializer  # Value序列化器

重要配置

  • acks=all:生產leader 副本宕機或數據丟失的
  • retries=3:
  • linger-ms=1:生產者在發(fā)送消息時會有
  • batch-size=16384:設定

1.2 Kafka Producer 配置實例

通過 KafkaTemplate 發(fā)送消息時,可以通過 Java 配置來確保消息的可靠性。以下是如何

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
 
@Service
public class KafkaProducerService {
 
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
 
    private static final String TOPIC = "test_topic";
 
    public void sendMessage(String message) {
        kafkaTemplate.send(TOPIC, message);  // 發(fā)送消息
    }
}

KafkaTemplate.send() 會使用 `acks=acks=all 確保消息寫入 Kafka 集群時的可靠性

2. 消費者配置

消費者是 Kafka 的消息接收方。在消費過程中,確保消息的可靠性和不丟失需要使用合適的 acknowledgment(確認機制)來保證消息的消費狀態(tài)。

2.1 Consumer 配置

Kafka 消費者的配置在 Spring Boot 中也可以通過 application.yml 或 application.properties 來進行配置,確保消費者能夠在接收到消息后正確ack)消息。

spring:
  kafka:
    consumer:
      group-id: test-group        # 消費者所在的消費者組
      enable-auto-commit: false   # 自動提交消費位移,設置為false使用手動提交
      auto-offset-reset: earliest # 如果沒有偏移量(offset),從最早的位置開始消費
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # Key反序列化器
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # Value反序列化器

重要配置項解釋:

enable-auto-commit=false:禁用自動提交偏移量,手動提交偏移量可以防止消息丟失。自動提交可能會導

auto-offset-reset=earliest:如果消費者沒有消費過的偏earliest(最早的消息)開始消費,

group-id=test-group:消費者的組 ID,每個組

2.2 Consumer 手動提交偏移量

為了避免消息消費丟失,建議手動提交消息的偏移量。在 Spring Kafka 中,使用 Acknowledgment 來手動確

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
 
@Service
@EnableKafka
public class KafkaConsumerService {
 
    @KafkaListener(topics = "test_topic", groupId = "test-group")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        // 消費消息后手動提交偏移量
        System.out.println("Received message: " + record.value());
        
        // 手動提交偏移量
        acknowledgment.acknowledge();
    }
}

acknowledgment.acknowledge():手動確認

3. Kafka Broker 配置

Kafka Broker 配置是確保消息不丟失的關鍵部分。Kafka Broker 是 Kafka 集群的核心,它負責接收、存儲、處理和分發(fā)消息。以下是關于 Kafka Broker 配置的幾個重要方面:

3.1 副本數(Replication)與分區(qū)數(Partition)

Kafka 使用 副本(Replication) 來保證數據的高可用性和容錯性。每個主題(Topic)通常有多個 分區(qū)(Partition),每個分區(qū)都有一個 領導副本(Leader) 和多個 跟隨副本(Follower)。如果某個 Broker 或分區(qū)失敗,副本機制確保數據不會丟失,系統(tǒng)仍能正常運行。

分區(qū)數:每個主題通常會有多個分區(qū),分區(qū)數的設置直接影響消息的吞吐量和并行度。更多的分區(qū)可以提高消息的處理速度,但也會增加系統(tǒng)的管理開銷。

副本數:每個分區(qū)應有多個副本,確保數據的冗余存儲。副本數設置得越高,數據丟失的可能性越小,但會占用更多的存儲資源。

關鍵配置:

# 每個主題的分區(qū)數(默認3分區(qū))
num.partitions=3
 
# 默認副本因子
default.replication.factor=3
 
# 最小同步副本數
min.insync.replicas=2

num.partitions:設置默認的分區(qū)數(可以在創(chuàng)建主題時指定)。

default.replication.factor:設置每個主題的默認副本數,建議至少設置為 3。

min.insync.replicas:設置最小同步副本數,確保消息只有在至少有該數量的副本成功寫入后才算成功。通常設置為 2 或更多,以保證消息的可靠性。

注意:min.insync.replicas 是為了避免某些副本沒有同步就確認消息,這可能導致數據丟失。

3.2 消息持久化與日志配置

Kafka 的持久化策略和日志配置確保消息在磁盤上長期存儲。Kafka 將每個分區(qū)的消息存儲在磁盤上的日志文件中。為了避免消息丟失和提高讀寫性能,需要合理配置日志存儲相關參數。

關鍵配置:

# 消息日志的清理策略:可以選擇基于時間清理或基于大小清理
log.retention.hours=168      # 消息保留 7 天
log.retention.bytes=10737418240  # 設置日志最大存儲大小為 10 GB
 
# 消息寫入磁盤的刷新頻率
log.flush.interval.messages=10000   # 每 10000 條消息后刷新一次磁盤
log.flush.interval.ms=1000           # 每秒刷新一次磁盤
 
# 啟用壓縮
log.cleanup.policy=compact   # 啟用日志壓縮,保留最新版本的數據
 
# 消息寫入到磁盤的時延
log.segment.bytes=1073741824  # 每個日志段的大小為 1 GB

log.retention.hours:設置消息保留的時間(單位:小時),默認 168 小時(即 7 天)。過期的消息會被刪除。

log.retention.bytes:設置日志保留的最大字節(jié)數,超出大小的日志會被清理。

log.flush.interval.messages 和 log.flush.interval.ms:這些配置控制 Kafka 將消息刷新到磁盤的頻率,可以根據實際需要優(yōu)化,減少 I/O 操作。

log.cleanup.policy:設置日志清理策略??梢赃x擇 delete(基于時間或大小清理)或 compact(日志壓縮,僅保留最新版本的消息)。

log.segment.bytes:設置每個日志段的大小,當日志達到該大小時會創(chuàng)建新的日志段。

3.3 分區(qū)副本同步與數據一致性

Kafka 的分區(qū)副本同步策略決定了數據的寫入和同步方式。為了確保消息的可靠性,需要配置副本同步機制,使數據被可靠地寫入到多個副本。

acks=all:確保生產者消息寫入所有副本后才確認消息寫入成功??梢耘渲迷谏a者端。

min.insync.replicas:設置最小同步副本數,保證數據在多個副本同步后再提交。

關鍵配置:

# 設置同步副本數,確保寫入成功后至少需要此數目的副本同步
acks=all   # 等待所有副本確認寫入
 
# 設置最小同步副本數
min.insync.replicas=2  # 確保至少有 2 個副本同步

acks=all:確保消息在所有副本成功寫入后才返回確認。即使部分副本未同步,也不會確認寫入。

min.insync.replicas:確保消息寫入時,至少有 min.insync.replicas 個副本同步,以避免因為單個副本失效導致的數據丟失。

3.4 Kafka Broker 高可用配置

在 Kafka 集群中,為了防止單點故障,Kafka 提供了高可用性配置。配置多個 Kafka Broker 和分布式部署是保證 Kafka 高可用性的基礎。

關鍵配置:

# Kafka Broker 啟動時的監(jiān)聽地址和端口
listeners=PLAINTEXT://localhost:9092
 
# 監(jiān)聽的內網地址,通常與外網地址分開配置
advertised.listeners=PLAINTEXT://broker1.example.com:9092
 
# Zookeeper 配置
zookeeper.connect=localhost:2181  # 設置 Zookeeper 地址,確保 Kafka 集群管理的協(xié)調和一致性

listeners:配置 Kafka Broker 啟動時監(jiān)聽的地址和端口。

advertised.listeners:配置 Kafka Broker 向外暴露的地址和端口,消費者和生產者會連接到這個地址。

zookeeper.connect:設置 Zookeeper 地址,Kafka 依賴 Zookeeper 來管理集群的元數據和協(xié)調。

3.5 日志段與磁盤空間管理

Kafka 使用日志段(Log Segment)來管理消息存儲,每個分區(qū)的消息都會分布在多個日志段中,磁盤空間需要合理管理以避免磁盤溢出。

關鍵配置:

# 設置日志段的大?。▎挝唬鹤止?jié))
log.segment.bytes=1073741824  # 每個日志段的大小為 1 GB
 
# 每個日志段的最大時間
log.roll.ms=604800000  # 設置為 7 天

log.segment.bytes:每個日志段的大小,達到該大小時會創(chuàng)建新的日志段。設置過大可能影響磁盤操作的效率,設置過小可能增加管理開銷。

log.roll.ms:設置日志段的滾動周期,控制日志文件分割的時間粒度。

3.6 性能優(yōu)化與磁盤 I/O 配置

Kafka 作為一個高吞吐量的分布式消息系統(tǒng),對于磁盤 I/O 和網絡 I/O 的要求很高。合理配置 Kafka Broker 的磁盤 I/O 能提高消息的寫入和讀取速度,避免系統(tǒng)性能瓶頸。

關鍵配置:

# 設置消息的最小寫入延遲
log.min.cleanable.dirty.ratio=0.5  # 控制消息清理的閾值
 
# 設置最大允許的線程數
num.io.threads=8     # I/O 線程數量
 
# 設置 Kafka 內存緩存大?。▎挝唬篗B)
log.buffer.size=10485760  # 默認為 10MB

log.min.cleanable.dirty.ratio:設置日志清理的閾值,避免過多無效日志占用空間。

num.io.threads:配置 I/O 線程數。根據機器性能,適當增加線程數以提高并發(fā)處理能力。

log.buffer.size:Kafka 寫入消息時會使用緩沖區(qū),適當增加緩沖區(qū)大小可以提高消息的寫入性能。

3.7 Kafka 集群的監(jiān)控與故障診斷

為了確保 Kafka Broker 的高可用性和穩(wěn)定性,需要對 Kafka 集群進行實時監(jiān)控。Kafka 提供了豐富的監(jiān)控指標,可以通過 JMX 進行監(jiān)控,也可以結合其他工具(如 Prometheus 和 Grafana)來實現集群健康檢查和報警。

JMX 監(jiān)控:Kafka 提供了大量的 JMX 指標,可以用于監(jiān)控消息的吞吐量、延遲、磁盤使用情況等。

日志監(jiān)控:Kafka 的日志記錄了大量的運行時信息,定期檢查日志有助于提前發(fā)現潛在的問題。

4. 總結

通過合理配置和設計 Kafka 消息生產和消費的架構,可以有效地確保消息不丟失,并提高系統(tǒng)的可靠性和容錯性。以下是關鍵的策略總結:

生產者配置:

  • 使用 acks=all 確保消息寫入到所有副本。
  • 啟用消息冪等性和重試機制。
  • 使用合理的 retries 和 delivery.timeout.ms 設置。

消費者配置:

  • 使用手動提交消費位移,避免消息丟失。
  • 使用去重策略和冪等性消費,確保消息不會重復消費。

Broker 配置:

  • 配置合理的副本數和分區(qū)數,確保高可用性。
  • 使用 Zookeeper 確保 Kafka 集群的協(xié)調和管理。
  • 設置合理的日志清理策略,避免存儲空間耗盡。

監(jiān)控與報警:

  • 使用 JMX 監(jiān)控 Kafka 的運行狀態(tài)。
  • 結合 Spring Boot Actuator 進行健康檢查和性能監(jiān)控。

到此這篇關于一文詳解SpringBoot使用Kafka如何保證消息不丟失的文章就介紹到這了,更多相關SpringBoot Kafka保證消息不丟失內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • 一文帶你深入了解Java的數據結構

    一文帶你深入了解Java的數據結構

    Java工具包提供了強大的數據結構。這篇文章主要為大家詳細介紹了Java數據結構中常用的幾種接口和類,感興趣的小伙伴可以跟隨小編一起了解一下
    2023-05-05
  • java 串口通信詳細及簡單實例

    java 串口通信詳細及簡單實例

    這篇文章主要介紹了java 串口通信詳細及簡單實例的相關資料,在開發(fā)硬件與軟件結合的時候,就會用到串口,需要的朋友可以參考下
    2017-01-01
  • springboot+jsonp解決前端跨域問題小結

    springboot+jsonp解決前端跨域問題小結

    這篇文章主要介紹了springboot+jsonp解決前端跨域問題小結,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-06-06
  • 使用jmeter實現對jar包的調用方式

    使用jmeter實現對jar包的調用方式

    這篇文章主要介紹了使用jmeter實現對jar包的調用方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-03-03
  • web.xml中servlet, bean, filter, listenr 加載順序_動力節(jié)點Java學院整理

    web.xml中servlet, bean, filter, listenr 加載順序_動力節(jié)點Java學院整理

    這篇文章主要介紹了web.xml中servlet, bean, filter, listenr 加載順序,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-08-08
  • MyBatis綁定錯誤提示BindingException:Invalid bound statement (not found)的解決方法

    MyBatis綁定錯誤提示BindingException:Invalid bound statement (not f

    這篇文章主要介紹了MyBatis綁定錯誤提示BindingException:Invalid bound statement (not found)的解決辦法,非常不錯,具有參考借鑒價值,需要的的朋友參考下吧
    2017-01-01
  • Java實現文件或文件夾的復制到指定目錄實例

    Java實現文件或文件夾的復制到指定目錄實例

    本篇文章主要介紹了Java實現文件或文件夾的復制到指定目錄實例,具有一定的參考價值,感興趣的小伙伴們可以參考一下。
    2017-03-03
  • idea在plugins中搜不到插件的解決方法

    idea在plugins中搜不到插件的解決方法

    本文主要介紹了idea在plugins中搜不到插件的解決方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-06-06
  • SpringBoot實現RAS+AES自動接口解密

    SpringBoot實現RAS+AES自動接口解密

    本文主要介紹了SpringBoot實現RAS+AES自動接口解密,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-03-03
  • IDEA?2021.3?使用及idea2021.3.1激活使用方法

    IDEA?2021.3?使用及idea2021.3.1激活使用方法

    IDEA?全稱?IntelliJ?IDEA,是java語言開發(fā)的集成環(huán)境,IntelliJ在業(yè)界被公認為最好的java開發(fā)工具之一,今天通過本文給大家介紹idea2021.3.1激活及使用教程,感興趣的朋友一起看看吧
    2022-01-01

最新評論