欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringAMQP的使用方式案例詳解

 更新時間:2024年01月06日 09:30:08   作者:Winter.169  
這篇文章主要介紹了SpringAMQP的使用方式,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧

MQ介紹

MQ,中文是消息隊列(MessageQueue),字面來看就是存放消息的隊列。也就是事件驅(qū)動架構(gòu)中的Broker。

比較常見的MQ實現(xiàn):

  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka

幾種常見MQ的對比:

RabbitMQActiveMQRocketMQKafka
公司/社區(qū)RabbitApache阿里Apache
開發(fā)語言ErlangJavaJavaScala&Java
協(xié)議支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定義協(xié)議自定義協(xié)議
可用性一般
單機吞吐量一般非常高
消息延遲微秒級毫秒級毫秒級毫秒以內(nèi)
消息可靠性一般一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延遲:RabbitMQ、Kafka

RabbitMQ消息模型

RabbitMQ官方提供了7個不同的Demo示例,對應(yīng)了不同的消息模型:

RabbitMQ Tutorials — RabbitMQ

SpringAMQP

AMQP,(Advanced Message Queuing Protocol),是用于在應(yīng)用程序之間傳遞業(yè)務(wù)消息的開放標準。該協(xié)議與語言和平臺無關(guān),更符合服務(wù)中獨立性的要求。

SpringAMQP是基于AMQP協(xié)議定義的一套API規(guī)范,提供了模板來發(fā)送和接收消息。包含兩部分,其中spring-amqp是基礎(chǔ)抽象,spring-rabbit是底層的默認實現(xiàn)。

SpringAmqp的官方地址:Spring AMQP

SpringAMQP提供了三個功能:

  • 自動聲明隊列、交換機及其綁定關(guān)系
  • 基于注解的監(jiān)聽器模式,異步接收消息
  • 封裝了RabbitTemplate工具,用于發(fā)送消息

1 "HelloWorld"隊列模型

簡單隊列模式的模型圖:

  • publisher:消息發(fā)布者,將消息發(fā)送到隊列queue
  • queue:消息隊列,負責接受并緩存消息
  • consumer:訂閱隊列,處理隊列中的消息

操作步驟:

引入依賴:

spring:
  rabbitmq:
    host: 192.168.150.101 # 主機名
    port: 5672 # 端口
    virtual-host: / # 虛擬主機
    username: ddddddd # 用戶名
    password: 123321 # 密碼

添加配置:

在publisher服務(wù)的application.yml中添加配置:

spring:
  rabbitmq:
    host: 192.168.150.101 # 主機名
    port: 5672 # 端口
    virtual-host: / # 虛擬主機
    username: ddddddd # 用戶名
    password: 123321 # 密碼

實現(xiàn)消息發(fā)送

依賴注入RabbitTemplate,調(diào)用convertAndSend方法。

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue() {
        // 隊列名稱
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 發(fā)送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

實現(xiàn)消息接收

在consumer模塊中也需要引入與上面相同的依賴和寫入配置

1,引入依賴

2,寫入配置

3,編寫消息接收類:

兩個注解: @Component            @RabbitListener

@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消費者接收到消息:【" + msg + "】");
    }
}

消息一旦消費就會從隊列刪除,RabbitMQ沒有消息回溯功能。

2 "WorkQueues"隊列模型

Work queues,也被稱為(Task queues),任務(wù)模型。簡單來說就是讓多個消費者綁定到一個隊列,共同消費隊列中的消息。

當消息處理比較耗時的時候,可能生產(chǎn)消息的速度會遠遠大于消息的消費速度。長此以往,消息就會堆積越來越多,無法及時處理。

此時就可以使用work 模型,多個消費者共同處理消息處理,速度就能大大提高了。

  • 多個消費者綁定到一個隊列,同一條消息只會被一個消費者處理
  • 通過設(shè)置prefetch來控制消費者預(yù)取的消息數(shù)量

消息發(fā)送

循環(huán)發(fā)送,模擬大量消息堆積現(xiàn)象。

在publisher服務(wù)中的SpringAmqpTest類中添加一個測試方法:

/**
     * workQueue
     * 向隊列中不停發(fā)送消息,模擬消息堆積。
     */
