欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot集成RocketMQ的使用示例

 更新時間:2023年11月20日 09:26:57   作者:小月亮與六便士  
RocketMQ是阿里巴巴開源的一款消息中間件,性能優(yōu)秀,功能齊全,被廣泛應用在各種業(yè)務場景,本文就來介紹一下SpringBoot集成RocketMQ的使用示例,感興趣的可以了解一下

一、RocketMQ基本概念

消息模型(Message Model)

RocketMQ主要由Producer、Broker、Consumer三部分組成,其中Producer負責生產(chǎn)消息,Consumer負責消費消息,Broker負責存儲消息。Broker在實際部署過程中對應一臺服務器,每個Broker可以存儲多個Topic的消息,每個Topic的消息也可以分片存儲于不同的Broker。MessageQueue用于存儲消息的物理地址,每個Topic中的消息地址存儲于多個MessageQueue中。ConsumerGroup由多個Consumer實例構成。

1、在springBoot項目中添加Maven依賴 

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.4</version>
        </dependency>

2、添加配置:

application.yml 文件中添加如下配置:

rocketmq:
  name-server: 192.168.152.165:9876
  producer:
    group: my-group

SpringBoot 集成 RocketMQ代碼:

生產(chǎn)者: 消息發(fā)送的三種方式

package com.rocketmq.springbootrocketmq;


import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.TimeUnit;


@RunWith(SpringRunner.class)
@SpringBootTest
public class T {


    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    //同步消息
    @Test
    public void testRocketMQ() {
        Message msg = MessageBuilder.withPayload("boot發(fā)送同步消息").build();
        rocketMQTemplate.send("helloTopicBoot", msg);
        System.out.println("success send");
    }

    //異步消息
    @Test
    public void sendASYCMsg() throws InterruptedException {
        Message message = MessageBuilder.withPayload("boot發(fā)送異步消息").build();
        rocketMQTemplate.asyncSend("helloTopicBoot", message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("發(fā)送狀態(tài):"+sendResult.getSendStatus());
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("消息發(fā)送失敗");
            }
        });
        TimeUnit.SECONDS.sleep(5);
    }

    //一次性消息
    @Test
    public void sendOneWayRocketMQ() {
        Message msg = MessageBuilder.withPayload("boot發(fā)送一次性消息").build();
        rocketMQTemplate.sendOneWay("helloTopicBoot", msg);
    }

}

消費者:

package com.example.springbooTRocketMQConsumer.listener;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.nio.charset.Charset;

@Component
@RocketMQMessageListener(consumerGroup = "htbConsumerGroup",topic = "helloTopicBoot")
public class HelloTopicListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println("success get:"+new String(messageExt.getBody(), Charset.defaultCharset()));
    }
}

消息消費的兩種模式

集群模式:默認模式

廣播模式:

消費者:messageModel = MessageModel.BROADCASTING

package com.example.springbooTRocketMQConsumer.listener;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.nio.charset.Charset;

@Component
@RocketMQMessageListener(consumerGroup = "htbConsumerGroup",topic = "helloTopicBoot",messageModel = MessageModel.BROADCASTING)
public class HelloTopicListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println("success get:"+new String(messageExt.getBody(), Charset.defaultCharset()));
    }
}

順序消息

生產(chǎn)者:

    //順序消息
    @Test
    public void sendOrderlyMsg(){
        //設置隊列選擇器
        rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> list, org.apache.rocketmq.common.message.Message message, Object o) {
                String orderIdStr = (String) o;
                long orderId = Long.parseLong(orderIdStr);
                int index = (int)orderId % list.size();
                return list.get(index);
            }
        });

        List<OrderStep> orderSteps = OrderUtil.buildOrders();
        for (OrderStep orderStep : orderSteps) {
            Message msg = MessageBuilder.withPayload(orderStep.toString()).build();
            rocketMQTemplate.sendOneWayOrderly("orderlyTopicBoot",msg,String.valueOf(orderStep.getOrderId()));

        }
    }

消費者:

package com.example.springbooTRocketMQConsumer.listener;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.nio.charset.Charset;

@Component
@RocketMQMessageListener(consumerGroup = "orderlyConsumerBoot",topic = "orderlyTopicBoot",consumeMode = ConsumeMode.ORDERLY)
public class OrderlyTopicListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println("當前線程:" + Thread.currentThread() + "隊列ID"+messageExt.getQueueId() + ",消息內容:" + new String(messageExt.getBody(),Charset.defaultCharset()));
    }
}

延遲消息

生產(chǎn)者:

    //延遲消息
    @Test
    public void sendDelayRocketMQ() {
        Message msg = MessageBuilder.withPayload("boot發(fā)送延時消息,發(fā)送時間:"+new Date()).build();
        rocketMQTemplate.syncSend("helloTopicBoot", msg,3000,3);
    }

消費者:

package com.example.springbooTRocketMQConsumer.listener;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.nio.charset.Charset;
import java.util.Date;

@Component
@RocketMQMessageListener(consumerGroup = "htbConsumerGroup",topic = "helloTopicBoot")
public class DelayTopicListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println("success get:發(fā)送時間"+new Date()+new String(messageExt.getBody(), Charset.defaultCharset()));
    }
}

消息Tag條件過濾

生成者

    //Tag消息
    @Test
    public void sendTagFilterRocketMQ() {
        Message msg1 = MessageBuilder.withPayload("消息A").build();
        rocketMQTemplate.sendOneWay("tagFilterBoot:TagA", msg1);
        Message msg2 = MessageBuilder.withPayload("消息B").build();
        rocketMQTemplate.sendOneWay("tagFilterBoot:TagB", msg2);
        Message msg3 = MessageBuilder.withPayload("消息C").build();
        rocketMQTemplate.sendOneWay("tagFilterBoot:TagC", msg3);
    }

