欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java確保MQ消息隊(duì)列不丟失的實(shí)現(xiàn)與流程分析

 更新時(shí)間:2025年05月09日 10:55:30   作者:會(huì)游泳的石頭  
在分布式系統(tǒng)中,消息隊(duì)列是核心組件之一,本文將探討如何確保MQ消息隊(duì)列不丟失,并通過Java代碼示例和流程圖來演示解決方案,需要的可以了解下

前言

在分布式系統(tǒng)中,消息隊(duì)列(Message Queue, MQ)是核心組件之一,用于解耦系統(tǒng)、異步處理和削峰填谷。然而,消息的可靠性傳遞是使用MQ時(shí)需要重點(diǎn)考慮的問題。如果消息在傳輸過程中丟失,可能會(huì)導(dǎo)致數(shù)據(jù)不一致或業(yè)務(wù)邏輯錯(cuò)誤。

本文將探討如何確保MQ消息隊(duì)列不丟失,并通過Java代碼示例和流程圖來演示解決方案。

一、消息丟失的常見場(chǎng)景

生產(chǎn)者端丟失:

  • 消息發(fā)送失敗,未正確寫入MQ。
  • 網(wǎng)絡(luò)異常導(dǎo)致消息未到達(dá)MQ。

MQ服務(wù)端丟失:

  • MQ存儲(chǔ)機(jī)制問題,如磁盤損壞、數(shù)據(jù)被覆蓋等。
  • 配置不當(dāng)導(dǎo)致消息未持久化。

消費(fèi)者端丟失:

  • 消費(fèi)者收到消息后未正確處理。
  • 消費(fèi)者崩潰導(dǎo)致消息未確認(rèn)。

二、解決方案

為了確保消息不丟失,可以從以下幾個(gè)方面入手:

1. 生產(chǎn)者端保障

  • 確認(rèn)機(jī)制:使用生產(chǎn)者確認(rèn)模式(Producer Acknowledgment),確保消息成功寫入MQ。
  • 重試機(jī)制:在網(wǎng)絡(luò)異常時(shí),重試發(fā)送消息。

2. MQ服務(wù)端保障

  • 持久化消息:將消息存儲(chǔ)到磁盤,確保MQ重啟后消息不會(huì)丟失。
  • 高可用架構(gòu):使用主從復(fù)制或集群部署,避免單點(diǎn)故障。

3. 消費(fèi)者端保障

  • 手動(dòng)確認(rèn)模式:消費(fèi)者處理完消息后手動(dòng)確認(rèn),避免重復(fù)消費(fèi)或丟失。
  • 冪等性設(shè)計(jì):確保同一條消息多次消費(fèi)不會(huì)產(chǎn)生副作用。

三、Java代碼實(shí)現(xiàn)

以下代碼展示了如何使用RabbitMQ實(shí)現(xiàn)消息不丟失的完整流程。

1. 生產(chǎn)者端代碼

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    private static final String QUEUE_NAME = "test_queue";

    public static void main(String[] args) throws Exception {
        // 創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 聲明隊(duì)列,設(shè)置持久化
            boolean durable = true; // 持久化隊(duì)列
            channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

            String message = "Hello, RabbitMQ!";
            // 發(fā)送消息,設(shè)置持久化
            channel.basicPublish("", QUEUE_NAME, 
                MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

2. 消費(fèi)者端代碼

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    private static final String QUEUE_NAME = "test_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 聲明隊(duì)列,確保與生產(chǎn)者一致
        boolean durable = true;
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

        // 設(shè)置手動(dòng)確認(rèn)模式
        channel.basicQos(1); // 每次只接收一條消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            try {
                // 模擬消息處理
                System.out.println(" [x] Received '" + message + "'");
                doWork(message);
            } finally {
                // 手動(dòng)確認(rèn)消息
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                System.out.println(" [x] Done");
            }
        };

        // 開始消費(fèi)
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
    }

    private static void doWork(String task) {
        try {
            Thread.sleep(1000); // 模擬任務(wù)處理時(shí)間
        } catch (InterruptedException _ignored) {
            Thread.currentThread().interrupt();
        }
    }
}

四、流程圖分析

五、總結(jié)

通過上述方案,我們可以有效避免消息在生產(chǎn)者、MQ服務(wù)端和消費(fèi)者端的丟失問題。關(guān)鍵在于:

  • 生產(chǎn)者確認(rèn)機(jī)制:確保消息成功寫入MQ。
  • MQ持久化配置:保證消息不會(huì)因服務(wù)重啟而丟失。
  • 消費(fèi)者手動(dòng)確認(rèn):確保消息被正確處理后再確認(rèn)。