@Test
public void testWorkQueue() throws InterruptedException {
    // 隊列名稱
    String queueName = "simple.queue";
    // 消息
    String message = "hello, message_";
    for (int i = 0; i < 50; i++) {
        // 發(fā)送消息
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    }
}

消息接收

要模擬多個消費者綁定同一個隊列,我們在consumer服務(wù)的SpringRabbitListener中添加2個新的方法:

默認情況下,消息是平均分配給每個消費者,叫做消息預(yù)取。并沒有考慮到消費者的處理能力。這樣顯然是有問題的。

要解決這個問題:

能者多勞

在消費者中配置yml:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能獲取一條消息,處理完成才能獲取下一個消息

3 "Publish/Subscribe"隊列模型

發(fā)布訂閱的模型如圖:

發(fā)布訂閱 模式與之前案例的區(qū)別就是允許將同一消息發(fā)給多個消費者。實現(xiàn)方式是加入了exchange(交換機)

可以看到,在訂閱模型中,多了一個exchange角色,而且過程略有變化:

  • Publisher:生產(chǎn)者,也就是要發(fā)送消息的程序,但是不再發(fā)送到隊列中,而是發(fā)給Exchange(交換機)
  • Exchange:交換機,圖中的exchange。一方面,接收生產(chǎn)者發(fā)送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange有以下3種類型:
    • Fanout:廣播,將消息交給所有綁定到交換機的隊列
    • Direct:定向,把消息交給符合指定routing key 的隊列
    • Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列
  • Consumer:消費者,與以前一樣,訂閱隊列,沒有變化
  • Queue:消息隊列也與以前一樣,接收消息、緩存消息。

Exchange(交換機)只負責轉(zhuǎn)發(fā)消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規(guī)則的隊列,那么消息會丟失!

三種交換機的使用:

Fanout:廣播

在廣播模式下,消息發(fā)送流程是這樣的:

  • 1) 可以有多個隊列
  • 2) 每個隊列都要綁定到Exchange(交換機)
  • 3) 生產(chǎn)者發(fā)送的消息,只能發(fā)送到交換機,交換機來決定要發(fā)給哪個隊列,生產(chǎn)者無法決定
  • 4) 交換機把消息發(fā)送給綁定過的所有隊列
  • 5) 訂閱隊列的消費者都能拿到消息

下面操作:

  • 創(chuàng)建一個交換機 itcast.fanout,類型是Fanout
  • 創(chuàng)建兩個隊列fanout.queue1和fanout.queue2,綁定到交換機itcast.fanout

步驟:

基于@bean聲明隊列和交換機(下面有基于注解的方式)

Consumer中創(chuàng)建一個配置類,聲明交換機和隊列

