RocketMQ保證消息的有序性的案例分享
在分布式系統(tǒng)中,消息隊列(MQ)的有序性是一個重要的特性,尤其是在需要保證事件順序執(zhí)行的業(yè)務場景下。Apache RocketMQ 是一個常用的開源消息中間件,它提供了強大的有序消息處理能力。這里我們會探討 RocketMQ 是如何保證消息的有序性的,包括其設計原理和相關(guān)的源碼實現(xiàn)。
簡單原理
RocketMQ 有序消息的基本概念
RocketMQ 保證有序性的主要方法是通過順序消息來實現(xiàn)的。在 RocketMQ 中,順序消息分為全局順序和分區(qū)順序兩種:
- 全局順序:指的是消息全局范圍內(nèi)的有序,也就是在所有的消息中,都是按照發(fā)送的順序來消費。
- 分區(qū)順序:指的是在同一個隊列(Queue)中的消息是有序的,而不同隊列間的消息并不保證有序。
RocketMQ 默認使用分區(qū)順序,通過將同一個 topic 下的消息分到同一個隊列(queue)中,來保證隊列內(nèi)的消息有序。
RocketMQ 有序消息的實現(xiàn)機制
消息發(fā)送
在發(fā)送端,RocketMQ 通過確保生產(chǎn)者向同一個隊列(Queue)發(fā)送消息來保證消息的有序性。生產(chǎn)者在發(fā)送消息時可以指定消息的 keys 或者其他屬性,RocketMQ 通過這些屬性計算消息應該發(fā)送到哪個隊列。
源碼示例(偽代碼):
public class Producer {
public void sendMessages(List<Message> messages) {
for (Message msg : messages) {
int queueId = this.calculateQueueId(msg);
msg.setQueueId(queueId);
this.sendMessageToQueue(msg, queueId);
}
}
private int calculateQueueId(Message msg) {
// 使用 hash 算法基于 message key 計算隊列 ID
return Math.abs(msg.getKey().hashCode()) % this.queueSize;
}
}
消息消費
在消費端,RocketMQ 使用單線程消費模式來保證同一個隊列的消息順序性。消費者會固定分配到某個隊列,而且是單線程從該隊列拉取并處理消息,從而保證消息的有序處理。
源碼示例(偽代碼):
public class Consumer {
public void consume() {
while (true) {
Message msg = this.pullMessage();
this.processMessage(msg);
}
}
}
簡單案例
在Spring Boot中使用RocketMQ來保證消息的隊列順序性,我們需要配置RocketMQ的客戶端和服務器端以支持順序消息。以下是一個基于RocketMQ和Spring Boot實現(xiàn)的消息順序發(fā)送和消費的例子。這個場景假設我們需要在一個電商系統(tǒng)中處理訂單狀態(tài)更新,訂單狀態(tài)更新必須按照順序來處理,以避免狀態(tài)不一致。
步驟1: 添加依賴
首先,確保你的pom.xml中加入了RocketMQ的Spring Boot Starter依賴。
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>
步驟2: 配置RocketMQ
在application.yml或application.properties中配置RocketMQ的基礎(chǔ)屬性:
rocketmq:
name-server: 127.0.0.1:9876 # 修改為你的NameServer地址
producer:
group: order_producer_group
send-message-timeout: 3000
consumer:
group: order_consumer_group
consume-thread-min: 1
consume-thread-max: 1
步驟3: 生產(chǎn)者配置
創(chuàng)建一個生產(chǎn)者服務,這個服務將訂單狀態(tài)更新作為順序消息發(fā)送。
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class OrderStatusProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendOrderStatusUpdate(String orderStatus, String orderId) {
// 使用訂單ID作為key保證同一個訂單的更新在同一個隊列
SendResult result = rocketMQTemplate.syncSendOrderly("order-topic", orderStatus, orderId);
System.out.println("Message sent, result: " + result.getSendStatus());
}
}
步驟4: 消費者配置
創(chuàng)建一個消費者服務,這個服務將按順序消費訂單狀態(tài)更新。
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "order_consumer_group", consumeMode = ConsumeMode.ORDERLY)
public class OrderStatusConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Received order update: " + message);
// 處理訂單更新邏輯
processOrderUpdate(message);
}
private void processOrderUpdate(String status) {
// 實現(xiàn)訂單更新處理邏輯
System.out.println("Processing order status update: " + status);
}
}
步驟5: 測試消息順序
你可以通過編寫一個簡單的測試來發(fā)送多個消息,并觀察消費者是否按順序接收它們。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Component
public class OrderStatusTestRunner implements CommandLineRunner {
@Autowired
private OrderStatusProducer producer;
@Override
public void run(String... args) throws Exception {
producer.sendOrderStatusUpdate("Order Created", "OrderId123");
producer.sendOrderStatusUpdate("Payment Received", "OrderId123");
producer.sendOrderStatusUpdate("Shipped", "OrderId123");
producer.sendOrderStatusUpdate("Delivered", "OrderId123");
}
}
通過這個設置,RocketMQ 和 Spring Boot 能夠保證同一個訂單的不同狀態(tài)更新是按照發(fā)送順序被處理的。這對于需要順序一致性的業(yè)務邏輯是非常重要的。
以上就是RocketMQ保證消息的有序性的案例分享的詳細內(nèi)容,更多關(guān)于RocketMQ消息有序性的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Eclipse+Java+Swing+Mysql實現(xiàn)電影購票系統(tǒng)(詳細代碼)
這篇文章主要介紹了Eclipse+Java+Swing+Mysql實現(xiàn)電影購票系統(tǒng)并附詳細的代碼詳解,需要的小伙伴可以參考一下2022-01-01
新版idea創(chuàng)建spring boot項目的詳細教程
這篇文章給大家介紹了新版idea創(chuàng)建spring boot項目的詳細教程,本教程對新手小白友好,若根據(jù)教程創(chuàng)建出現(xiàn)問題導致失敗可下載我提供的源碼,在文章最后,本教程較新,文中通過圖文給大家介紹的非常詳細,感興趣的朋友可以參考下2024-01-01
java swing實現(xiàn)電影購票系統(tǒng)
這篇文章主要為大家詳細介紹了java swing實現(xiàn)電影購票系統(tǒng),文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-01-01
詳解SpringSecurity中的Authentication信息與登錄流程
這篇文章主要介紹了SpringSecurity中的Authentication信息與登錄流程,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-09-09
idea配置maven環(huán)境時maven下載速度慢的解決方法
我們在idea配置maven環(huán)境的時候會發(fā)現(xiàn)maven更新慢的現(xiàn)象,解決辦法就是下載國內(nèi)的鏡像包,完美解決下載速度慢的問題,文中有詳細的具體操作方法,并通過圖文介紹的非常詳細,需要的朋友可以參考下2024-02-02
SpringBoot + Mybatis Plus 整合 Redis的
文章詳細介紹了Redis在用戶管理系統(tǒng)中的應用,包括用戶信息緩存、Token存儲、接口限流、重復提交攔截和熱點數(shù)據(jù)預加載等場景,并提供了具體的實現(xiàn)方案和步驟,感興趣的朋友一起看看吧2025-03-03