到此這篇關(guān)于Java確保MQ消息隊(duì)列不丟失的實(shí)現(xiàn)與流程分析的文章就介紹到這了,更多相關(guān)Java MQ消息隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Spring中@Conditional注解的用法

    Spring中@Conditional注解的用法

    這篇文章主要介紹了Spring中@Conditional注解的用法,@Conditional是Spring4新提供的注解,它的作用是按照一定的條件進(jìn)行判斷,滿足條件給容器注冊(cè)bean,需要的朋友可以參考下
    2024-01-01
  • spring boot自定義log4j2日志文件的實(shí)例講解

    spring boot自定義log4j2日志文件的實(shí)例講解

    下面小編就為大家分享一篇spring boot自定義log4j2日志文件的實(shí)例講解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2017-11-11
  • Springboot項(xiàng)目Mybatis升級(jí)為Mybatis-Plus的詳細(xì)步驟

    Springboot項(xiàng)目Mybatis升級(jí)為Mybatis-Plus的詳細(xì)步驟

    在許多 Java 項(xiàng)目中,MyBatis 是一個(gè)廣泛使用的 ORM 框架,然而,隨著 MyBatis-Plus 的出現(xiàn),許多開發(fā)者開始遷移到這個(gè)更加簡(jiǎn)潔、高效的工具,它在 MyBatis 的基礎(chǔ)上提供了更多的功能,所以本文將介紹Springboot項(xiàng)目Mybatis升級(jí)為Mybatis-Plus的詳細(xì)步驟
    2025-03-03
  • Spring?Boot實(shí)現(xiàn)web.xml功能示例詳解

    Spring?Boot實(shí)現(xiàn)web.xml功能示例詳解

    這篇文章主要介紹了Spring?Boot實(shí)現(xiàn)web.xml功能,通過本文介紹我們了解到,在Spring Boot應(yīng)用中,我們可以通過注解和編程兩種方式實(shí)現(xiàn)web.xml的功能,包括如何創(chuàng)建及注冊(cè)Servlet、Filter以及Listener等,需要的朋友可以參考下
    2023-09-09
  • Java中==與equals的區(qū)別小結(jié)

    Java中==與equals的區(qū)別小結(jié)

    這篇文章主要介紹了Java中==與equals的區(qū)別小結(jié),本文總結(jié)結(jié)論:== 與 equals()比較的內(nèi)容是不同的,equals()方式是String類中的方法,它用于比較兩個(gè)對(duì)象引用所指的內(nèi)容是否相等,而 == 比較的是兩個(gè)對(duì)象引用的地址是否相等,需要的朋友可以參考下
    2015-06-06
  • java?poi?讀取單元格null或者空字符串方式

    java?poi?讀取單元格null或者空字符串方式

    這篇文章主要介紹了java?poi?讀取單元格null或者空字符串方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • java 代碼中預(yù)防空指針異常的處理辦法

    java 代碼中預(yù)防空指針異常的處理辦法

    個(gè)人在做項(xiàng)目時(shí),對(duì)NullPointerException的幾點(diǎn)總結(jié),請(qǐng)網(wǎng)友拍磚?。?!多多提意見,
    2013-03-03
  • Spring Boot實(shí)現(xiàn)通用的接口參數(shù)校驗(yàn)

    Spring Boot實(shí)現(xiàn)通用的接口參數(shù)校驗(yàn)

    本文介紹基于 Spring Boot 和 JDK8 編寫一個(gè) AOP ,結(jié)合自定義注解實(shí)現(xiàn)通用的接口參數(shù)校驗(yàn)。具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2018-05-05
  • spring boot metrics監(jiān)控指標(biāo)使用教程

    spring boot metrics監(jiān)控指標(biāo)使用教程

    這篇文章主要為大家介紹了針對(duì)應(yīng)用監(jiān)控指標(biāo)暴露spring boot metrics監(jiān)控指標(biāo)的使用教程,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步
    2022-02-02
  • Springboot項(xiàng)目使用Slf4j將日志保存到本地目錄的實(shí)現(xiàn)代碼

    Springboot項(xiàng)目使用Slf4j將日志保存到本地目錄的實(shí)現(xiàn)代碼

    這篇文章主要介紹了Springboot項(xiàng)目使用Slf4j將日志保存到本地目錄的實(shí)現(xiàn)方法,本文通過示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-05-05

最新評(píng)論