package cn.itcast.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfig {
    /**
     * 聲明交換機
     * @return Fanout類型交換機
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }
    /**
     * 第1個隊列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }
    /**
     * 綁定隊列和交換機
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    /**
     * 第2個隊列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }
    /**
     * 綁定隊列和交換機
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

消息接收

在consumer服務(wù)的SpringRabbitListener中添加兩個方法,作為消費者:

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    System.out.println("消費者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
    System.out.println("消費者2接收到Fanout消息:【" + msg + "】");
}

消息發(fā)送:

在publisher服務(wù)的SpringAmqpTest類中添加測試方法:

這里convertAndSend的參數(shù)是交換機的名稱而不是隊列名稱了。

@Test
public void testFanoutExchange() {
    // 隊列名稱
    String exchangeName = "itcast.fanout";
    // 消息
    String message = "hello, everyone!";
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}

Direct:定向

在Fanout模式中,一條消息,會被所有訂閱的隊列都消費。但是,在某些場景下,我們希望不同的消息被不同的隊列消費。這時就要用到Direct類型的Exchange。

在Direct模型下:

  • 隊列與交換機的綁定,不能是任意綁定了,而是要指定一個RoutingKey(路由key)
  • 消息的發(fā)送方在 向 Exchange發(fā)送消息時,也必須指定消息的 RoutingKey。
  • Exchange不再把消息交給每一個綁定的隊列,而是根據(jù)消息的Routing Key進行判斷,只有隊列的Routingkey與消息的 Routing key完全一致,才會接收到消息

案例需求如下

  • 利用@RabbitListener聲明Exchange、Queue、RoutingKey
  • 在consumer服務(wù)中,編寫兩個消費者方法,分別監(jiān)聽direct.queue1和direct.queue2
  • 在publisher中編寫測試方法,向itcast. direct發(fā)送消息

基于注解聲明隊列和交換機

基于@Bean的方式聲明隊列和交換機比較麻煩,Spring還提供了基于注解方式來聲明。

在consumer的SpringRabbitListener中添加兩個消費者,同時基于注解來聲明隊列和交換機:

添加消費者 的同時聲明了隊列和交換機

添加兩個消費者 的同時聲明了隊列和交換機:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消費者接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消費者接收到direct.queue2的消息:【" + msg + "】");
}

消息發(fā)送

交換機會根據(jù)RoutingKey去發(fā)送給對應(yīng)的隊列。

@Test
public void testSendDirectExchange() {
    // 交換機名稱
    String exchangeName = "itcast.direct";
    // 消息
    String message = "紅色警報!日本亂排核廢水,導致海洋生物變異,驚現(xiàn)哥斯拉!";
    // 發(fā)送消息
    rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

Topic:通配符

`Topic`類型的`Exchange`與`Direct`相比,都是可以根據(jù)`RoutingKey`把消息路由到不同的隊列。只不過`Topic`類型`Exchange`可以讓隊列在綁定`Routing key` 的時候使用通配符!

Routingkey 一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如: item.insert

通配符規(guī)則:

  • #:匹配一個或多個詞
  • *:匹配不多不少恰好1個詞

舉例:

  • item.#:能夠匹配item.spu.insert 或者 item.spu
  • item.*:只能匹配item.spu

圖示:

解釋:

  • Queue1:綁定的是china.# ,因此凡是以 china.開頭的routing key 都會被匹配到。包括china.news和china.weather
  • Queue2:綁定的是#.news ,因此凡是以 .news結(jié)尾的 routing key 都會被匹配。包括china.news和japan.news

案例需求:

實現(xiàn)思路如下:

  • 并利用@RabbitListener聲明Exchange、Queue、RoutingKey
  • 在consumer服務(wù)中,編寫兩個消費者方法,分別監(jiān)聽topic.queue1和topic.queue2
  • 在publisher中編寫測試方法,向itcast. topic發(fā)送消息

消息接收

在consumer服務(wù)的SpringRabbitListener中添加方法:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
    key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("消費者接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("消費者接收到topic.queue2的消息:【" + msg + "】");
}

消息發(fā)送

/**
     * topicExchange
     */
@Test
public void testSendTopicExchange() {
    // 交換機名稱
    String exchangeName = "itcast.topic";
    // 消息
    String message = "喜報!孫悟空大戰(zhàn)哥斯拉,勝!";
    // 發(fā)送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

描述下Direct交換機與Topic交換機的差異?

  • Topic交換機接收的消息RoutingKey必須是多個單詞,以 **.** 分割
  • Topic交換機與隊列綁定時的bindingKey可以指定通配符
  • #:代表0個或多個詞
  • *:代表1個詞

消息轉(zhuǎn)換器

1,測試發(fā)送Object類型消息

在SpringAMQP的發(fā)送方法中,接收消息的類型是Object,也就是說我們可以發(fā)送任意對象類型的消息,SpringAMQP會幫我們序列化為字節(jié)后發(fā)送。

驗證:

在consumer中利用@bean聲明一個隊列:

在Publisher中測試類中發(fā)送一個集合類型的消息:

發(fā)現(xiàn)發(fā)送的消息被序列化了:

解決(加個依賴,加個bean)

配置JSON轉(zhuǎn)換器

顯然,JDK序列化方式并不合適。我們希望消息體的體積更小、可讀性更高,因此可以使用JSON方式來做序列化和反序列化。

在publisher和consumer兩個服務(wù)中都引入依賴:

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

在啟動類中添加一個Bean即可:

@Bean
public MessageConverter jsonMessageConverter(){
    return new Jackson2JsonMessageConverter();
}

再重新發(fā)送消息,會發(fā)現(xiàn)是json格式了:

2,接收消息:

發(fā)送消息的類型怎么寫,接收消息的類型也怎么寫:

到此這篇關(guān)于SpringAMQP的使用方式的文章就介紹到這了,更多相關(guān)SpringAMQP使用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論