如果事務(wù)執(zhí)行成功并選擇提交事務(wù),則產(chǎn)生注冊成功消息,進(jìn)入下一步,需要的朋友可以參考下" />

欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ消息發(fā)送與消息類別詳解

 更新時間:2023年09月08日 09:46:47   作者:小鐘不想敲代碼  
這篇文章主要介紹了RocketMQ消息發(fā)送與消息類別詳解,事務(wù)消息的生產(chǎn)者執(zhí)行本地事務(wù),并根據(jù)事務(wù)執(zhí)行的結(jié)果選擇是否提交或回滾事務(wù),
如果事務(wù)執(zhí)行成功并選擇提交事務(wù),則產(chǎn)生注冊成功消息,進(jìn)入下一步,需要的朋友可以參考下

一、消息發(fā)送

1.1 單生產(chǎn)者單消費(fèi)者消息發(fā)送(OneToOne)

1、新建maven項(xiàng)目recketmqtest

2、導(dǎo)入RocketMQ客戶端坐標(biāo)

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.8.0</version>
</dependency>

3、生產(chǎn)者

package com.liming.base;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
/**
 * @author 黎明
 * @version 1.0
 * 生產(chǎn)者
 * @date 2023/5/21 9:08
 */
public class Producer {
    public static void main(String[] args) throws Exception {
        /**
         1. 誰來發(fā)?
         2. 發(fā)給誰?
         3. 怎么發(fā)?
         4. 發(fā)什么?
         5. 發(fā)的結(jié)果是什么?
         6. 打掃戰(zhàn)場
         **/
        // 1、創(chuàng)建一個發(fā)送消息的對象Producer
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        // 2、設(shè)定發(fā)送的命名服務(wù)器地址
        producer.setNamesrvAddr("localhost:9876");
        // 3.1 啟動發(fā)送的服務(wù)
        producer.start();
        // 4、創(chuàng)建要發(fā)送的消息對象
        Message message = new Message("topic1", "tag1","hello recketmq".getBytes());
        // 3.2 發(fā)送消息
        SendResult sendResult = producer.send(message);
        System.out.println("返回結(jié)果:" + sendResult);
        // 5、關(guān)閉連接
        producer.shutdown();
    }
}

4、消費(fèi)者

package com.liming.base;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
 * @author 黎明
 * @version 1.0
 * 消費(fèi)者
 * @date 2023/5/21 9:17
 */
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        //1.創(chuàng)建一個接收消息的對象Consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.設(shè)定接收的命名服務(wù)器地址
        consumer.setNamesrvAddr("localhost:9876");
        //3.設(shè)置接收消息對應(yīng)的topic,對應(yīng)的sub標(biāo)簽為任意
        consumer.subscribe("topic1", "*");
        //3.開啟監(jiān)聽,用于接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //遍歷消息
                for (MessageExt msg : list) {
                    System.out.println("收到的消息:" + msg);
                    byte[] body = msg.getBody();
                    System.out.println(new String(body));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //4.啟動接收消息的服務(wù)
        consumer.start();
        System.out.println("接收消息服務(wù)已經(jīng)開啟!");
        //5 不要關(guān)閉消費(fèi)者!
    }
}

1.2 單生產(chǎn)者多消費(fèi)者消息發(fā)送(OneToMany)

生產(chǎn)者

//1.創(chuàng)建一個發(fā)送消息的對象Producer
 DefaultMQProducer producer = new DefaultMQProducer("group1");
 //2.設(shè)定發(fā)送的命名服務(wù)器地址
 producer.setNamesrvAddr("localhost:9876");
 //3.1啟動發(fā)送的服務(wù)
 producer.start();
 for (int i = 0; i < 10; i++) {
     //4.創(chuàng)建要發(fā)送的消息對象,指定topic,指定內(nèi)容body
     Message msg = new Message("topic1", ("hello rocketmq"+i).getBytes();
     //3.2發(fā)送消息
     SendResult result = producer.send(msg);
     System.out.println("返回結(jié)果:" + result);
 }
 //5.關(guān)閉連接
 producer.shutdown();

消費(fèi)者(負(fù)載均衡模式:默認(rèn)模式)

 //1.創(chuàng)建一個接收消息的對象Consumer
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
 //2.設(shè)定接收的命名服務(wù)器地址
 consumer.setNamesrvAddr("localhost:9876");
 //3.設(shè)置接收消息對應(yīng)的topic,對應(yīng)的sub標(biāo)簽為任意
 consumer.subscribe("topic1","*");
 //設(shè)置當(dāng)前消費(fèi)者的消費(fèi)模式(默認(rèn)模式:負(fù)載均衡)
 consumer.setMessageModel(MessageModel.CLUSTERING);
 //3.開啟監(jiān)聽,用于接收消息
 consumer.registerMessageListener(new MessageListenerConcurrently() {
     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
         //遍歷消息
         for (MessageExt msg : list) {
             System.out.println("收到消息:"+msg);
             System.out.println("消息是:"+new String(msg.getBody()));
         }
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }
 });
 //4.啟動接收消息的服務(wù)
 consumer.start();
 System.out.println("接受消息服務(wù)已經(jīng)開啟!");
 //5 不要關(guān)閉消費(fèi)者!

