欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot2實(shí)現(xiàn)MessageQueue消息隊(duì)列

 更新時(shí)間:2023年04月25日 09:47:34   作者:yjx23332  
本文主要介紹了 SpringBoot2實(shí)現(xiàn)MessageQueue消息隊(duì)列,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

什么是消息隊(duì)列

通常說的消息隊(duì)列,簡稱MQ(Message Queue),指的就是消息中間件。簡單理解為一個(gè)使用隊(duì)列來通信的組件,本質(zhì)上就是個(gè)轉(zhuǎn)發(fā)器,包含發(fā)消息,存消息,消費(fèi)消息的過程。

一、異步與同步

1.1 同步通訊與異步通訊

  • 同步通訊:時(shí)效性強(qiáng)。比如視頻電話,實(shí)時(shí)傳到對(duì)方,同時(shí)對(duì)方出回應(yīng)。
  • 異步通訊:比如網(wǎng)絡(luò)聊天,非實(shí)時(shí)反饋的,不會(huì)立即得到結(jié)果,可以之后再回復(fù)。

1.2 同步調(diào)用的問題

微服務(wù)基于Feign的調(diào)用就是同步的方式。

以購物場景為例

但是如果要加業(yè)務(wù)就需要為支付服務(wù)加業(yè)務(wù),改動(dòng)其代碼,耦合度高。

同時(shí),同步調(diào)用,要等待服務(wù)結(jié)束后,在進(jìn)行下一個(gè)服務(wù)。支付總耗時(shí),是支付服務(wù)依次調(diào)用服務(wù)耗時(shí)的時(shí)間和,耗時(shí)過長。

此外,如果倉儲(chǔ)服務(wù)掛掉了,支付服務(wù)就會(huì)被卡在那里。當(dāng)過多的支付服務(wù)都卡在那里,于是資源耗盡,支付服務(wù)也掛掉了。

問題:

  • 耦合度高
  • 性能下降
  • 資源浪費(fèi)
  • 級(jí)聯(lián)失敗

1.3 異步調(diào)用方案

異步調(diào)用常見的就是事件驅(qū)動(dòng)模式

當(dāng)支付服務(wù)告知了Broker后,就可以繼續(xù)自己的事情了,而不需要等待。

優(yōu)勢:

  • 代碼解耦合:不需要改動(dòng)支付服務(wù),只需要讓服務(wù)訂閱或者取消訂閱Broker即可。
  • 耗時(shí)減少了:只計(jì)算支付服務(wù)和通知Broker的時(shí)間。
  • 不存在級(jí)聯(lián)失敗的問題,倉儲(chǔ)服務(wù)掛了不再影響支付服務(wù)。
  • 流量消峰:當(dāng)流量過大時(shí),請(qǐng)求排在Broker中,服務(wù)能做幾個(gè)就做幾個(gè),做不了的就排著。

缺點(diǎn):

  • Broker掛了也會(huì)出問題,依賴于Broker的可靠性,安全性,吞吐能力
  • 架構(gòu)復(fù)雜了,業(yè)務(wù)沒有明顯的流程線,不好追蹤管理

二、MQ消息隊(duì)列

在上述的結(jié)構(gòu)中,就是Broker。

常用的MQ有幾種實(shí)現(xiàn)。

RabbitMQActiveMQRocketMQKafaka
公司/社區(qū)RabbitApache阿里Apache
開發(fā)語言ErlangJavaJavaScala&Java
協(xié)議支持AMQP、XMPP、SMTP、STOMPOpenWire、STOMP、REST、XMPP、AMQP自定義協(xié)議自定義協(xié)議
可用性一般
單機(jī)吞吐量一般非常高
消息延遲微秒級(jí)毫秒級(jí)毫秒級(jí)毫秒以內(nèi)
消息可靠性一般一般
  • 一般中小型公司,用的就是RabbitMQ。
  • 如果大型企業(yè),做深度定制,可以用RocketMQ
  • Kafaka則是用于大量數(shù)據(jù)情況下的處理,但安全可靠性相對(duì)較差。
  • ActiveMQ是很早的消息隊(duì)列,如今幾乎沒有維護(hù)。

2.1 單機(jī)部署MQ

通過docker部署最簡單,

docker pull rabbitmq:3-management

