RocketMQ的四種常用消息隊(duì)列及代碼演示
消息隊(duì)列
普通消息隊(duì)列
普通消息隊(duì)列是最基本的一種消息隊(duì)列,可以按照先進(jìn)先出(FIFO)的順序存儲(chǔ)消息,并且可以被多個(gè)消費(fèi)者同時(shí)消費(fèi)??梢酝ㄟ^在生產(chǎn)者端指定主題名稱和標(biāo)簽來創(chuàng)建普通消息隊(duì)列。
順序消息隊(duì)列
順序消息隊(duì)列可以保證相同主題和相同消息鍵的消息按照嚴(yán)格的順序被消費(fèi),例如可以用于訂單等需要保證處理順序的場(chǎng)景??梢酝ㄟ^在創(chuàng)建普通消息隊(duì)列時(shí)指定MessageQueueSelector對(duì)象和鍵來創(chuàng)建順序消息隊(duì)列。
延遲消息隊(duì)列
延遲消息隊(duì)列是一種可以在指定時(shí)間后被消費(fèi)的消息隊(duì)列??梢栽谏a(chǎn)者端指定消息發(fā)送的時(shí)間戳和延遲時(shí)間,RocketMQ會(huì)根據(jù)這些信息將消息存儲(chǔ)到延遲消息隊(duì)列中,并在指定的時(shí)間后投遞消息到普通消息隊(duì)列中。
事務(wù)消息隊(duì)列
事務(wù)消息隊(duì)列是一種可以保證消息投遞的事務(wù)性消息隊(duì)列。在生產(chǎn)者端發(fā)送事務(wù)消息時(shí),會(huì)先向RocketMQ發(fā)送一條預(yù)提交消息,然后在本地事務(wù)執(zhí)行成功后再提交或回滾事務(wù)。如果提交事務(wù),則RocketMQ會(huì)將消息投遞到消費(fèi)者,否則將不會(huì)投遞該消息??梢酝ㄟ^在創(chuàng)建事務(wù)消息隊(duì)列時(shí)指定本地事務(wù)執(zhí)行器來創(chuàng)建事務(wù)消息隊(duì)列。
除此之外,RocketMQ還支持多主題(Topic)、多消息生產(chǎn)者(Producer)和多消費(fèi)者組(Consumer Group)的概念,可以為不同的業(yè)務(wù)場(chǎng)景創(chuàng)建不同的消息隊(duì)列。
代碼演示
普通消息隊(duì)列
@Service public class MyProducer { @Autowired private RocketMQTemplate rocketMQTemplate; public void sendMessage(String message) { rocketMQTemplate.convertAndSend("myTopic", message); } }
順序消息隊(duì)列
@Service public class MyProducer { @Autowired private RocketMQTemplate rocketMQTemplate; public void sendOrderMessage(String message, int orderId) { rocketMQTemplate.setMessageQueueSelector(new OrderMessageQueueSelector()); rocketMQTemplate.convertAndSend("myTopic", message, orderId); } } class OrderMessageQueueSelector implements MessageQueueSelector { @Override public MessageQueue select(List<MessageQueue> mqs, Message message, Object orderId) { int index = (int) orderId % mqs.size(); return mqs.get(index); } }
延遲消息隊(duì)列
@Service public class MyProducer { @Autowired private RocketMQTemplate rocketMQTemplate; public void sendDelayMessage(String message, long delayTime) { rocketMQTemplate.syncSend("myTopic", MessageBuilder.withPayload(message) .build(), 3000, 2, delayTime); } }
事務(wù)消息隊(duì)列
@Service public class MyTransactionListener implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { // 執(zhí)行本地事務(wù) // 如果本地事務(wù)執(zhí)行成功,則返回RocketMQLocalTransactionState.COMMIT // 如果本地事務(wù)執(zhí)行失敗,則返回RocketMQLocalTransactionState.ROLLBACK return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { // 檢查本地事務(wù)狀態(tài) // 如果本地事務(wù)執(zhí)行成功,則返回RocketMQLocalTransactionState.COMMIT // 如果本地事務(wù)執(zhí)行失敗,則返回RocketMQLocalTransactionState.ROLLBACK // 如果本地事務(wù)狀態(tài)未知,則返回RocketMQLocalTransactionState.UNKNOWN return RocketMQLocalTransactionState.UNKNOWN; } } @Service public class MyProducer { @Autowired private RocketMQTemplate rocketMQTemplate; @Autowired private MyTransactionListener transactionListener; public void sendTransactionMessage(String message) { rocketMQTemplate.setTransactionListener(transactionListener); rocketMQTemplate.sendMessageInTransaction("myTransactionGroup", "myTopic", MessageBuilder.withpayload(message).build(), null); } }
到此這篇關(guān)于RocketMQ的四種常用消息隊(duì)列及代碼演示的文章就介紹到這了,更多相關(guān)RocketMQ常用消息隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
spring boot 不連接數(shù)據(jù)庫(kù)啟動(dòng)的解決
這篇文章主要介紹了spring boot 不連接數(shù)據(jù)庫(kù)啟動(dòng)的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08調(diào)用java.lang.Runtime.exec的正確姿勢(shì)分享
這篇文章主要介紹了調(diào)用java.lang.Runtime.exec的正確姿勢(shì),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11java源碼解析之String類的compareTo(String otherString)方法
這篇文章主要給大家介紹了關(guān)于java源碼解析之String類的compareTo(String otherString)方法的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考借鑒,下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2018-09-09解決spring-boot-starter-web等報(bào)紅問題
這篇文章主要介紹了解決spring-boot-starter-web等報(bào)紅問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-07-07在IDEA中安裝scala、maven、hadoop遇到的問題小結(jié)
這篇文章主要介紹了在IDEA中安裝scala、maven、hadoop遇到的問題小結(jié),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-10-10