消費(fèi)者(廣播模式)

//1.創(chuàng)建一個接收消息的對象Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.設(shè)定接收的命名服務(wù)器地址
consumer.setNamesrvAddr("localhost:9876");
//3.設(shè)置接收消息對應(yīng)的topic,對應(yīng)的sub標(biāo)簽為任意
consumer.subscribe("topic1","*");
//設(shè)置當(dāng)前消費(fèi)者的消費(fèi)模式(默認(rèn)模式:負(fù)載均衡)
//consumer.setMessageModel(MessageModel.CLUSTERING);
//設(shè)置當(dāng)前消費(fèi)者的消費(fèi)模式(廣播模式)
consumer.setMessageModel(MessageModel.BROADCASTING);
//3.開啟監(jiān)聽,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        //遍歷消息
        for (MessageExt msg : list) {
            System.out.println("收到消息:"+msg);
            System.out.println("消息是:"+new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
//4.啟動接收消息的服務(wù)
consumer.start();
System.out.println("接受消息服務(wù)已經(jīng)開啟!");
//5 不要關(guān)閉消費(fèi)者!

1.3 多生產(chǎn)者多消費(fèi)者消息發(fā)送(ManyToMany)

多生產(chǎn)者產(chǎn)生的消息可以被同一個消費(fèi)者消費(fèi),也可以被多個消費(fèi)者消費(fèi)

二、消息類別

2.1 同步消息

特征:即時性較強(qiáng),重要的消息,且必須有回執(zhí)的消息,例如短信,通知(轉(zhuǎn)賬成功)

在這里插入圖片描述

代碼實(shí)現(xiàn)(生產(chǎn)者中):

SendResult result = producer.send(msg);

2.2 異步消息

特征:即時性較弱,但需要有回執(zhí)的消息,例如訂單中的某些信息

在這里插入圖片描述

代碼實(shí)現(xiàn)(生產(chǎn)者中):

//1.創(chuàng)建一個發(fā)送消息的對象Producer
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.設(shè)定發(fā)送的命名服務(wù)器地址
producer.setNamesrvAddr("localhost:9876");
//3.1啟動發(fā)送的服務(wù)
producer.start();
for (int i = 0; i < 10; i++) {
    //4.創(chuàng)建要發(fā)送的消息對象,指定topic,指定內(nèi)容body
    Message msg = new Message("topic1", ("hello rocketmq"+i).getBytes("UTF-8"));
    //3.2 同步消息
    //SendResult result = producer.send(msg);
    //System.out.println("返回結(jié)果:" + result);
    //異步消息
    producer.send(msg, new SendCallback() {
        //表示成功返回結(jié)果
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println(sendResult);
        }
        //表示發(fā)送消息失敗
        @Override
        public void onException(Throwable throwable) {
            System.out.println(throwable);
        }
    });
    System.out.println("消息"+i+"發(fā)完了,做業(yè)務(wù)邏輯去了!");
}
//休眠10秒
TimeUnit.SECONDS.sleep(10);
//5.關(guān)閉連接
producer.shutdown();

2.3 單向消息

特征:不需要有回執(zhí)的消息,例如日志類消息

在這里插入圖片描述

代碼實(shí)現(xiàn)(生產(chǎn)者中):

producer.sendOneway(msg);

2.4 延時消息

消息發(fā)送時并不直接發(fā)送到消息服務(wù)器,而是根據(jù)設(shè)定的等待時間到達(dá),起到延時到達(dá)的緩沖作用

Message msg = new Message("topic3",("延時消息:hello rocketmq "+i).getBytes("UTF-8"));
//設(shè)置延時等級3,這個消息將在10s之后發(fā)送(現(xiàn)在只支持固定的幾個時間,詳看delayTimeLevel)
msg.setDelayTimeLevel(3);
SendResult result = producer.send(msg);
System.out.println("返回結(jié)果:"+result);

目前支持的消息時間:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

2.5 批量消息

批量發(fā)送消息能顯著提高傳遞小消息的性能.

發(fā)送批量消息:

List<Message> msgList = new ArrayList<Message>();
Message msg1 = new Message("topic1", ("hello rocketmq1").getBytes("UTF-8"));
Message msg2 = new Message("topic1", ("hello rocketmq2").getBytes("UTF-8"));
Message msg3 = new Message("topic1", ("hello rocketmq3").getBytes("UTF-8"));
msgList.add(msg1);
msgList.add(msg2);
msgList.add(msg3);
SendResult result = producer.send(msgList);

注意限制:

  • 這些批量消息應(yīng)該有相同的topic
  • 相同的waitStoreMsgOK
  • 不能是延時消息
  • 消息內(nèi)容總長度不超過4M

三、消息過濾

3.1 分類過濾

按照tag過濾信息

生產(chǎn)者:

Message msg = new Message("topic6","tag2",("消息過濾按照tag:hello rocketmq 2").getBytes("UTF-8"));

消費(fèi)者:

//接收消息的時候,除了制定topic,還可以指定接收的tag,*代表任意tag
consumer.subscribe("topic6","tag1 || tag2");

3.2 語法過濾(屬性過濾/語法過濾/SQL過濾)

基本語法:

  • 數(shù)值比較,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比較,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 邏輯符號 AND,OR,NOT;

常量支持類型為:

  • 數(shù)值,比如:123,3.1415;
  • 字符,比如:‘abc’,必須用單引號包裹起來;
  • NULL,特殊的常量
  • 布爾值,TRUE 或 FALSE

生產(chǎn)者:

//為消息添加屬性
msg.putUserProperty("vip","1");
msg.putUserProperty("age","20");

消費(fèi)者:

//使用消息選擇器來過濾對應(yīng)的屬性,語法格式為類SQL語法
consumer.subscribe("topic7", MessageSelector.bySql("age >= 18"));
consumer.subscribe("topic6", MessageSelector.bySql("name = 'litiedan'"));

注意:SQL過濾需要依賴服務(wù)器的功能支持,在broker.conf配置文件中添加對應(yīng)的功能項(xiàng),并開啟對應(yīng)功能

enablePropertyFilter=true

重啟broker

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

或者直接cmd中輸入(D:\software\rocketmq-all-4.8.0-bin-release\bin)

mqadmin.cmd updateBrokerConfig -blocalhost:10911 -kenablePropertyFilter -vtrue

在這里插入圖片描述

四、事務(wù)消息

1、正常事務(wù)過程(灰色線條)

2、事務(wù)補(bǔ)償過程(藍(lán)色線條)

在這里插入圖片描述

事務(wù)消息狀態(tài)

  • 提交狀態(tài):允許進(jìn)入隊(duì)列,此消息與非事務(wù)消息無區(qū)別
  • 回滾狀態(tài):不允許進(jìn)入隊(duì)列,此消息等同于未發(fā)送過
  • 中間狀態(tài):完成了half消息的發(fā)送,未對MQ進(jìn)行二次狀態(tài)確認(rèn)

注意:事務(wù)消息僅與生產(chǎn)者有關(guān),與消費(fèi)者無關(guān)

代碼實(shí)現(xiàn):

提交狀態(tài)

//事務(wù)消息使用的生產(chǎn)者是TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("localhost:9876");
//添加本地事務(wù)對應(yīng)的監(jiān)聽
producer.setTransactionListener(new TransactionListener() {
//正常事務(wù)過程
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
return LocalTransactionState.COMMIT_MESSAGE;
}
//事務(wù)補(bǔ)償過程
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
return null;
}
});
producer.start();
Message msg = new Message("topic8",("事務(wù)消息:hello rocketmq ").getBytes("UTF-8"));
SendResult result = producer.sendMessageInTransaction(msg,null);
System.out.println("返回結(jié)果:"+result);
producer.shutdown();