也可以用命令安裝,這里直接用容器了。

啟動(dòng)信息如下

docker run -e RABBITMQ_DEFAULT_USER=yjx23332 -e RABBITMQ_DEFAULT_PASS=123456 -v mq-plugins:/plugins --name mq --hostname mq1 -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management

-e 為設(shè)置環(huán)境變量
兩個(gè)端口,15672是管理平臺(tái)端口,5672是發(fā)送消息的端口。

記得開放對(duì)應(yīng)端口。如果是騰訊云或者阿里云,也要在購買的服務(wù)器管理頁面打開放行端口。

firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --reload
查詢端口是否開放

firewall-cmd --query-port=15672/tcp 查看某個(gè)端口firewall-cmd --zone=public --list-ports 查看所有

登陸成功后,即可進(jìn)入以下界面。

我們可在這里為添加用戶和角色

virtualhosts虛擬主機(jī):對(duì)不用戶進(jìn)行隔離,避免相互影響。
此處可以添加

點(diǎn)擊用戶,可以配置其虛擬主機(jī)權(quán)限等。

此處設(shè)置交換機(jī)

2.2 結(jié)構(gòu)和概念

使用消息隊(duì)列中消息的對(duì)象。我們稱之為消費(fèi)者。

在一個(gè)virtualhost下:

2.3 常見的消息模型

基本消息隊(duì)列BasicQueue:最簡單的實(shí)現(xiàn)

工作消息隊(duì)列WorkQueue:在工作者之間分配任務(wù)

發(fā)布訂閱帶有交換機(jī),分為:

Fanout Exchange:廣播,發(fā)布訂閱(publish/subscribe):一次性向讀個(gè)消費(fèi)者發(fā)送消息。

Direct Exchange:路由(Routing):有選擇的接收消息

Topic Exchange:主題 (Topics):根據(jù)主題接收消息。

請(qǐng)求回復(fù)模型(RPC):收到請(qǐng)求然后答復(fù)。

發(fā)布者確認(rèn)模式(Publisher Confirms):會(huì)讓發(fā)布者知道發(fā)送是否成功。

三、SpringAMQP

3.1 用非自動(dòng)裝配的方式使用消息隊(duì)列

需要在項(xiàng)目中引入AMQP,記得加入父類spring-boot-starter-parent

		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

引入Junit方便測試

		<dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>

測試一下MQ

我們創(chuàng)建如下兩個(gè)子項(xiàng)目。

為test寫一個(gè)測試用例

package com.yjx23332.mq.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;



public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException{
        //1.建立連接
        ConnectionFactory factory = new ConnectionFactory();
        //1.2.設(shè)置連接參數(shù)
        factory.setHost("IP");//MQ地址設(shè)置
        factory.setPort(5672);//端口設(shè)置
        factory.setVirtualHost("/");//設(shè)置虛擬主機(jī)
        factory.setUsername("賬號(hào)");
        factory.setPassword("密碼");
        //1.2 建立連接
        Connection connection = factory.newConnection();

        //2.創(chuàng)建通道
        Channel channel = connection.createChannel();

        //3.創(chuàng)建消息隊(duì)列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName,false,false,false,null);

        //4.發(fā)送消息
        String message = "hello,rabbitmq!";
        channel.basicPublish("",queueName,null,message.getBytes());
        System.out.println("已發(fā)送消息:【"+message+"】");

        //5.關(guān)閉通道
        if(channel != null){
            channel.close();
        }
        if(connection != null){
            connection.close();
        }
    }
}

我們在發(fā)送消息前打上斷點(diǎn),用junit運(yùn)行,就可以看到連接創(chuàng)建和通道創(chuàng)建.。

因?yàn)榘l(fā)完就不管了,因此必須打斷點(diǎn),才看得到連接和通道。

完成后可以看到隊(duì)列中

接下來我們處理消息,基本一樣,只需要修改幾個(gè)部分。

注意我們沒有關(guān)閉連接,因?yàn)樵跇I(yè)務(wù)中,要一直處理。

package com.yjx23332.mq.helloworld;