消費者:

package com.example.springbooTRocketMQConsumer.listener;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.nio.charset.Charset;
import java.util.Date;

@Component
@RocketMQMessageListener(consumerGroup = "tagFilterGroupBoot",topic = "tagFilterBoot",selectorExpression = "TagA || TagC")
public class TagFilterTopicListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println("success get:發(fā)送時間"+new Date()+new String(messageExt.getBody(), Charset.defaultCharset()));
    }
}

SQL92消息過濾

生產(chǎn)者:

    //SQL92消息
    @Test
    public void sendSQL92FilterRocketMQ() {
        Message msg1 = MessageBuilder.withPayload("小紅,年齡22,體重45").setHeader("age","22").setHeader("weight",45).build();
        rocketMQTemplate.sendOneWay("SQL92FilterBoot", msg1);
        Message msg2 = MessageBuilder.withPayload("小明,年齡25,體重60").setHeader("age","25").setHeader("weight",60).build();
        rocketMQTemplate.sendOneWay("SQL92FilterBoot", msg2);
        Message msg3 = MessageBuilder.withPayload("小藍,年齡40,體重70").setHeader("age","40").setHeader("weight",70).build();
        rocketMQTemplate.sendOneWay("SQL92FilterBoot", msg3);
    }

消費者:

package com.example.springbooTRocketMQConsumer.listener;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.nio.charset.Charset;
import java.util.Date;

@Component
@RocketMQMessageListener(consumerGroup = "SQL92FilterGroupBoot",topic = "SQL92FilterBoot",selectorType = SelectorType.SQL92,selectorExpression = "age > 23 and weight > 60")
public class SQL92FilterTopicListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println("success get:發(fā)送時間"+new Date()+new String(messageExt.getBody(), Charset.defaultCharset()));
    }
}

 到此這篇關于SpringBoot集成RocketMQ的使用示例的文章就介紹到這了,更多相關SpringBoot集成RocketMQ內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • Java并發(fā)編程之Fork/Join框架詳解

    Java并發(fā)編程之Fork/Join框架詳解

    這篇文章主要介紹了Java并發(fā)編程之Fork/Join框架詳解,Fork/Join框架是Java7提供的一個用于并行執(zhí)行任務的框架,是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架,需要的朋友可以參考下
    2023-12-12
  • springboot 接收List 入?yún)⒌膸追N方法

    springboot 接收List 入?yún)⒌膸追N方法

    本文主要介紹了springboot 接收List 入?yún)⒌膸追N方法,本文主要介紹了7種方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2022-03-03
  • SpringBoot?對接飛書多維表格事件回調監(jiān)聽流程分析

    SpringBoot?對接飛書多維表格事件回調監(jiān)聽流程分析

    本文介紹了如何通過飛書事件訂閱機制和SpringBoot項目集成,對多維表數(shù)據(jù)的記錄變更進行對接的詳細流程,包括如何創(chuàng)建應用、配置參數(shù)、編寫訂閱代碼、訂閱文檔事件以及在SpringBoot工程中集成的步驟,感興趣的朋友跟隨小編一起看看吧
    2024-12-12
  • gradle使用maven-publish發(fā)布jar包上傳到私有maven配置

    gradle使用maven-publish發(fā)布jar包上傳到私有maven配置

    這篇文章主要介紹了gradle使用maven-publish發(fā)布jar包上傳到私有maven的配置示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-03-03
  • SpringBoot整合easy-es的詳細過程

    SpringBoot整合easy-es的詳細過程

    本文介紹了EasyES,一個基于Elasticsearch的ORM框架,旨在簡化開發(fā)流程并提高效率,EasyES支持SpringBoot框架,并提供了CRUD操作、批量操作和查詢操作等方法,文章還列舉了使用EasyES時可能遇到的技術難題及解決方法,感興趣的朋友一起看看吧
    2025-02-02
  • 使用SpringBoot+OkHttp+fastjson實現(xiàn)Github的OAuth第三方登錄

    使用SpringBoot+OkHttp+fastjson實現(xiàn)Github的OAuth第三方登錄

    這篇文章主要介紹了使用SpringBoot+OkHttp+fastjson實現(xiàn)Github的OAuth第三方登錄,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-02-02
  • RabbitMQ?延遲隊列實現(xiàn)訂單支付結果異步階梯性通知(實例代碼)

    RabbitMQ?延遲隊列實現(xiàn)訂單支付結果異步階梯性通知(實例代碼)

    這篇文章主要介紹了RabbitMQ?延遲隊列實現(xiàn)訂單支付結果異步階梯性通知,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-02-02
  • Java中BigDecimal類與int、Integer使用總結

    Java中BigDecimal類與int、Integer使用總結

    這篇文章主要給大家介紹了關于Java中BigDecimal類與int、Integer使用的相關資料,文中通過示例代碼介紹的非常詳細,對大家學習或者使用Java具有一定的參考學習價值,需要的朋友們下面來一起學習學習吧
    2019-07-07
  • springboot 整合druid及配置依賴

    springboot 整合druid及配置依賴

    這篇文章主要介紹了springboot 整合druid及jdbc 依賴、數(shù)據(jù)庫依賴(mysql),druid 依賴的實現(xiàn)代碼,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-12-12
  • spring boot(三)之Spring Boot中Redis的使用

    spring boot(三)之Spring Boot中Redis的使用

    這篇文章主要介紹了spring boot(三)之Spring Boot中Redis的使用,非常不錯,具有參考借鑒價值,需要的朋友可以參考下
    2017-05-05

最新評論