Java中RocketMQ使用方法詳解
在 Spring Boot 中,RocketMQ 和 Kafka 都是常用的消息中間件,它們的使用方法有一些相似之處,也有各自的特點(diǎn)。
一、RocketMQ 在 Spring Boot 中的使用
引入依賴
- 在項(xiàng)目的
pom.xml
文件中添加 RocketMQ 的依賴。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.3</version> </dependency>
- 在項(xiàng)目的
配置 RocketMQ
- 在
application.properties
或application.yml
文件中配置 RocketMQ 的相關(guān)參數(shù),如 namesrvAddr(NameServer 地址)等。
rocketmq.name-server=127.0.0.1:9876
- 在
生產(chǎn)者
- 創(chuàng)建一個(gè)生產(chǎn)者類(lèi),使用
@Resource
注入RocketMQTemplate
。
import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class RocketMQProducer { @Autowired private RocketMQTemplate rocketMQTemplate; public void sendMessage(String topic, String message) { rocketMQTemplate.convertAndSend(topic, message); } }
- 創(chuàng)建一個(gè)生產(chǎn)者類(lèi),使用
消費(fèi)者
- 創(chuàng)建一個(gè)消費(fèi)者類(lèi),使用
@RocketMQMessageListener
注解指定監(jiān)聽(tīng)的主題和消費(fèi)組。
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; @Component @RocketMQMessageListener(topic = "your_topic", consumerGroup = "your_consumer_group") public class RocketMQConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { // 處理接收到的消息 System.out.println("Received message: " + message); } }
- 創(chuàng)建一個(gè)消費(fèi)者類(lèi),使用
二、Kafka 在 Spring Boot 中的使用
引入依賴
- 在
pom.xml
文件中添加 Kafka 的依賴。
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.8.12</version> </dependency>
- 在
配置 Kafka
- 在
application.properties
或application.yml
文件中配置 Kafka 的相關(guān)參數(shù),如 bootstrapServers(Kafka 服務(wù)器地址)等。
spring.kafka.bootstrap-servers=127.0.0.1:9092
- 在
生產(chǎn)者
- 創(chuàng)建一個(gè)生產(chǎn)者類(lèi),使用
@Resource
注入KafkaTemplate
。
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; @Component public class KafkaProducer { @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String topic, String message) { kafkaTemplate.send(topic, message); } }
- 創(chuàng)建一個(gè)生產(chǎn)者類(lèi),使用
消費(fèi)者
- 創(chuàng)建一個(gè)消費(fèi)者類(lèi),使用
@KafkaListener
注解指定監(jiān)聽(tīng)的主題和消費(fèi)組。
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class KafkaConsumer { @KafkaListener(topics = "your_topic", groupId = "your_consumer_group") public void onMessage(String message) { // 處理接收到的消息 System.out.println("Received message: " + message); } }
- 創(chuàng)建一個(gè)消費(fèi)者類(lèi),使用
總的來(lái)說(shuō),RocketMQ 和 Kafka 在 Spring Boot 中的使用都比較方便,具體選擇哪種消息中間件可以根據(jù)項(xiàng)目的實(shí)際需求來(lái)決定。RocketMQ 在一些場(chǎng)景下可能具有高吞吐量、低延遲等優(yōu)勢(shì),而 Kafka 則在大規(guī)模分布式系統(tǒng)中被廣泛應(yīng)用,具有高可靠性和可擴(kuò)展性。
三、如何保證消息隊(duì)列順序性
1、發(fā)送端保證順序性
合理設(shè)計(jì)業(yè)務(wù)
- 確保具有順序性要求的消息被發(fā)送到同一個(gè)主題(Topic)的同一個(gè)隊(duì)列(Queue)中。比如,將同一類(lèi)業(yè)務(wù)的消息按照特定規(guī)則進(jìn)行分類(lèi),使得它們都進(jìn)入相同的隊(duì)列。
- 一個(gè)業(yè)務(wù)場(chǎng)景的消息盡量由一個(gè)發(fā)送端來(lái)發(fā)送消息,避免多個(gè)發(fā)送端發(fā)送可能導(dǎo)致的亂序。
使用同步發(fā)送
- 在發(fā)送消息時(shí),使用同步發(fā)送方式
send(Message msg, long timeout)
,確保消息成功發(fā)送后再進(jìn)行下一個(gè)消息的發(fā)送。這樣可以避免異步發(fā)送可能導(dǎo)致的消息亂序情況。
- 在發(fā)送消息時(shí),使用同步發(fā)送方式
2、消費(fèi)端保證順序性
單線程消費(fèi)
- 消費(fèi)者在消費(fèi)消息時(shí),采用單線程的方式進(jìn)行消費(fèi)。這樣可以確保同一隊(duì)列中的消息按照發(fā)送的順序被依次處理。
@Component @RocketMQMessageListener(topic = "your_topic", consumerGroup = "your_consumer_group") public class RocketMQConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { // 處理接收到的消息 System.out.println("Received message: " + message); } }
在實(shí)際應(yīng)用中,可以將消費(fèi)邏輯放在一個(gè)單獨(dú)的方法中,然后在這個(gè)方法中進(jìn)行順序處理,確保消息的順序性。
避免并發(fā)處理
- 確保在消費(fèi)消息的過(guò)程中,不會(huì)出現(xiàn)并發(fā)處理的情況。比如,不要在消費(fèi)消息的同時(shí)啟動(dòng)其他異步任務(wù)或者多線程處理,以免破壞消息的順序性。
3、設(shè)置隊(duì)列數(shù)量
- 控制隊(duì)列數(shù)量
- 如果業(yè)務(wù)對(duì)消息順序性要求非常嚴(yán)格,可以考慮減少主題下的隊(duì)列數(shù)量。通常情況下,一個(gè)主題可以包含多個(gè)隊(duì)列,消息會(huì)被隨機(jī)分發(fā)到不同的隊(duì)列中。如果隊(duì)列數(shù)量較少,那么消息更有可能被發(fā)送到同一個(gè)隊(duì)列中,從而更容易保證順序性。
通過(guò)以上方法,可以在一定程度上保證 RocketMQ 消息的順序性。但需要注意的是,保證消息順序性可能會(huì)犧牲一定的性能和吞吐量,因此需要根據(jù)實(shí)際業(yè)務(wù)需求進(jìn)行權(quán)衡和選擇。
四、如何確保消息隊(duì)列的可靠性
1、發(fā)送端
同步發(fā)送與確認(rèn)
- 使用同步發(fā)送方式
send(Message msg, long timeout)
,該方法會(huì)等待消息發(fā)送成功的確認(rèn),確保消息被正確地發(fā)送到 Broker。如果發(fā)送失敗或超時(shí),可以進(jìn)行重試或其他錯(cuò)誤處理操作。
try { SendResult sendResult = rocketMQTemplate.syncSend(topic, message); System.out.println("Message sent successfully: " + sendResult); } catch (Exception e) { System.out.println("Failed to send message: " + e.getMessage()); // 進(jìn)行重試或其他錯(cuò)誤處理 }
- 使用同步發(fā)送方式
事務(wù)消息
- 對(duì)于一些需要保證事務(wù)一致性的場(chǎng)景,可以使用 RocketMQ 的事務(wù)消息機(jī)制。發(fā)送事務(wù)消息分為兩個(gè)階段,首先發(fā)送半事務(wù)消息,然后執(zhí)行本地事務(wù),根據(jù)本地事務(wù)的結(jié)果決定提交或回滾事務(wù)消息。
@Service public class TransactionProducer { @Autowired private RocketMQTemplate rocketMQTemplate; public void sendTransactionMessage() { TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("transactionTopic", new Message<>("transactionMessage"), null); System.out.println("Transaction message sent: " + result); } }
2、Broker 端
持久化存儲(chǔ)
- RocketMQ 支持消息的持久化存儲(chǔ),可以將消息存儲(chǔ)在磁盤(pán)上,以防止消息丟失。通過(guò)配置
broker.conf
文件中的flushDiskType
參數(shù),可以選擇同步刷盤(pán)或異步刷盤(pán)方式。同步刷盤(pán)可以保證消息在寫(xiě)入磁盤(pán)后才返回成功響應(yīng),但會(huì)影響性能;異步刷盤(pán)可以提高性能,但在系統(tǒng)故障時(shí)可能會(huì)丟失部分未刷盤(pán)的消息。
- RocketMQ 支持消息的持久化存儲(chǔ),可以將消息存儲(chǔ)在磁盤(pán)上,以防止消息丟失。通過(guò)配置
高可用部署
- 部署多主多從的 RocketMQ 集群,當(dāng)主節(jié)點(diǎn)出現(xiàn)故障時(shí),從節(jié)點(diǎn)可以自動(dòng)切換為主節(jié)點(diǎn),保證消息服務(wù)的可用性。同時(shí),可以配置主從同步方式,確保消息在主從節(jié)點(diǎn)之間的可靠同步。
3、消費(fèi)端
消費(fèi)確認(rèn)
- 消費(fèi)者在成功處理消息后,需要向 Broker 發(fā)送消費(fèi)確認(rèn)??梢酝ㄟ^(guò)設(shè)置
consumeMode
為CONSUME_PASSIVELY
(被動(dòng)消費(fèi)模式),并在處理完消息后手動(dòng)調(diào)用acknowledge()
方法進(jìn)行確認(rèn)。如果消費(fèi)失敗,可以選擇重試或者將消息發(fā)送到死信隊(duì)列進(jìn)行后續(xù)處理。
@Component @RocketMQMessageListener(topic = "your_topic", consumerGroup = "your_consumer_group") public class RocketMQConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { try { // 處理消息 System.out.println("Received message: " + message); // 確認(rèn)消費(fèi)成功 getRocketMQListenerContainer().acknowledge(); } catch (Exception e) { System.out.println("Failed to process message: " + e.getMessage()); // 可以選擇重試或者發(fā)送到死信隊(duì)列 } } }
- 消費(fèi)者在成功處理消息后,需要向 Broker 發(fā)送消費(fèi)確認(rèn)??梢酝ㄟ^(guò)設(shè)置
重試機(jī)制
- 配置消費(fèi)者的重試次數(shù)和重試時(shí)間間隔,當(dāng)消費(fèi)失敗時(shí),RocketMQ 會(huì)自動(dòng)進(jìn)行重試。可以在
application.properties
或application.yml
中配置rocketmq.retry.times
和rocketmq.retry.interval
參數(shù)來(lái)控制重試策略。
- 配置消費(fèi)者的重試次數(shù)和重試時(shí)間間隔,當(dāng)消費(fèi)失敗時(shí),RocketMQ 會(huì)自動(dòng)進(jìn)行重試。可以在
通過(guò)以上措施,可以在不同階段保證 RocketMQ 消息的可靠性,確保消息在生產(chǎn)、存儲(chǔ)和消費(fèi)過(guò)程中不會(huì)丟失或出現(xiàn)錯(cuò)誤。
五、保證消息處理的冪等性
在 RocketMQ 中,可以通過(guò)以下幾種方式來(lái)保證消息處理的冪等性:
1、業(yè)務(wù)層面設(shè)計(jì)
使用唯一標(biāo)識(shí)
- 在業(yè)務(wù)中為每條消息生成一個(gè)唯一的標(biāo)識(shí),比如使用業(yè)務(wù)流水號(hào)、訂單號(hào)等作為消息的唯一標(biāo)識(shí)。在消費(fèi)消息時(shí),先根據(jù)這個(gè)唯一標(biāo)識(shí)判斷該消息是否已經(jīng)被處理過(guò)。如果已經(jīng)處理過(guò),則直接忽略該消息。
- 例如在電商系統(tǒng)中,訂單創(chuàng)建的消息可以使用訂單號(hào)作為唯一標(biāo)識(shí)。消費(fèi)者在處理消息時(shí),先查詢數(shù)據(jù)庫(kù)中是否存在該訂單號(hào)對(duì)應(yīng)的處理記錄,如果存在則說(shuō)明該消息已經(jīng)被處理過(guò),不再重復(fù)處理。
@Service public class OrderProcessingService { @Autowired private JdbcTemplate jdbcTemplate; public void processOrderMessage(String orderId) { boolean isProcessed = isOrderProcessed(orderId); if (isProcessed) { return; } // 處理訂單邏輯 System.out.println("Processing order: " + orderId); markOrderAsProcessed(orderId); } private boolean isOrderProcessed(String orderId) { int count = jdbcTemplate.queryForObject( "SELECT COUNT(*) FROM processed_orders WHERE order_id =?", Integer.class, orderId); return count > 0; } private void markOrderAsProcessed(String orderId) { jdbcTemplate.update( "INSERT INTO processed_orders (order_id) VALUES (?)", orderId); } }
利用數(shù)據(jù)庫(kù)約束
- 可以在數(shù)據(jù)庫(kù)中使用唯一索引、主鍵約束等方式來(lái)保證業(yè)務(wù)數(shù)據(jù)的唯一性。在處理消息時(shí),如果違反了這些約束,則說(shuō)明該消息已經(jīng)被處理過(guò),不再重復(fù)處理。
- 比如在用戶注冊(cè)的場(chǎng)景中,可以在數(shù)據(jù)庫(kù)的用戶表中使用用戶名或郵箱作為唯一索引。當(dāng)消費(fèi)用戶注冊(cè)的消息時(shí),嘗試插入用戶數(shù)據(jù),如果插入失敗(因?yàn)檫`反唯一索引約束),則說(shuō)明該用戶已經(jīng)注冊(cè)過(guò),不再重復(fù)處理。
@Service public class UserRegistrationService { @Autowired private JdbcTemplate jdbcTemplate; public void registerUser(String username, String password) { try { jdbcTemplate.update( "INSERT INTO users (username, password) VALUES (?,?)", username, password); } catch (DataIntegrityViolationException e) { // 處理插入失敗的情況,可能是用戶已存在 System.out.println("User already exists: " + username); } } }
2、技術(shù)層面實(shí)現(xiàn)
- 分布式鎖
- 可以使用分布式鎖來(lái)保證同一時(shí)間只有一個(gè)消費(fèi)者實(shí)例在處理特定的消息。在處理消息之前,先獲取分布式鎖,如果獲取成功則處理消息,處理完成后釋放鎖。如果獲取鎖失敗,則說(shuō)明該消息正在被其他實(shí)例處理,當(dāng)前實(shí)例可以選擇等待或者直接忽略該消息。
- 可以使用 Redis 或 Zookeeper 等實(shí)現(xiàn)分布式鎖。以 Redis 為例,可以使用 SETNX 命令來(lái)實(shí)現(xiàn)分布式鎖。
@Service public class MessageProcessingService { @Autowired private StringRedisTemplate redisTemplate; public void processMessage(String messageId) { String lockKey = "message_lock_" + messageId; boolean locked = tryLock(lockKey); if (!locked) { return; } try { boolean isProcessed = isMessageProcessed(messageId); if (isProcessed) { return; } // 處理消息邏輯 System.out.println("Processing message: " + messageId); markMessageAsProcessed(messageId); } finally { releaseLock(lockKey); } } private boolean tryLock(String key) { return redisTemplate.opsForValue().setIfAbsent(key, "locked", Duration.ofSeconds(30)); } private void releaseLock(String key) { redisTemplate.delete(key); } private boolean isMessageProcessed(String messageId) { // 判斷消息是否已處理的邏輯 return false; } private void markMessageAsProcessed(String messageId) { // 標(biāo)記消息已處理的邏輯 } }
通過(guò)以上方法,可以有效地保證 RocketMQ 消息處理的冪等性,避免因重復(fù)消費(fèi)消息而導(dǎo)致的業(yè)務(wù)數(shù)據(jù)不一致問(wèn)題。
總結(jié)
到此這篇關(guān)于Java中RocketMQ使用方法的文章就介紹到這了,更多相關(guān)Java RocketMQ使用內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
解決IDEA報(bào)錯(cuò),無(wú)效的源發(fā)行版 無(wú)效的目標(biāo)發(fā)行版:22問(wèn)題
在項(xiàng)目編譯過(guò)程中,可能會(huì)出現(xiàn)“無(wú)效的源發(fā)行版”或“無(wú)效的目標(biāo)發(fā)行版”的報(bào)錯(cuò)信息,原因通常是編譯使用的JDK版本與項(xiàng)目設(shè)置的發(fā)布版本不一致,解決這類(lèi)問(wèn)題的辦法是統(tǒng)一JDK版本,具體操作為:在IDE的項(xiàng)目設(shè)置中(如File->ProjectStructure->ProjectSettings)2024-10-10mybatis一對(duì)多方式實(shí)現(xiàn)批量插入
這篇文章主要介紹了mybatis一對(duì)多方式實(shí)現(xiàn)批量插入,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-11-11501 Command "HELO" requires an argument問(wèn)題的解決方法
換一個(gè)windows服務(wù)器,發(fā)現(xiàn)就沒(méi)這樣的問(wèn)題,僅在一臺(tái)Linux服務(wù)器上可以重現(xiàn),直觀感覺(jué)就是這臺(tái)Linux服務(wù)器某些配置有問(wèn)題2013-08-08SpringBoot中的統(tǒng)一異常處理詳細(xì)解析
這篇文章主要介紹了SpringBoot中的統(tǒng)一異常處理詳細(xì)解析,該注解可以把異常處理器應(yīng)用到所有控制器,而不是單個(gè)控制器,借助該注解,我們可以實(shí)現(xiàn):在獨(dú)立的某個(gè)地方,比如單獨(dú)一個(gè)類(lèi),定義一套對(duì)各種異常的處理機(jī)制,需要的朋友可以參考下2024-01-01