SpringAMQP的使用方式案例詳解
MQ介紹
MQ,中文是消息隊列(MessageQueue),字面來看就是存放消息的隊列。也就是事件驅(qū)動架構(gòu)中的Broker。
比較常見的MQ實現(xiàn):
- ActiveMQ
- RabbitMQ
- RocketMQ
- Kafka
幾種常見MQ的對比:
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社區(qū) | Rabbit | Apache | 阿里 | Apache |
開發(fā)語言 | Erlang | Java | Java | Scala&Java |
協(xié)議支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定義協(xié)議 | 自定義協(xié)議 |
可用性 | 高 | 一般 | 高 | 高 |
單機吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延遲 | 微秒級 | 毫秒級 | 毫秒級 | 毫秒以內(nèi) |
消息可靠性 | 高 | 一般 | 高 | 一般 |
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延遲:RabbitMQ、Kafka
RabbitMQ消息模型
RabbitMQ官方提供了7個不同的Demo示例,對應(yīng)了不同的消息模型:
SpringAMQP
AMQP,(Advanced Message Queuing Protocol),是用于在應(yīng)用程序之間傳遞業(yè)務(wù)消息的開放標準。該協(xié)議與語言和平臺無關(guān),更符合服務(wù)中獨立性的要求。
SpringAMQP是基于AMQP協(xié)議定義的一套API規(guī)范,提供了模板來發(fā)送和接收消息。包含兩部分,其中spring-amqp是基礎(chǔ)抽象,spring-rabbit是底層的默認實現(xiàn)。
SpringAmqp的官方地址:Spring AMQP
SpringAMQP提供了三個功能:
- 自動聲明隊列、交換機及其綁定關(guān)系
- 基于注解的監(jiān)聽器模式,異步接收消息
- 封裝了RabbitTemplate工具,用于發(fā)送消息
1 "HelloWorld"隊列模型
簡單隊列模式的模型圖:
- publisher:消息發(fā)布者,將消息發(fā)送到隊列queue
- queue:消息隊列,負責接受并緩存消息
- consumer:訂閱隊列,處理隊列中的消息
操作步驟:
引入依賴:
spring: rabbitmq: host: 192.168.150.101 # 主機名 port: 5672 # 端口 virtual-host: / # 虛擬主機 username: ddddddd # 用戶名 password: 123321 # 密碼
添加配置:
在publisher服務(wù)的application.yml中添加配置:
spring: rabbitmq: host: 192.168.150.101 # 主機名 port: 5672 # 端口 virtual-host: / # 虛擬主機 username: ddddddd # 用戶名 password: 123321 # 密碼
實現(xiàn)消息發(fā)送
依賴注入RabbitTemplate,調(diào)用convertAndSend方法。
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue() { // 隊列名稱 String queueName = "simple.queue"; // 消息 String message = "hello, spring amqp!"; // 發(fā)送消息 rabbitTemplate.convertAndSend(queueName, message); } }
實現(xiàn)消息接收
在consumer模塊中也需要引入與上面相同的依賴和寫入配置
1,引入依賴
2,寫入配置
3,編寫消息接收類:
兩個注解: @Component @RabbitListener
@Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String msg) throws InterruptedException { System.out.println("spring 消費者接收到消息:【" + msg + "】"); } }
消息一旦消費就會從隊列刪除,RabbitMQ沒有消息回溯功能。
2 "WorkQueues"隊列模型
Work queues,也被稱為(Task queues),任務(wù)模型。簡單來說就是讓多個消費者綁定到一個隊列,共同消費隊列中的消息。
當消息處理比較耗時的時候,可能生產(chǎn)消息的速度會遠遠大于消息的消費速度。長此以往,消息就會堆積越來越多,無法及時處理。
此時就可以使用work 模型,多個消費者共同處理消息處理,速度就能大大提高了。
- 多個消費者綁定到一個隊列,同一條消息只會被一個消費者處理
- 通過設(shè)置prefetch來控制消費者預(yù)取的消息數(shù)量
消息發(fā)送
循環(huán)發(fā)送,模擬大量消息堆積現(xiàn)象。
在publisher服務(wù)中的SpringAmqpTest類中添加一個測試方法:
/** * workQueue * 向隊列中不停發(fā)送消息,模擬消息堆積。 */ @Test public void testWorkQueue() throws InterruptedException { // 隊列名稱 String queueName = "simple.queue"; // 消息 String message = "hello, message_"; for (int i = 0; i < 50; i++) { // 發(fā)送消息 rabbitTemplate.convertAndSend(queueName, message + i); Thread.sleep(20); } }
消息接收
要模擬多個消費者綁定同一個隊列,我們在consumer服務(wù)的SpringRabbitListener中添加2個新的方法:
默認情況下,消息是平均分配給每個消費者,叫做消息預(yù)取。并沒有考慮到消費者的處理能力。這樣顯然是有問題的。
要解決這個問題:
能者多勞
在消費者中配置yml:
spring: rabbitmq: listener: simple: prefetch: 1 # 每次只能獲取一條消息,處理完成才能獲取下一個消息
3 "Publish/Subscribe"隊列模型
發(fā)布訂閱的模型如圖:
發(fā)布訂閱 模式與之前案例的區(qū)別就是允許將同一消息發(fā)給多個消費者。實現(xiàn)方式是加入了exchange(交換機)
可以看到,在訂閱模型中,多了一個exchange角色,而且過程略有變化:
- Publisher:生產(chǎn)者,也就是要發(fā)送消息的程序,但是不再發(fā)送到隊列中,而是發(fā)給Exchange(交換機)
- Exchange:交換機,圖中的exchange。一方面,接收生產(chǎn)者發(fā)送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange有以下3種類型:
- Fanout:廣播,將消息交給所有綁定到交換機的隊列
- Direct:定向,把消息交給符合指定routing key 的隊列
- Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列
- Consumer:消費者,與以前一樣,訂閱隊列,沒有變化
- Queue:消息隊列也與以前一樣,接收消息、緩存消息。
Exchange(交換機)只負責轉(zhuǎn)發(fā)消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規(guī)則的隊列,那么消息會丟失!
三種交換機的使用:
Fanout:廣播
在廣播模式下,消息發(fā)送流程是這樣的:
- 1) 可以有多個隊列
- 2) 每個隊列都要綁定到Exchange(交換機)
- 3) 生產(chǎn)者發(fā)送的消息,只能發(fā)送到交換機,交換機來決定要發(fā)給哪個隊列,生產(chǎn)者無法決定
- 4) 交換機把消息發(fā)送給綁定過的所有隊列
- 5) 訂閱隊列的消費者都能拿到消息
下面操作:
- 創(chuàng)建一個交換機 itcast.fanout,類型是Fanout
- 創(chuàng)建兩個隊列fanout.queue1和fanout.queue2,綁定到交換機itcast.fanout
步驟:
基于@bean聲明隊列和交換機(下面有基于注解的方式)
Consumer中創(chuàng)建一個配置類,聲明交換機和隊列:
package cn.itcast.mq.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutConfig { /** * 聲明交換機 * @return Fanout類型交換機 */ @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("itcast.fanout"); } /** * 第1個隊列 */ @Bean public Queue fanoutQueue1(){ return new Queue("fanout.queue1"); } /** * 綁定隊列和交換機 */ @Bean public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } /** * 第2個隊列 */ @Bean public Queue fanoutQueue2(){ return new Queue("fanout.queue2"); } /** * 綁定隊列和交換機 */ @Bean public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } }
消息接收
在consumer服務(wù)的SpringRabbitListener中添加兩個方法,作為消費者:
@RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg) { System.out.println("消費者1接收到Fanout消息:【" + msg + "】"); } @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg) { System.out.println("消費者2接收到Fanout消息:【" + msg + "】"); }
消息發(fā)送:
在publisher服務(wù)的SpringAmqpTest類中添加測試方法:
這里convertAndSend的參數(shù)是交換機的名稱而不是隊列名稱了。
@Test public void testFanoutExchange() { // 隊列名稱 String exchangeName = "itcast.fanout"; // 消息 String message = "hello, everyone!"; rabbitTemplate.convertAndSend(exchangeName, "", message); }
Direct:定向
在Fanout模式中,一條消息,會被所有訂閱的隊列都消費。但是,在某些場景下,我們希望不同的消息被不同的隊列消費。這時就要用到Direct類型的Exchange。
在Direct模型下:
- 隊列與交換機的綁定,不能是任意綁定了,而是要指定一個
RoutingKey
(路由key) - 消息的發(fā)送方在 向 Exchange發(fā)送消息時,也必須指定消息的
RoutingKey
。 - Exchange不再把消息交給每一個綁定的隊列,而是根據(jù)消息的
Routing Key
進行判斷,只有隊列的Routingkey
與消息的Routing key
完全一致,才會接收到消息
案例需求如下:
- 利用@RabbitListener聲明Exchange、Queue、RoutingKey
- 在consumer服務(wù)中,編寫兩個消費者方法,分別監(jiān)聽direct.queue1和direct.queue2
- 在publisher中編寫測試方法,向itcast. direct發(fā)送消息
基于注解聲明隊列和交換機
基于@Bean的方式聲明隊列和交換機比較麻煩,Spring還提供了基于注解方式來聲明。
在consumer的SpringRabbitListener中添加兩個消費者,同時基于注解來聲明隊列和交換機:
添加消費者 的同時聲明了隊列和交換機
添加兩個消費者 的同時聲明了隊列和交換機:
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = {"red", "blue"} )) public void listenDirectQueue1(String msg){ System.out.println("消費者接收到direct.queue1的消息:【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = {"red", "yellow"} )) public void listenDirectQueue2(String msg){ System.out.println("消費者接收到direct.queue2的消息:【" + msg + "】"); }
消息發(fā)送
交換機會根據(jù)RoutingKey去發(fā)送給對應(yīng)的隊列。
@Test public void testSendDirectExchange() { // 交換機名稱 String exchangeName = "itcast.direct"; // 消息 String message = "紅色警報!日本亂排核廢水,導致海洋生物變異,驚現(xiàn)哥斯拉!"; // 發(fā)送消息 rabbitTemplate.convertAndSend(exchangeName, "red", message); }
Topic:通配符
`Topic`類型的`Exchange`與`Direct`相比,都是可以根據(jù)`RoutingKey`把消息路由到不同的隊列。只不過`Topic`類型`Exchange`可以讓隊列在綁定`Routing key` 的時候使用通配符!
Routingkey
一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如: item.insert
通配符規(guī)則:
#
:匹配一個或多個詞*
:匹配不多不少恰好1個詞
舉例:
item.#
:能夠匹配item.spu.insert
或者item.spu
item.*
:只能匹配item.spu
圖示:
解釋:
- Queue1:綁定的是
china.#
,因此凡是以china.
開頭的routing key
都會被匹配到。包括china.news和china.weather - Queue2:綁定的是
#.news
,因此凡是以.news
結(jié)尾的routing key
都會被匹配。包括china.news和japan.news
案例需求:
實現(xiàn)思路如下:
- 并利用@RabbitListener聲明Exchange、Queue、RoutingKey
- 在consumer服務(wù)中,編寫兩個消費者方法,分別監(jiān)聽topic.queue1和topic.queue2
- 在publisher中編寫測試方法,向itcast. topic發(fā)送消息
消息接收
在consumer服務(wù)的SpringRabbitListener中添加方法:
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC), key = "china.#" )) public void listenTopicQueue1(String msg){ System.out.println("消費者接收到topic.queue1的消息:【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC), key = "#.news" )) public void listenTopicQueue2(String msg){ System.out.println("消費者接收到topic.queue2的消息:【" + msg + "】"); }
消息發(fā)送
/** * topicExchange */ @Test public void testSendTopicExchange() { // 交換機名稱 String exchangeName = "itcast.topic"; // 消息 String message = "喜報!孫悟空大戰(zhàn)哥斯拉,勝!"; // 發(fā)送消息 rabbitTemplate.convertAndSend(exchangeName, "china.news", message); }
描述下Direct交換機與Topic交換機的差異?
- Topic交換機接收的消息RoutingKey必須是多個單詞,以
**.**
分割 - Topic交換機與隊列綁定時的bindingKey可以指定通配符
#
:代表0個或多個詞*
:代表1個詞
消息轉(zhuǎn)換器
1,測試發(fā)送Object類型消息
在SpringAMQP的發(fā)送方法中,接收消息的類型是Object,也就是說我們可以發(fā)送任意對象類型的消息,SpringAMQP會幫我們序列化為字節(jié)后發(fā)送。
驗證:
在consumer中利用@bean聲明一個隊列:
在Publisher中測試類中發(fā)送一個集合類型的消息:
發(fā)現(xiàn)發(fā)送的消息被序列化了:
解決(加個依賴,加個bean):
配置JSON轉(zhuǎn)換器
顯然,JDK序列化方式并不合適。我們希望消息體的體積更小、可讀性更高,因此可以使用JSON方式來做序列化和反序列化。
在publisher和consumer兩個服務(wù)中都引入依賴:
<dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.9.10</version> </dependency>
在啟動類中添加一個Bean即可:
@Bean public MessageConverter jsonMessageConverter(){ return new Jackson2JsonMessageConverter(); }
再重新發(fā)送消息,會發(fā)現(xiàn)是json格式了:
2,接收消息:
發(fā)送消息的類型怎么寫,接收消息的類型也怎么寫:
到此這篇關(guān)于SpringAMQP的使用方式的文章就介紹到這了,更多相關(guān)SpringAMQP使用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
new出來的對象中無法使用@autowired進行對象bean注入問題
這篇文章主要介紹了基于new出來的對象中無法使用@autowired進行對象bean注入問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-02-02Java微信公眾平臺開發(fā)(14) 微信web開發(fā)者工具使用
這篇文章主要為大家詳細介紹了Java微信公眾平臺開發(fā)第十四步,微信web開發(fā)者工具的使用方法,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-04-04SpringBoot @ComponentScan掃描的局限性方式
文章總結(jié):SpringBoot的@ComponentScan注解在掃描組件時存在局限性,只能掃描指定的包及其子包,無法掃描@SpringBootApplication注解自動配置的組件,使用@SpringBootApplication注解可以解決這一問題,它集成了@Configuration、@EnableAutoConfiguration2025-01-01mybatis對象List<String> List<Integer>屬性映射方式
這篇文章主要介紹了mybatis對象List<String> List<Integer>屬性映射方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-12-12Android studio按鈕點擊頁面跳轉(zhuǎn)詳細步驟
在Android應(yīng)用程序中,頁面跳轉(zhuǎn)是非常常見的操作,下面這篇文章主要給大家介紹了關(guān)于Android studio按鈕點擊頁面跳轉(zhuǎn)的相關(guān)資料,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下2023-06-06使用FormData上傳二進制文件、對象、對象數(shù)組方式
這篇文章主要介紹了使用FormData上傳二進制文件、對象、對象數(shù)組方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-01-01