關(guān)于Java整合RocketMQ實(shí)現(xiàn)生產(chǎn)消費(fèi)詳解
環(huán)境搭建
- 創(chuàng)建Maven項(xiàng)目。
- pom.xml文件中引入RocketMQ依賴。
<dependencies> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.8.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.2</version> </dependency> </dependencies>
生產(chǎn)者
普通消息
RocketMQ可用于以三種方式發(fā)送消息:同步、異步和單向傳輸。前兩種消息類型是可靠的,因?yàn)闊o論它們是否成功發(fā)送都有響應(yīng)。
同步發(fā)送
private final static String nameServer = "127.0.0.1:9876"; private final static String producerGroup = "my_group"; private final static String topic = "topic-test"; @Test public void syncSend() { try { // 初始化一個(gè)producer并設(shè)置Producer group name DefaultMQProducer producer = new DefaultMQProducer(producerGroup); // 設(shè)置NameServer地址 producer.setNamesrvAddr(nameServer); // 啟動producer producer.start(); // 創(chuàng)建一條消息,并指定topic、tag、body等信息,tag可以理解成標(biāo)簽,對消息進(jìn)行再歸類,RocketMQ可以在消費(fèi)端對tag進(jìn)行過濾 Message msg = new Message(topic, "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 利用producer進(jìn)行發(fā)送,并同步等待發(fā)送結(jié)果 SendResult sendResult = producer.send(msg, 10000); System.out.printf("%s%n", sendResult); // 一旦producer不再使用,關(guān)閉producer producer.shutdown(); } catch (Exception e) { e.printStackTrace(); } }
異步發(fā)送
@Test public void asyncSend() throws IOException { try { // 初始化一個(gè)producer并設(shè)置Producer group name DefaultMQProducer producer = new DefaultMQProducer(producerGroup); // 設(shè)置NameServer地址 producer.setNamesrvAddr(nameServer); // 啟動producer producer.start(); // 創(chuàng)建一條消息,并指定topic、tag、body等信息,tag可以理解成標(biāo)簽,對消息進(jìn)行再歸類,RocketMQ可以在消費(fèi)端對tag進(jìn)行過濾 Message msg = new Message(topic, "tagB", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 異步發(fā)送消息, 發(fā)送結(jié)果通過callback返回給客戶端 producer.send(msg, new SendCallback() { public void onSuccess(SendResult sendResult) { System.out.printf("OK %s %n", sendResult.getMsgId()); } public void onException(Throwable e) { System.out.printf("Exception %s %n", e); e.printStackTrace(); } },10000); } catch (Exception e) { e.printStackTrace(); } System.in.read(); }
單向傳輸
@Test public void onewaySend() { try { // 初始化一個(gè)producer并設(shè)置Producer group name DefaultMQProducer producer = new DefaultMQProducer(producerGroup); // 設(shè)置NameServer地址 producer.setNamesrvAddr(nameServer); producer.setSendMsgTimeout(10000); // 啟動producer producer.start(); // 創(chuàng)建一條消息,并指定topic、tag、body等信息,tag可以理解成標(biāo)簽,對消息進(jìn)行再歸類,RocketMQ可以在消費(fèi)端對tag進(jìn)行過濾 Message msg = new Message(topic, "tagB", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 異步發(fā)送消息, 發(fā)送結(jié)果通過callback返回給客戶端 producer.sendOneway(msg); // 一旦producer不再使用,關(guān)閉producer //producer.shutdown(); } catch (Exception e) { e.printStackTrace(); } }
順序消息
RocketMQ 通過生產(chǎn)者和服務(wù)端的協(xié)議保障單個(gè)生產(chǎn)者串行地發(fā)送消息,并按序存儲和持久化。如需保證消息生產(chǎn)的順序性,則必須滿足以下條件:
- 單一生產(chǎn)者: 消息生產(chǎn)的順序性僅支持單一生產(chǎn)者,不同生產(chǎn)者分布在不同的系統(tǒng),即使設(shè)置相同的分區(qū)鍵,不同生產(chǎn)者之間產(chǎn)生的消息也無法判定其先后順序。
- 串行發(fā)送:生產(chǎn)者客戶端支持多線程安全訪問,但如果生產(chǎn)者使用多線程并行發(fā)送,則不同線程間產(chǎn)生的消息將無法判定其先后順序。
@Test public void orderSend() { try { // 初始化一個(gè)producer并設(shè)置Producer group name DefaultMQProducer producer = new DefaultMQProducer(producerGroup); // 設(shè)置NameServer地址 producer.setNamesrvAddr(nameServer); producer.setSendMsgTimeout(10000); // 啟動producer producer.start(); String[] tags = new String[]{"TagA", "TagB", "TagC"}; for (int i = 0; i < 10; i++) { int orderId = i % 10; Message msg = new Message(topic, tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId); System.out.printf("%s%n", sendResult); } } catch (Exception e) { e.printStackTrace(); } }
延遲消息
延遲消息發(fā)送是指消息發(fā)送到RocketMQ后,并不期望立馬投遞這條消息,而是延遲一定時(shí)間后才投遞到Consumer進(jìn)行消費(fèi)。 使用 RocketMQ 的延時(shí)消息可以簡化定時(shí)調(diào)度任務(wù)的開發(fā)邏輯,實(shí)現(xiàn)高性能、可擴(kuò)展、高可靠的定時(shí)觸發(fā)能力。 RocketMQ 一共支持18個(gè)等級的延遲投遞,具體時(shí)間如下:
投遞等級 | 延遲時(shí)間 | 投遞等級 | 延遲時(shí)間 |
1 | 1s | 10 | 6min |
2 | 5s | 11 | 7min |
3 | 10s | 12 | 8min |
4 | 30s | 13 | 9min |
5 | 1min | 14 | 10min |
6 | 2min | 15 | 20min |
7 | 3min | 16 | 30min |
8 | 4min | 17 | 1h |
9 | 5min | 18 | 2h |
@Test public void scheduledSend() { try { // 初始化一個(gè)producer并設(shè)置Producer group name DefaultMQProducer producer = new DefaultMQProducer(producerGroup); // 設(shè)置NameServer地址 producer.setNamesrvAddr(nameServer); // 啟動producer producer.start(); // 創(chuàng)建一條消息,并指定topic、tag、body等信息,tag可以理解成標(biāo)簽,對消息進(jìn)行再歸類,RocketMQ可以在消費(fèi)端對tag進(jìn)行過濾 Message msg = new Message(topic, "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息延遲等級 msg.setDelayTimeLevel(2); // 利用producer進(jìn)行發(fā)送,并同步等待發(fā)送結(jié)果 SendResult sendResult = producer.send(msg, 10000); System.out.printf("%s%n", sendResult); // 一旦producer不再使用,關(guān)閉producer producer.shutdown(); } catch (Exception e) { e.printStackTrace(); } }
批量消息
在對吞吐率有一定要求的情況下,Apache RocketMQ可以將一些消息聚成一批以后進(jìn)行發(fā)送,可以增加吞吐率,并減少API和網(wǎng)絡(luò)調(diào)用次數(shù)。
@Test public void batchSend() { try { // 初始化一個(gè)producer并設(shè)置Producer group name DefaultMQProducer producer = new DefaultMQProducer(producerGroup); // 設(shè)置NameServer地址 producer.setNamesrvAddr(nameServer); // 啟動producer producer.start(); List<Message> messages = new ArrayList<Message>(); messages.add(new Message(topic, "Tag", "Order001", "Hello world 0".getBytes())); messages.add(new Message(topic, "Tag", "Order002", "Hello world 1".getBytes())); messages.add(new Message(topic, "Tag", "Order003", "Hello world 2".getBytes())); // 利用producer進(jìn)行發(fā)送,并同步等待發(fā)送結(jié)果 SendResult sendResult = producer.send(messages, 10000); System.out.printf("%s%n", sendResult); // 一旦producer不再使用,關(guān)閉producer producer.shutdown(); } catch (Exception e) { e.printStackTrace(); } }
事務(wù)消息
在一些對數(shù)據(jù)一致性有強(qiáng)需求的場景,可以用RocketMQ 事務(wù)消息來解決,從而保證上下游數(shù)據(jù)的一致性。
基于 RocketMQ 的分布式事務(wù)消息功能,在普通消息基礎(chǔ)上,支持二階段的提交能力。將二階段提交和本地事務(wù)綁定,實(shí)現(xiàn)全局提交結(jié)果的一致性。
第一階段會發(fā)送一個(gè)半事務(wù)消息,半事務(wù)消息是指暫不能投遞的消息。 如果發(fā)送成功則執(zhí)行本地事務(wù),并根據(jù)本地事務(wù)執(zhí)行成功與否,向 Broker 半事務(wù)消息狀態(tài)(commit或者rollback)。 半事務(wù)消息只有 commit 狀態(tài)才會真正向下游投遞。 如果由于網(wǎng)絡(luò)閃斷、生產(chǎn)者應(yīng)用重啟等原因,導(dǎo)致某條事務(wù)消息的二次確認(rèn)丟失,Broker 端會通過掃描發(fā)現(xiàn)某條消息長期處于“半事務(wù)消息”時(shí),需要主動向消息生產(chǎn)者詢問該消息的最終狀態(tài)(Commit或是Rollback)。這樣最終保證了本地事務(wù)執(zhí)行成功,下游就能收到消息,本地事務(wù)執(zhí)行失敗,下游就收不到消息。
事務(wù)消息的詳細(xì)交互流程如下圖所示:
@Test public void transactionSend() { try { // 事務(wù)消息的發(fā)送不再使用 DefaultMQProducer,而是使用 TransactionMQProducer 進(jìn)行發(fā)送 TransactionMQProducer producer = new TransactionMQProducer(producerGroup); // 設(shè)置NameServer地址 producer.setNamesrvAddr(nameServer); // 事務(wù)回查的線程池 ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); producer.setExecutorService(executorService); producer.setTransactionListener(new TransactionListener() { //半事務(wù)消息發(fā)送成功后,執(zhí)行本地事務(wù)的方法 public LocalTransactionState executeLocalTransaction(Message msg, Object o) { System.out.printf("執(zhí)行本地事務(wù) %n"); /* 二次確認(rèn) LocalTransactionState.COMMIT_MESSAGE:提交事務(wù),允許消費(fèi)者消費(fèi)該消息 LocalTransactionState.ROLLBACK_MESSAGE:回滾事務(wù),消息將被丟棄不允許消費(fèi)。 LocalTransactionState.UNKNOW:暫時(shí)無法判斷狀態(tài),等待固定時(shí)間以后Broker端根據(jù)回查規(guī)則向生產(chǎn)者進(jìn)行消息回查。 */ return LocalTransactionState.UNKNOW; } // 二次確認(rèn)消息沒有收到,Broker端回查事務(wù)狀態(tài)的方法,默認(rèn)60s public LocalTransactionState checkLocalTransaction(MessageExt msg) { System.out.printf("二次確認(rèn)失敗,broker事務(wù)回查 %n"); return LocalTransactionState.COMMIT_MESSAGE; } }); producer.setSendMsgTimeout(10000); // 啟動producer producer.start(); Message msg = new Message(topic, "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 利用producer進(jìn)行發(fā)送事務(wù)消息,并同步等待發(fā)送結(jié)果 SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); // 一旦producer不再使用,關(guān)閉producer System.in.read(); } catch (Exception e) { e.printStackTrace(); } }
消費(fèi)者
MQ的消費(fèi)模式可以大致分為兩種,一種是推Push,一種是拉Pull。
Push消費(fèi)
Push是服務(wù)端主動推送消息給客戶端,優(yōu)點(diǎn)是及時(shí)性較好,但如果客戶端沒有做好流控,一旦服務(wù)端推送大量消息到客戶端時(shí),就會導(dǎo)致客戶端消息堆積甚至崩潰。
private final static String nameServer = "127.0.0.1:9876"; private final static String consumerGroup = "my_group"; private final static String topic = "topic-test"; @Test public void consumerPush() throws MQClientException, IOException { // 初始化consumer,并設(shè)置consumer group name DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); // 設(shè)置NameServer地址 consumer.setNamesrvAddr(nameServer); // 訂閱一個(gè)或多個(gè)topic,并指定tag過濾條件,這里指定*表示接收所有tag的消息 consumer.subscribe(topic, "*"); //設(shè)置采用廣播模式,廣播模式下,消費(fèi)組內(nèi)的每一個(gè)消費(fèi)者都會消費(fèi)全量消息。 //consumer.setMessageModel(MessageModel.BROADCASTING); //注冊回調(diào)接口來處理從Broker中收到的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); // 返回消息消費(fèi)狀態(tài),ConsumeConcurrentlyStatus.CONSUME_SUCCESS為消費(fèi)成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 順序消費(fèi) // consumer.registerMessageListener(new MessageListenerOrderly() { // public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { // return ConsumeOrderlyStatus.SUCCESS; // } // }); // 啟動Consumer consumer.start(); System.out.printf("Consumer Started.%n"); System.in.read(); }
Pull 消費(fèi)
Pull是客戶端需要主動到服務(wù)端取數(shù)據(jù),優(yōu)點(diǎn)是客戶端可以依據(jù)自己的消費(fèi)能力進(jìn)行消費(fèi),但拉取的頻率也需要用戶自己控制,拉取頻繁容易造成服務(wù)端和客戶端的壓力,拉取間隔長又容易造成消費(fèi)不及時(shí)。
@Test public void consumerPull() { try { DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(consumerGroup); consumer.setNamesrvAddr(nameServer); //關(guān)閉自動提交 consumer.setAutoCommit(false); consumer.subscribe(topic, "*"); consumer.setPullBatchSize(20); consumer.start(); while (true) { List<MessageExt> messageExts = consumer.poll(); System.out.printf("%s%n", messageExts); consumer.commitSync(); } } catch (Exception e) { e.printStackTrace(); } }
代碼倉庫
https://gitee.com/codeWBG/learn_rocketmq
到此這篇關(guān)于關(guān)于Java整合RocketMQ實(shí)現(xiàn)生產(chǎn)消費(fèi)詳解的文章就介紹到這了,更多相關(guān)Java整合RocketMQ生產(chǎn)消費(fèi)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot后端接收參數(shù)優(yōu)化代碼示例(統(tǒng)一處理前端參數(shù))
使用Spring Boot開發(fā)API的時(shí)候,讀取請求參數(shù)是服務(wù)端編碼中最基本的一項(xiàng)操作,下面這篇文章主要給大家介紹了關(guān)于SpringBoot后端接收參數(shù)優(yōu)化(統(tǒng)一處理前端參數(shù))的相關(guān)資料,需要的朋友可以參考下2024-07-07java經(jīng)典問題:連個(gè)字符串互為回環(huán)變位
連個(gè)字符串互為回環(huán)變位經(jīng)常出現(xiàn)在java程序員面試中,這個(gè)是考驗(yàn)程序員的解題思路和方法的最經(jīng)典的一題,小編為大家詳細(xì)分析一下,一起來學(xué)習(xí)吧。2017-11-11Java報(bào)錯(cuò):UnsupportedOperationException in Collection
在Java編程中,UnsupportedOperationException是一種常見的運(yùn)行時(shí)異常,通常在試圖對不支持的操作執(zhí)行修改時(shí)發(fā)生,它表示當(dāng)前操作不被支持,本文將深入探討UnsupportedOperationException的產(chǎn)生原因,并提供具體的解決方案和最佳實(shí)踐,需要的朋友可以參考下2024-06-06mybatis那些約定的配置你真的都了解嗎(經(jīng)驗(yàn)總結(jié))
mybatsi中Mapper和xml文件之間有很多約定俗稱的規(guī)則,比如名稱匹配,包掃描,別名等,這些規(guī)則是什么。如果想更加靈活,該如何配置呢?今天就給大家講一下如何配置mybatsi的xml文件2021-06-06eclipse輸出Hello World的實(shí)現(xiàn)方法
這篇文章主要介紹了eclipse輸出Hello World的實(shí)現(xiàn)方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11java 與testng利用XML做數(shù)據(jù)源的數(shù)據(jù)驅(qū)動示例詳解
這篇文章主要介紹了java 與testng利用XML做數(shù)據(jù)源的數(shù)據(jù)驅(qū)動示例詳解的相關(guān)資料,需要的朋友可以參考下2017-01-01