回滾狀態(tài)

producer.setTransactionListener(new TransactionListener() {
//正常事務(wù)
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    return LocalTransactionState.ROLLBACK_MESSAGE;
}
//事務(wù)補(bǔ)償
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
    return null;
}
});

中間狀態(tài)

public static void main(String[] args) throws Exception {
	  TransactionMQProducer producer=new TransactionMQProducer("group1");
	  producer.setNamesrvAddr("localhost:9876");
	  producer.setTransactionListener(new TransactionListener() {
	      //正常事務(wù)
	      @Override
	      public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
	          return LocalTransactionState.UNKNOW;
	      }
	      //事務(wù)補(bǔ)償 正常執(zhí)行UNKNOW才會觸發(fā)
	      @Override
	      public LocalTransactionState checkLocalTransaction(MessageExt msg) {
	          System.out.println("事務(wù)補(bǔ)償");
	          return LocalTransactionState.COMMIT_MESSAGE;
	      }
	  });
	  producer.start();
	  Message msg = new Message("topic13", "hello rocketmq".getBytes("UTF-8"));
	  SendResult result = producer.sendMessageInTransaction(msg, null);
	  System.out.println("返回結(jié)果:" + result);
	  //事務(wù)補(bǔ)償生產(chǎn)者一定要一直啟動著
	  //producer.shutdown();
}

