關(guān)于Java整合RocketMQ實(shí)現(xiàn)生產(chǎn)消費(fèi)詳解
環(huán)境搭建
- 創(chuàng)建Maven項(xiàng)目。
- pom.xml文件中引入RocketMQ依賴。
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
</dependency>
</dependencies>生產(chǎn)者
普通消息
RocketMQ可用于以三種方式發(fā)送消息:同步、異步和單向傳輸。前兩種消息類型是可靠的,因?yàn)闊o(wú)論它們是否成功發(fā)送都有響應(yīng)。
同步發(fā)送

private final static String nameServer = "127.0.0.1:9876";
private final static String producerGroup = "my_group";
private final static String topic = "topic-test";
@Test
public void syncSend() {
try {
// 初始化一個(gè)producer并設(shè)置Producer group name
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
// 設(shè)置NameServer地址
producer.setNamesrvAddr(nameServer);
// 啟動(dòng)producer
producer.start();
// 創(chuàng)建一條消息,并指定topic、tag、body等信息,tag可以理解成標(biāo)簽,對(duì)消息進(jìn)行再歸類,RocketMQ可以在消費(fèi)端對(duì)tag進(jìn)行過(guò)濾
Message msg = new Message(topic, "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 利用producer進(jìn)行發(fā)送,并同步等待發(fā)送結(jié)果
SendResult sendResult = producer.send(msg, 10000);
System.out.printf("%s%n", sendResult);
// 一旦producer不再使用,關(guān)閉producer
producer.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}異步發(fā)送

@Test
public void asyncSend() throws IOException {
try {
// 初始化一個(gè)producer并設(shè)置Producer group name
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
// 設(shè)置NameServer地址
producer.setNamesrvAddr(nameServer);
// 啟動(dòng)producer
producer.start();
// 創(chuàng)建一條消息,并指定topic、tag、body等信息,tag可以理解成標(biāo)簽,對(duì)消息進(jìn)行再歸類,RocketMQ可以在消費(fèi)端對(duì)tag進(jìn)行過(guò)濾
Message msg = new Message(topic, "tagB", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 異步發(fā)送消息, 發(fā)送結(jié)果通過(guò)callback返回給客戶端
producer.send(msg, new SendCallback() {
public void onSuccess(SendResult sendResult) {
System.out.printf("OK %s %n",
sendResult.getMsgId());
}
public void onException(Throwable e) {
System.out.printf("Exception %s %n", e);
e.printStackTrace();
}
},10000);
} catch (Exception e) {
e.printStackTrace();
}
System.in.read();
}單向傳輸

@Test
public void onewaySend() {
try {
// 初始化一個(gè)producer并設(shè)置Producer group name
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
// 設(shè)置NameServer地址
producer.setNamesrvAddr(nameServer);
producer.setSendMsgTimeout(10000);
// 啟動(dòng)producer
producer.start();
// 創(chuàng)建一條消息,并指定topic、tag、body等信息,tag可以理解成標(biāo)簽,對(duì)消息進(jìn)行再歸類,RocketMQ可以在消費(fèi)端對(duì)tag進(jìn)行過(guò)濾
Message msg = new Message(topic, "tagB", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 異步發(fā)送消息, 發(fā)送結(jié)果通過(guò)callback返回給客戶端
producer.sendOneway(msg);
// 一旦producer不再使用,關(guān)閉producer
//producer.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}順序消息
RocketMQ 通過(guò)生產(chǎn)者和服務(wù)端的協(xié)議保障單個(gè)生產(chǎn)者串行地發(fā)送消息,并按序存儲(chǔ)和持久化。如需保證消息生產(chǎn)的順序性,則必須滿足以下條件:
- 單一生產(chǎn)者: 消息生產(chǎn)的順序性僅支持單一生產(chǎn)者,不同生產(chǎn)者分布在不同的系統(tǒng),即使設(shè)置相同的分區(qū)鍵,不同生產(chǎn)者之間產(chǎn)生的消息也無(wú)法判定其先后順序。
- 串行發(fā)送:生產(chǎn)者客戶端支持多線程安全訪問(wèn),但如果生產(chǎn)者使用多線程并行發(fā)送,則不同線程間產(chǎn)生的消息將無(wú)法判定其先后順序。
@Test
public void orderSend() {
try {
// 初始化一個(gè)producer并設(shè)置Producer group name
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
// 設(shè)置NameServer地址
producer.setNamesrvAddr(nameServer);
producer.setSendMsgTimeout(10000);
// 啟動(dòng)producer
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC"};
for (int i = 0; i < 10; i++) {
int orderId = i % 10;
Message msg = new Message(topic, tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
}延遲消息
延遲消息發(fā)送是指消息發(fā)送到RocketMQ后,并不期望立馬投遞這條消息,而是延遲一定時(shí)間后才投遞到Consumer進(jìn)行消費(fèi)。 使用 RocketMQ 的延時(shí)消息可以簡(jiǎn)化定時(shí)調(diào)度任務(wù)的開(kāi)發(fā)邏輯,實(shí)現(xiàn)高性能、可擴(kuò)展、高可靠的定時(shí)觸發(fā)能力。 RocketMQ 一共支持18個(gè)等級(jí)的延遲投遞,具體時(shí)間如下:
| 投遞等級(jí) | 延遲時(shí)間 | 投遞等級(jí) | 延遲時(shí)間 |
| 1 | 1s | 10 | 6min |
| 2 | 5s | 11 | 7min |
| 3 | 10s | 12 | 8min |
| 4 | 30s | 13 | 9min |
| 5 | 1min | 14 | 10min |
| 6 | 2min | 15 | 20min |
| 7 | 3min | 16 | 30min |
| 8 | 4min | 17 | 1h |
| 9 | 5min | 18 | 2h |
@Test
public void scheduledSend() {
try {
// 初始化一個(gè)producer并設(shè)置Producer group name
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
// 設(shè)置NameServer地址
producer.setNamesrvAddr(nameServer);
// 啟動(dòng)producer
producer.start();
// 創(chuàng)建一條消息,并指定topic、tag、body等信息,tag可以理解成標(biāo)簽,對(duì)消息進(jìn)行再歸類,RocketMQ可以在消費(fèi)端對(duì)tag進(jìn)行過(guò)濾
Message msg = new Message(topic, "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 消息延遲等級(jí)
msg.setDelayTimeLevel(2);
// 利用producer進(jìn)行發(fā)送,并同步等待發(fā)送結(jié)果
SendResult sendResult = producer.send(msg, 10000);
System.out.printf("%s%n", sendResult);
// 一旦producer不再使用,關(guān)閉producer
producer.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}批量消息
在對(duì)吞吐率有一定要求的情況下,Apache RocketMQ可以將一些消息聚成一批以后進(jìn)行發(fā)送,可以增加吞吐率,并減少API和網(wǎng)絡(luò)調(diào)用次數(shù)。
@Test
public void batchSend() {
try {
// 初始化一個(gè)producer并設(shè)置Producer group name
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
// 設(shè)置NameServer地址
producer.setNamesrvAddr(nameServer);
// 啟動(dòng)producer
producer.start();
List<Message> messages = new ArrayList<Message>();
messages.add(new Message(topic, "Tag", "Order001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "Tag", "Order002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "Tag", "Order003", "Hello world 2".getBytes()));
// 利用producer進(jìn)行發(fā)送,并同步等待發(fā)送結(jié)果
SendResult sendResult = producer.send(messages, 10000);
System.out.printf("%s%n", sendResult);
// 一旦producer不再使用,關(guān)閉producer
producer.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}事務(wù)消息
在一些對(duì)數(shù)據(jù)一致性有強(qiáng)需求的場(chǎng)景,可以用RocketMQ 事務(wù)消息來(lái)解決,從而保證上下游數(shù)據(jù)的一致性。
基于 RocketMQ 的分布式事務(wù)消息功能,在普通消息基礎(chǔ)上,支持二階段的提交能力。將二階段提交和本地事務(wù)綁定,實(shí)現(xiàn)全局提交結(jié)果的一致性。
第一階段會(huì)發(fā)送一個(gè)半事務(wù)消息,半事務(wù)消息是指暫不能投遞的消息。 如果發(fā)送成功則執(zhí)行本地事務(wù),并根據(jù)本地事務(wù)執(zhí)行成功與否,向 Broker 半事務(wù)消息狀態(tài)(commit或者rollback)。 半事務(wù)消息只有 commit 狀態(tài)才會(huì)真正向下游投遞。 如果由于網(wǎng)絡(luò)閃斷、生產(chǎn)者應(yīng)用重啟等原因,導(dǎo)致某條事務(wù)消息的二次確認(rèn)丟失,Broker 端會(huì)通過(guò)掃描發(fā)現(xiàn)某條消息長(zhǎng)期處于“半事務(wù)消息”時(shí),需要主動(dòng)向消息生產(chǎn)者詢問(wèn)該消息的最終狀態(tài)(Commit或是Rollback)。這樣最終保證了本地事務(wù)執(zhí)行成功,下游就能收到消息,本地事務(wù)執(zhí)行失敗,下游就收不到消息。
事務(wù)消息的詳細(xì)交互流程如下圖所示:

@Test
public void transactionSend() {
try {
// 事務(wù)消息的發(fā)送不再使用 DefaultMQProducer,而是使用 TransactionMQProducer 進(jìn)行發(fā)送
TransactionMQProducer producer = new TransactionMQProducer(producerGroup);
// 設(shè)置NameServer地址
producer.setNamesrvAddr(nameServer);
// 事務(wù)回查的線程池
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(new TransactionListener() {
//半事務(wù)消息發(fā)送成功后,執(zhí)行本地事務(wù)的方法
public LocalTransactionState executeLocalTransaction(Message msg, Object o) {
System.out.printf("執(zhí)行本地事務(wù) %n");
/*
二次確認(rèn)
LocalTransactionState.COMMIT_MESSAGE:提交事務(wù),允許消費(fèi)者消費(fèi)該消息
LocalTransactionState.ROLLBACK_MESSAGE:回滾事務(wù),消息將被丟棄不允許消費(fèi)。
LocalTransactionState.UNKNOW:暫時(shí)無(wú)法判斷狀態(tài),等待固定時(shí)間以后Broker端根據(jù)回查規(guī)則向生產(chǎn)者進(jìn)行消息回查。
*/
return LocalTransactionState.UNKNOW;
}
// 二次確認(rèn)消息沒(méi)有收到,Broker端回查事務(wù)狀態(tài)的方法,默認(rèn)60s
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.printf("二次確認(rèn)失敗,broker事務(wù)回查 %n");
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.setSendMsgTimeout(10000);
// 啟動(dòng)producer
producer.start();
Message msg = new Message(topic, "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 利用producer進(jìn)行發(fā)送事務(wù)消息,并同步等待發(fā)送結(jié)果
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
// 一旦producer不再使用,關(guān)閉producer
System.in.read();
} catch (Exception e) {
e.printStackTrace();
}
}消費(fèi)者
MQ的消費(fèi)模式可以大致分為兩種,一種是推Push,一種是拉Pull。
Push消費(fèi)
Push是服務(wù)端主動(dòng)推送消息給客戶端,優(yōu)點(diǎn)是及時(shí)性較好,但如果客戶端沒(méi)有做好流控,一旦服務(wù)端推送大量消息到客戶端時(shí),就會(huì)導(dǎo)致客戶端消息堆積甚至崩潰。
private final static String nameServer = "127.0.0.1:9876";
private final static String consumerGroup = "my_group";
private final static String topic = "topic-test";
@Test
public void consumerPush() throws MQClientException, IOException {
// 初始化consumer,并設(shè)置consumer group name
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
// 設(shè)置NameServer地址
consumer.setNamesrvAddr(nameServer);
// 訂閱一個(gè)或多個(gè)topic,并指定tag過(guò)濾條件,這里指定*表示接收所有tag的消息
consumer.subscribe(topic, "*");
//設(shè)置采用廣播模式,廣播模式下,消費(fèi)組內(nèi)的每一個(gè)消費(fèi)者都會(huì)消費(fèi)全量消息。
//consumer.setMessageModel(MessageModel.BROADCASTING);
//注冊(cè)回調(diào)接口來(lái)處理從Broker中收到的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 返回消息消費(fèi)狀態(tài),ConsumeConcurrentlyStatus.CONSUME_SUCCESS為消費(fèi)成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 順序消費(fèi)
// consumer.registerMessageListener(new MessageListenerOrderly() {
// public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
// return ConsumeOrderlyStatus.SUCCESS;
// }
// });
// 啟動(dòng)Consumer
consumer.start();
System.out.printf("Consumer Started.%n");
System.in.read();
}Pull 消費(fèi)
Pull是客戶端需要主動(dòng)到服務(wù)端取數(shù)據(jù),優(yōu)點(diǎn)是客戶端可以依據(jù)自己的消費(fèi)能力進(jìn)行消費(fèi),但拉取的頻率也需要用戶自己控制,拉取頻繁容易造成服務(wù)端和客戶端的壓力,拉取間隔長(zhǎng)又容易造成消費(fèi)不及時(shí)。
@Test
public void consumerPull() {
try {
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(consumerGroup);
consumer.setNamesrvAddr(nameServer);
//關(guān)閉自動(dòng)提交
consumer.setAutoCommit(false);
consumer.subscribe(topic, "*");
consumer.setPullBatchSize(20);
consumer.start();
while (true) {
List<MessageExt> messageExts = consumer.poll();
System.out.printf("%s%n", messageExts);
consumer.commitSync();
}
} catch (Exception e) {
e.printStackTrace();
}
}代碼倉(cāng)庫(kù)
https://gitee.com/codeWBG/learn_rocketmq
到此這篇關(guān)于關(guān)于Java整合RocketMQ實(shí)現(xiàn)生產(chǎn)消費(fèi)詳解的文章就介紹到這了,更多相關(guān)Java整合RocketMQ生產(chǎn)消費(fèi)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot后端接收參數(shù)優(yōu)化代碼示例(統(tǒng)一處理前端參數(shù))
使用Spring Boot開(kāi)發(fā)API的時(shí)候,讀取請(qǐng)求參數(shù)是服務(wù)端編碼中最基本的一項(xiàng)操作,下面這篇文章主要給大家介紹了關(guān)于SpringBoot后端接收參數(shù)優(yōu)化(統(tǒng)一處理前端參數(shù))的相關(guān)資料,需要的朋友可以參考下2024-07-07
java經(jīng)典問(wèn)題:連個(gè)字符串互為回環(huán)變位
連個(gè)字符串互為回環(huán)變位經(jīng)常出現(xiàn)在java程序員面試中,這個(gè)是考驗(yàn)程序員的解題思路和方法的最經(jīng)典的一題,小編為大家詳細(xì)分析一下,一起來(lái)學(xué)習(xí)吧。2017-11-11
Java報(bào)錯(cuò):UnsupportedOperationException in Collection
在Java編程中,UnsupportedOperationException是一種常見(jiàn)的運(yùn)行時(shí)異常,通常在試圖對(duì)不支持的操作執(zhí)行修改時(shí)發(fā)生,它表示當(dāng)前操作不被支持,本文將深入探討UnsupportedOperationException的產(chǎn)生原因,并提供具體的解決方案和最佳實(shí)踐,需要的朋友可以參考下2024-06-06
mybatis那些約定的配置你真的都了解嗎(經(jīng)驗(yàn)總結(jié))
mybatsi中Mapper和xml文件之間有很多約定俗稱的規(guī)則,比如名稱匹配,包掃描,別名等,這些規(guī)則是什么。如果想更加靈活,該如何配置呢?今天就給大家講一下如何配置mybatsi的xml文件2021-06-06
eclipse輸出Hello World的實(shí)現(xiàn)方法
這篇文章主要介紹了eclipse輸出Hello World的實(shí)現(xiàn)方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11
java 與testng利用XML做數(shù)據(jù)源的數(shù)據(jù)驅(qū)動(dòng)示例詳解
這篇文章主要介紹了java 與testng利用XML做數(shù)據(jù)源的數(shù)據(jù)驅(qū)動(dòng)示例詳解的相關(guān)資料,需要的朋友可以參考下2017-01-01
SpringBoot中使用Redis作為全局鎖示例過(guò)程
這篇文章主要為大家介紹了SpringBoot中使用Redis作為全局鎖示例過(guò)程,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-03-03
Java多態(tài)中動(dòng)態(tài)綁定原理解析
這篇文章主要介紹了Java多態(tài)中動(dòng)態(tài)綁定原理解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12