import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class ConsumerTest {
    @Test
    public static void main(String[] args) throws IOException, TimeoutException{
        //1.建立連接
        ConnectionFactory factory = new ConnectionFactory();
        //1.2.設(shè)置連接參數(shù)
        factory.setHost("101.43.65.53");//MQ地址設(shè)置
        factory.setPort(5672);//端口設(shè)置
        factory.setVirtualHost("/");//設(shè)置虛擬主機(jī)
        factory.setUsername("yjx23332");
        factory.setPassword("123456");
        //1.2 建立連接
        Connection connection = factory.newConnection();

        //2.創(chuàng)建通道
        Channel channel = connection.createChannel();

        //3.創(chuàng)建消息隊(duì)列
        //為什么這里也要?jiǎng)?chuàng)建?避免消費(fèi)者先執(zhí)行,還沒有隊(duì)列。同時(shí),相同的隊(duì)列創(chuàng)建重復(fù)執(zhí)行沒有影響。
        String queueName = "simple.queue";
        channel.queueDeclare(queueName,false,false,false,null);

        //4.處理消息
        String message = "hello,rabbitmq!";
        //DefaultConsumer 是回調(diào)函數(shù),一旦有消息,異步處理
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws  IOException{
                System.out.println("接收到消息:【"+ new String (body)+"】");
            }
        });
        System.out.println("####################等待接收消息##################");
//
//        //5.關(guān)閉通道
//        if(channel != null){
//            channel.close();
//        }
//        if(connection != null){
//            connection.close();
//        }
    }
}

結(jié)果如下

可以看到,因?yàn)榛卣{(diào)的原因,后面的輸出先執(zhí)行

隊(duì)列中消息處理完畢

3.2 SpringAMQP介紹

AMQP:Advanced Message Queuing Protocol:高級(jí)消息隊(duì)列協(xié)議。于應(yīng)用程序之間傳遞業(yè)務(wù)消息的開放標(biāo)準(zhǔn)。

Spring AMQP:基于AMQP協(xié)議的一套API規(guī)范,提供模板來發(fā)送和接收消息。其中Spring-amqp是基礎(chǔ)抽象,Spring-rabbit是底層的默認(rèn)實(shí)現(xiàn)??蓞⒖?a rel="external nofollow" rel="external nofollow" target="_blank">Spring AMQP官網(wǎng)。

3.3 基礎(chǔ)消息隊(duì)列功能使用

導(dǎo)入依賴

		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

為了方便測試,我們引入SpringBoot單元測試

	<dependency>
  		<groupId>org.springframework.boot</groupId>
  		<artifactId>spring-boot-starter-test</artifactId>
  		<scope>test</scope>
	</dependency>
	<dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
    	<scope>test</scope>
    </dependency>

然后準(zhǔn)備一個(gè)yml文件,配置和之前用代碼寫得相似。

spring:
  rabbitmq:
    host: 
    port: 5672
    virtual-host: /
    username: 
    password: 

我們直接走單元測試,這里就不創(chuàng)建一個(gè)隊(duì)列了,直接放消息。

package com.yjx23332.mq.helloworld;


import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


@SpringBootTest
@RunWith(SpringRunner.class)
public class PublisherTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue() throws IOException, TimeoutException{
        rabbitTemplate.convertAndSend("simple.queue","hello,spring amqp");
    }
}

接下來為消費(fèi)者建一個(gè)監(jiān)聽器(記得配置yml文件)

package com.yjx23332.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException{
        System.out.println("spring 消費(fèi)者接收到消息:【"+msg+"】");
    }
}

消息一旦消費(fèi),就會(huì)被移除,Rabbit MQ不存在回溯功能。

3.4 工作隊(duì)列的配置

一個(gè)隊(duì)列綁定多個(gè)消費(fèi)者。

我們準(zhǔn)備發(fā)送50條消息

package com.yjx23332.mq.helloworld;


import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;



@SpringBootTest
@RunWith(SpringRunner.class)
public class PublisherTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue() throws InterruptedException{
        for(int i = 0;i < 50;i++){
            rabbitTemplate.convertAndSend("simple.queue","hello,spring amqp___" + i);
            Thread.sleep(20);
        }
    }
}

修改消費(fèi)者

package com.yjx23332.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.LocalTime;

@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException{
        System.out.println("spring 消費(fèi)者接收到消息:【"+msg+"】" + LocalTime.now());
        Thread.sleep(20);
    }
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage2(String msg) throws InterruptedException{
        System.err.println("spring 消費(fèi)者接收到消息:【"+msg+"】"+ LocalTime.now());
        Thread.sleep(200);
    }
}