到此這篇關(guān)于RocketMQ消息發(fā)送與消息類別詳解的文章就介紹到這了,更多相關(guān)RocketMQ消息類別內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java對接微信支付全過程

    Java對接微信支付全過程

    本文主要介紹了Java對接微信支付全過程,涵蓋注冊商戶、生成預(yù)支付訂單、處理回調(diào)和查詢訂單狀態(tài)等,具有一定的參考價值,感興趣的可以了解一下
    2025-03-03
  • 通過實(shí)例解析Spring argNames屬性

    通過實(shí)例解析Spring argNames屬性

    這篇文章主要介紹了通過實(shí)例解析Spring argNames屬性,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-11-11
  • SpringData JPA 如何搭建 xml的配置方式

    SpringData JPA 如何搭建 xml的配置方式

    這篇文章主要介紹了SpringData JPA 如何搭建 xml的配置方式,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧
    2023-12-12
  • mybatis 映射文件中if標(biāo)簽判斷字符串相等的兩種方式

    mybatis 映射文件中if標(biāo)簽判斷字符串相等的兩種方式

    這篇文章主要介紹了mybatis 映射文件中if標(biāo)簽判斷字符串相等的方式,非常不錯,具有一定的參考借鑒價值,需要的朋友可以參考下
    2018-06-06
  • 2021最新IDEA的各種快捷鍵匯總

    2021最新IDEA的各種快捷鍵匯總

    掌握idea的各種快捷鍵,可以幫助我們開發(fā)程序,今天小編給大家?guī)韼追N比較常用的idea快捷鍵及一些快捷鍵介紹,對idea快捷鍵相關(guān)知識,感興趣的朋友一起看看吧
    2021-05-05
  • Java利用Guava?Retry實(shí)現(xiàn)重處理

    Java利用Guava?Retry實(shí)現(xiàn)重處理

    guava-retrying是谷歌的Guava庫的一個小擴(kuò)展,允許為任意函數(shù)調(diào)用創(chuàng)建可配置的重試策略,比如與正常運(yùn)行時間不穩(wěn)定的遠(yuǎn)程服務(wù)對話的函數(shù)調(diào)用。本文將利用其實(shí)現(xiàn)重處理,感興趣的可以了解一下
    2022-08-08
  • Java?中向?Arraylist?添加對象的示例代碼

    Java?中向?Arraylist?添加對象的示例代碼

    本文介紹了如何在 Java 中向 ArrayList 添加對象,并提供了示例和注意事項(xiàng),通過掌握這些知識,讀者可以在自己的 Java 項(xiàng)目中有效地使用 ArrayList 來存儲和操作對象,需要的朋友可以參考下
    2023-11-11
  • maven父工程relativepath標(biāo)簽使用解讀

    maven父工程relativepath標(biāo)簽使用解讀

    文章主要介紹了在使用Maven構(gòu)建父子工程時如何通過設(shè)置父工程和子工程的pom文件來管理依賴和版本,當(dāng)子工程是Spring Boot項(xiàng)目時,可以通過關(guān)閉`relativePath`標(biāo)簽來繼承Spring Boot的父工程,同時在父工程中使用`dependencyManagement`標(biāo)簽來統(tǒng)一管理Spring Boot的依賴版本
    2024-11-11
  • 教你實(shí)現(xiàn)Java接口防刷

    教你實(shí)現(xiàn)Java接口防刷

    有些人會惡意提交,本文主要介紹了教你實(shí)現(xiàn)Java接口防刷,通過在一定時間內(nèi)限制同一用戶對同一個接口的請求次數(shù),具有一定的參考價值,感興趣的可以了解一下
    2024-05-05
  • mybatis 解決從列名到屬性名的自動映射失敗問題

    mybatis 解決從列名到屬性名的自動映射失敗問題

    這篇文章主要介紹了mybatis 解決從列名到屬性名的自動映射失敗問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-06-06

最新評論