從結(jié)果會(huì)發(fā)現(xiàn)處理總時(shí)長超過了1秒達(dá)到了5秒,查看輸出會(huì)發(fā)現(xiàn)消息被平均分配給了兩個(gè)。一個(gè)處理偶數(shù),一個(gè)處理奇數(shù)。但由于處理速度不同,因此處理總時(shí)長超過了1秒。

這里是因?yàn)橄M(fèi)預(yù)取導(dǎo)致的,在執(zhí)行前會(huì)提前把消息從隊(duì)列拿出,然后各自處理。

但我們希望的是,做的快的多做,做的慢的少做。

因此我們可以修改yml文件:

spring:
  rabbitmq:
    host: 
    port: 5672
    virtual-host: /
    username: 
    password: 
    listener:
      simple:
        prefetch:  1 # 每次只能獲取幾條消息,執(zhí)行完了再取下一條,默認(rèn)是無限

重啟后再次執(zhí)行就會(huì)發(fā)現(xiàn)正常了。

3.5 發(fā)布與訂閱模式

我們需要將同一消息發(fā)送給多個(gè)消費(fèi)者。需要加入交換機(jī)來實(shí)現(xiàn)。注意,交換機(jī)只負(fù)責(zé)消費(fèi)路由,但不存儲(chǔ)消息,丟失一概不負(fù)責(zé)。

3.5.1 SpringAMQP交換機(jī)類

3.5.2 Fanout Exchange

我們在consumer服務(wù)中聲明Exchange、Queue、Binding.

package com.yjx23332.mq.confg;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    //聲明FanoutExchange交換機(jī)
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("yjx23332.fanout");
    }
    //聲明一個(gè)隊(duì)列
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }
    //綁定隊(duì)列和交換機(jī)
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    //聲明第二個(gè)隊(duì)列
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }
    //綁定第二個(gè)隊(duì)列和交換機(jī)
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

運(yùn)行后,會(huì)看到:

修改監(jiān)聽器

package com.yjx23332.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class MQlistener {
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg){
        System.out.println("spring 消費(fèi)者接收q1到消息:【"+msg+"】");
    }
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg){
        System.err.println("spring 消費(fèi)者接收到q2消息:【"+msg+"】");
    }
}

我們再修改publisher的測試代碼

package com.yjx23332.mq;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
public class test {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testFanoutExchange(){
        String exchangeName = "yjx23332.fanout";
        String message = "hello world!";
        rabbitTemplate.convertAndSend(exchangeName,"",message);
    }
}

啟動(dòng)

3.5.3 DirectExchange

將接收到的消息根據(jù)規(guī)則路由到指定的Queue,因此稱為路由模式(routes)。

  • 每一個(gè)Queue都與Exchange設(shè)置一個(gè)BindingKey
  • 發(fā)布者發(fā)送消息時(shí),指定消息的RoutingKey
  • Exchange將消息路由到BindingKey與消息RoutingKey一致的隊(duì)列
  • 一個(gè)隊(duì)列可以指定多個(gè)BindingKey,且隊(duì)列之間的BindingKey可以重復(fù)

由于基于Config創(chuàng)建隊(duì)列交換機(jī)的方式很麻煩,我們用新的方式聲明交換機(jī)、隊(duì)列。

刪除上一節(jié)我們在config中的聲明代碼。

然后在listener中進(jìn)行

package com.yjx23332.mq.listener;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class MQlistener {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(value = "yjx23332.direct",type = ExchangeTypes.DIRECT),
            key = {"red","blue"} //bindingkey
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("spring 消費(fèi)者接收q1到消息:【"+msg+"】");
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(value = "yjx23332.direct" , type = ExchangeTypes.DIRECT),
            key = {"red","yellow"}
    ))
    public void listenDirectQueue2(String msg) {
        System.err.println("spring 消費(fèi)者接收到q2消息:【"+msg+"】");
    }
}

運(yùn)行后,我們可以看到

接下來,我們修改Test代碼

package com.yjx23332.mq;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
public class test {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testDirectExchange(){
        String exchangeName = "yjx23332.direct";
        rabbitTemplate.convertAndSend(exchangeName,"red","hello red");
        rabbitTemplate.convertAndSend(exchangeName,"blue","hello blue");
        rabbitTemplate.convertAndSend(exchangeName,"yellow","hello yellow");
    }
}

3.5.4 TopicExchange

與DirectExchange類似,但是它的routingKey必須是多個(gè)單詞表,并用’.'分割。
當(dāng)隊(duì)列與交換機(jī)綁定時(shí),可以使用通配符。避免當(dāng)bindkey過多導(dǎo)致的麻煩。

#:代表0個(gè)或多個(gè)單詞
*:代指一個(gè)單詞

比如

China.news
Japan.news
就可以用 #.news
同理
China.weather
China.news
就可以用 China.#

我們沿用上一節(jié)的代碼,做一點(diǎn)修改即可

package com.yjx23332.mq.listener;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class MQlistener {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue"),
            exchange = @Exchange(value = "yjx23332.topic",type = ExchangeTypes.TOPIC),
            key = {"China.#"} //bindingkey
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("spring 消費(fèi)者接收q1到消息:【"+msg+"】");
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(value = "yjx23332.topic",type = ExchangeTypes.TOPIC),
            key = {"#.news"}
    ))
    public void listenTopicQueue2(String msg) {
        System.err.println("spring 消費(fèi)者接收到q2消息:【"+msg+"】");
    }
}

重啟后,可看到

修改Test代碼

package com.yjx23332.mq;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
public class test {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testDirectExchange(){
        String exchangeName = "yjx23332.topic";
        rabbitTemplate.convertAndSend(exchangeName,"China.news","江蘇地表最高溫度將達(dá)到72攝氏度");
        rabbitTemplate.convertAndSend(exchangeName,"China.weather","未來溫度仍將升高");
        rabbitTemplate.convertAndSend(exchangeName,"Japan.news","安培中槍");
    }
}

3.6 消息轉(zhuǎn)換器

在發(fā)送中,我們接收消息的類型是Object。SpringAMQP會(huì)幫我們序列化后變?yōu)樽止?jié)發(fā)送。
用默認(rèn)JDK的序列化ObjectOutputStream是沒有問題的,但是中間過程是亂碼,我們這里改用JSON方式的序列化,這樣在消息隊(duì)列中查看也是正常的。

默認(rèn)JDK的消息信息:

接下來我們配置消息轉(zhuǎn)換。

我們先在消費(fèi)者聲明一個(gè)queue,并設(shè)置處理方式

package com.yjx23332.mq.listener;


import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class MQlistener {
    @RabbitListener(queuesToDeclare = @Queue("object.queue"))
    public void listenObjectQueue(String msg){
        System.out.println("spring 消費(fèi)者接收到Object消息:【"+msg+"】");
    }
}

我們?yōu)榘l(fā)送類引入依賴并編寫配置

		<dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-yaml</artifactId>
        </dependency>

覆蓋默認(rèn)的消息轉(zhuǎn)換。

package com.yjx23332.mq.config;


import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessageConverterConfig {
    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

隨后修改Test

package com.yjx23332.mq;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;
import java.util.Map;

@SpringBootTest
@RunWith(SpringRunner.class)
public class test {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testObjectQueue(){
        String queueName = "object.queue";
        Map<String,Object> msg = new HashMap<>();
        msg.put("name","yjx23332");
        msg.put("age",21);
        rabbitTemplate.convertAndSend(queueName,msg);
    }
}

結(jié)果如下:

這時(shí)消息不再是亂碼

我們在為消費(fèi)者配置轉(zhuǎn)換,并修改監(jiān)聽器。當(dāng)然,如果我們在兩邊都不配置消息轉(zhuǎn)換器,這里結(jié)果是一樣的。

package com.yjx23332.mq.listener;


import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;


@Component
public class MQlistener {
    @RabbitListener(queuesToDeclare = @Queue("object.queue"))
    public void listenObjectQueue(Map<String,Object> msg){
        System.out.println("spring 消費(fèi)者接收到Object消息:【 name = "+msg.get("name")+",age = "+msg.get("age")+"】");
    }

}

結(jié)果如下

參考文獻(xiàn)

[1]Spring AMQP官網(wǎng)
[2]黑馬程序員Java微服務(wù)
[3]RabbitMQ官方文檔

到此這篇關(guān)于 SpringBoot2實(shí)現(xiàn)MessageQueue消息隊(duì)列的文章就介紹到這了,更多相關(guān) SpringBoot2 MessageQueue消息隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Spring Security源碼解析之權(quán)限訪問控制是如何做到的

    Spring Security源碼解析之權(quán)限訪問控制是如何做到的

    Spring Security 中對(duì)于權(quán)限控制默認(rèn)已經(jīng)提供了很多了,但是,一個(gè)優(yōu)秀的框架必須具備良好的擴(kuò)展性,下面小編給大家介紹Spring Security源碼解析之權(quán)限訪問控制是如何做到的,感興趣的朋友跟隨小編一起看看吧
    2021-05-05
  • Struts2學(xué)習(xí)筆記(2)-路徑問題解決

    Struts2學(xué)習(xí)筆記(2)-路徑問題解決

    本文主要介紹Struts2的路徑問題,盡量不要使用相對(duì)路徑,使用相對(duì)路徑會(huì)讓路徑問題變得很繁瑣很麻煩,推薦使用絕對(duì)路徑,希望能給大家做一個(gè)參考。
    2016-06-06
  • java實(shí)現(xiàn)停車場管理系統(tǒng)

    java實(shí)現(xiàn)停車場管理系統(tǒng)

    這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)停車場管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2019-11-11
  • 解決SpringCloud Config結(jié)合github無法讀取配置的問題

    解決SpringCloud Config結(jié)合github無法讀取配置的問題

    這篇文章主要介紹了解決SpringCloud Config結(jié)合github無法讀取配置的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2021-02-02
  • 詳解@Autowired(required=false)注入注意的問題

    詳解@Autowired(required=false)注入注意的問題

    這篇文章主要介紹了@Autowired(required=false)注入注意的問題,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-04-04
  • 基于JavaSwing+mysql開發(fā)一個(gè)學(xué)生社團(tuán)管理系統(tǒng)設(shè)計(jì)和實(shí)現(xiàn)

    基于JavaSwing+mysql開發(fā)一個(gè)學(xué)生社團(tuán)管理系統(tǒng)設(shè)計(jì)和實(shí)現(xiàn)

    項(xiàng)目使用Java swing+mysql開發(fā),可實(shí)現(xiàn)基礎(chǔ)數(shù)據(jù)維護(hù)、用戶登錄注冊、社團(tuán)信息列表查看、社團(tuán)信息添加、社團(tuán)信息修改、社團(tuán)信息刪除以及退出注銷等功能、界面設(shè)計(jì)比較簡單易學(xué)、適合作為Java課設(shè)設(shè)計(jì)以及學(xué)習(xí)技術(shù)使用,需要的朋友參考下吧
    2021-08-08
  • MyBatis Generator 自定義生成注釋的方法

    MyBatis Generator 自定義生成注釋的方法

    這篇文章主要介紹了MyBatis Generator 自定義生成注釋的方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2018-09-09
  • Spring Security使用數(shù)據(jù)庫認(rèn)證及用戶密碼加密和解密功能

    Spring Security使用數(shù)據(jù)庫認(rèn)證及用戶密碼加密和解密功能

    這篇文章主要介紹了Spring Security使用數(shù)據(jù)庫認(rèn)證及用戶密碼加密和解密,本文通過代碼與截圖的形式給大家介紹的非常詳細(xì),對(duì)大家的工作或?qū)W習(xí)具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-03-03
  • Java中占位符的超全使用方法分享

    Java中占位符的超全使用方法分享

    這篇文章主要為大家詳細(xì)介紹了Java中常見的一些占位符的使用方法,例如%d,%s等,文中的示例代碼簡潔易懂,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)學(xué)習(xí)
    2023-05-05
  • Spring一步到位精通攔截器

    Spring一步到位精通攔截器

    攔截器(Interceptor)是一種動(dòng)態(tài)攔截方法調(diào)用的機(jī)制,在SpringMVC中動(dòng)態(tài)攔截控制器方法的執(zhí)行。本文將詳細(xì)講講SpringMVC中攔截器的概念及入門案例,感興趣的可以嘗試一下
    2023-01-01

最新評(píng)論