RabbitMQ使用案例詳解
初識(shí)MQ
同步通訊和異步通訊
同步通訊
微服務(wù)間基于Feign的調(diào)用就屬于同步方式,存在一些問題
同步調(diào)用存在的問題
- 耦合度高:每次加入新需求,都要修改原來的代碼
- 性能下降:調(diào)用者需要等待服務(wù)提供者響應(yīng),如果調(diào)用鏈過長(zhǎng)則響應(yīng)時(shí)間等于每次調(diào)用時(shí)間之和
- 浪費(fèi)資源:調(diào)用鏈中的每個(gè)服務(wù)在等待響應(yīng)過程中,不能釋放請(qǐng)求占用的資源,高并發(fā)場(chǎng)景下回極度浪費(fèi)系統(tǒng)資源
- 級(jí)聯(lián)失敗:如果服務(wù)提供者出現(xiàn)問題,所有調(diào)用方都會(huì)跟著出問題,如同多米諾骨牌一樣,迅速導(dǎo)致鎮(zhèn)整個(gè)微服務(wù)群故障
總結(jié)
同步調(diào)用優(yōu)點(diǎn):
- 時(shí)效性較強(qiáng),可以立即得到結(jié)果
同步調(diào)用的問題:
- 耦合度高
- 性能和吞吐能力下降
- 有額外的資源消耗
- 有級(jí)聯(lián)失敗問題
異步通訊
異步調(diào)用常見實(shí)現(xiàn)就是事件驅(qū)動(dòng)模式
事件驅(qū)動(dòng)優(yōu)勢(shì)
優(yōu)勢(shì)一:服務(wù)解耦
優(yōu)勢(shì)二:新能提示,吞吐量提高
優(yōu)勢(shì)三:服務(wù)沒有強(qiáng)依賴,不擔(dān)心級(jí)聯(lián)失敗問題
優(yōu)勢(shì)四:流浪削峰
總結(jié)
異步通信的優(yōu)點(diǎn):
- 耦合度低
- 吞吐量提示
- 故障隔離
- 流量削峰
異步通訊的缺點(diǎn):
- 依賴于Broker的可靠性,安全性,吞吐能力
- 架構(gòu)復(fù)雜了,業(yè)務(wù)沒有明顯的流程線,不好追蹤管理
什么是MQ
MQ(MessageQueue),中文解釋是消息隊(duì)列,字面來看就是存放消息的隊(duì)列,也就是時(shí)間驅(qū)動(dòng)架構(gòu)中的Broker。
RabbitMQ快速入門
RabbitMQ概述和安裝
RabbitMQ是基于Erlang語言開發(fā)的開源的消息中間件,官網(wǎng)地址:RabbitMQ: easy to use, flexible messaging and streaming | RabbitMQ
安裝RabbitMQ,參考課前資料
Docker運(yùn)行RabbitMQ:
docker run \ -e RABBITMQ_DEFAULT_USER=itcast \ -e RABBITMQ_DEFAULT_PASS=123321 \ --name mq \ --hostname mq1 \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3-management
輸入端口,輸入密碼,進(jìn)入RabbitMQ
RabbitMQ結(jié)構(gòu)和概念
總結(jié)
RabbitMQ中的幾個(gè)概念:
- channel:操作MQ的工具
- exchange:路由消息到隊(duì)列中
- queue:緩存消息
cirtual host:虛擬主機(jī),是對(duì)queue,exchange等資源的邏輯分組
常見消息模型
MQ的官方文檔中給出了5個(gè)MQ的Demo示例,對(duì)應(yīng)了幾種不同的用法:
- 基本消息隊(duì)列(BasicQueue)
- 工作消息隊(duì)列(WorkQueue)
- 發(fā)布訂閱(Publish,Subscribe),又根據(jù)交換機(jī)類型不同分為三種:
1.Fanout Exchange:廣播
2.Diect Exchange:路由
3.Topic Exchange:主題
不同消息模型
HelloWorld案例
官方的HelloWorld是基于最基礎(chǔ)的消息隊(duì)列模型來實(shí)現(xiàn)的,只包括三個(gè)角色:
- publisher:消息發(fā)布者,將消息發(fā)送到隊(duì)列queue
- queue:消息隊(duì)列,負(fù)責(zé)接收并緩存消息
- consumer:訂閱列隊(duì),處理隊(duì)列中的消息
案例:完成官方Demo中Hello world案例
實(shí)現(xiàn)步驟:
- 導(dǎo)入課前資料中的demo工程
- 運(yùn)行publisher服務(wù)中測(cè)試類publisherTest中的測(cè)試方法testSendMessage()
- 查看RabbitMQ控制臺(tái)的消息
- 啟動(dòng)consumer服務(wù),查看是否能接收消息
總結(jié)
基本消息隊(duì)列的消息發(fā)送流程:
- 創(chuàng)建connection
- 創(chuàng)建channel
- 利用channel聲明隊(duì)列
- 利用channel向隊(duì)列發(fā)送消息
基本消息隊(duì)列的消息接收流程:
- 創(chuàng)建connection
- 創(chuàng)建channel
- 利用channel聲明隊(duì)列
- 定義consumer的消費(fèi)行為handleDelivery()
- 利用channel將消費(fèi)者與隊(duì)列綁定
SpringAMQP
什么是AMQP
Advanced Message Queuing Protocol,適用于在應(yīng)用程序或之間傳遞業(yè)務(wù)消息的開放標(biāo)準(zhǔn)。該協(xié)議與語言和平臺(tái)無關(guān),更符合微服務(wù)中獨(dú)立性的要求。
什么是SprngAMQP
Spring AMQP是基于AMQP協(xié)議定義的一套API規(guī)范,提供了模板來發(fā)送和接收消息。包含兩部分,其中Spring-AMPQ是基礎(chǔ)抽象,Spring-rabbit是底層的默認(rèn)實(shí)現(xiàn)
SpringAmqp的官方地址:Spring AMQP
Basic Queue 簡(jiǎn)單隊(duì)列模型
案例:利用SpringAMQP實(shí)現(xiàn)Helloworld中的基礎(chǔ)消息隊(duì)列功能
發(fā)送消息
流程如下:
1.在父工程中引入spring-adqp的依賴
<!-- AMQP依賴,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.在publisher服務(wù)中利用RabbitTemplate發(fā)送消息到simple.queue這個(gè)隊(duì)列
在publisher服務(wù)中編寫application.yml,添加mq連接信息:
spring: rabbitmq: host: ip #(填寫自己的ip) port: 5672 username: itcast password: 123321 virtual-host: /
在publisher服務(wù)中新建一個(gè)測(cè)試類,編寫測(cè)試方法:
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { ? @Autowired private RabbitTemplate rabbitTemplate; ? @Test public void testSendMessage2SimpleQueue(){ String queueName = "simple.queue"; String message = "Hello World!spring amqp!!"; rabbitTemplate.convertAndSend(queueName, message); } }
在consumer服務(wù)中編寫消費(fèi)邏輯,綁定simple.queue這個(gè)隊(duì)列
String queueName = "simple.queue"; String message = "Hello World!spring amqp!!"; rabbitTemplate.convertAndSend(queueName, message);
總結(jié)
什么是AMQP?
應(yīng)用間消息通信的一種協(xié)議,與語言和平臺(tái)無關(guān)。
SpringAMQP如何發(fā)送消息?
- 引入amqp的starter依賴
- 配置RabbitMQ地址
- 利用RabbitTemplate的convertAndSend方法
接收消息
在consumer中編寫消費(fèi)邏輯,監(jiān)聽simple.queue
在consumer服務(wù)中編寫application.yml,添加mq連接信息:
spring: rabbitmq: host: ip port: 5672 username: itcast password: 123321 virtual-host: /
在consumer服務(wù)中新建一個(gè)類,編寫消費(fèi)邏輯:
@Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenSimpleQueue(String msg){ System.out.println("消費(fèi)者接收到simple.queue的消息:{"+msg+"}"); } }
總結(jié)
SpringAMQP如何接收消息?
- 映入amqp的starter依賴
- 配置RabbitMQ地址
- 定義類,添加@Component注解
類中聲明方法,添加@RabbitListener注解,方法參數(shù)就時(shí)消息
Work Queue 工作隊(duì)列模型
案例:模擬Work Queue,實(shí)現(xiàn)一個(gè)隊(duì)列綁定多個(gè)消費(fèi)者
基本思路如下:
在publisher服務(wù)中定義測(cè)試方法,每秒產(chǎn)生50條消息,發(fā)送到simple.queue
@Test public void testSendMessage2WorkQueue() throws InterruptedException { String queueName = "simple.queue"; String message = "Hello World!spring amqp!!__"; for (int i = 1; i < 50; i++) { rabbitTemplate.convertAndSend(queueName, message+i); Thread.sleep(20); } }
在consumer服務(wù)中定義兩個(gè)消息監(jiān)聽者,都監(jiān)聽simple.queue隊(duì)列
@RabbitListener(queues = "simple.queue") public void listenWorkQueue1(String msg) throws InterruptedException { System.out.println("消費(fèi)者1接收到simple.queue的消息:{"+msg+"}"+ LocalTime.now()); Thread.sleep(20); } @RabbitListener(queues = "simple.queue") public void listenWorkQueue2(String msg) throws InterruptedException { System.out.println("消費(fèi)者2接收到simple.queue的消息:{"+msg+"}"+ LocalTime.now()); Thread.sleep(50); }
消費(fèi)者 1 每秒處理50條消息,消費(fèi)者 2 每秒處理10條消息
總結(jié)
Work模型的使用:
- 多個(gè)消費(fèi)者綁定到一個(gè)隊(duì)列,同一條信息只會(huì)被一個(gè)消費(fèi)者處理
- 通過設(shè)置prefetch來控制消費(fèi)者預(yù)取的消息數(shù)量
發(fā)布,訂閱模型-Fanout
發(fā)布訂閱與之前案例的區(qū)別就是允許將同一消息發(fā)送給多個(gè)消費(fèi)者。實(shí)現(xiàn)方式是加入exchange(交換機(jī))。
案例:利用SpringAMQP演示FanoutExchange的使用
實(shí)現(xiàn)思路
在consumer服務(wù)中,利用代碼聲明隊(duì)列,交換機(jī),將兩者綁定
在consumer服務(wù)聲明Exchang,Queue,Bingding
SpringAMQP提供了交換機(jī),隊(duì)列,綁定關(guān)系的API,例如:
@Configuration public class FanoutConfig { //itcast.fanout(交換機(jī)) @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("itcast.fanout"); } //itcast.queue1(隊(duì)列一) @Bean public Queue fanoutQueue1(){ return new Queue("fanout.queue1"); } //綁定隊(duì)列1到交換機(jī) @Bean public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){ return BindingBuilder .bind(fanoutQueue1). to(fanoutExchange); } //itcast.queue2(隊(duì)列二) @Bean public Queue fanoutQueue2(){ return new Queue("fanout.queue2"); } //綁定隊(duì)列2到交換機(jī) @Bean public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){ return BindingBuilder .bind(fanoutQueue2). to(fanoutExchange); } }
在consumer服務(wù)中,編寫兩個(gè)消費(fèi)者方法,分別監(jiān)聽fanout.queue1和fanout.queue2
@RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg){ System.out.println("消費(fèi)者2接收到fanout.queue1的消息:{"+msg+"}"); } @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg){ System.out.println("消費(fèi)者2接收到fanout.queue2的消息:{"+msg+"}"); }
在publisher中編寫測(cè)試方法,向itcast.fanout發(fā)送消息
@Test public void testSendFanoutExchange(){ //交換機(jī)名稱 String exchangeName = "itcast.fanout"; //消息 String message = "hello,every one!"; //發(fā)送消息,參數(shù)分別是:交換機(jī)名稱,RoutingKey(暫時(shí)為空),消息 rabbitTemplate.convertAndSend(exchangeName, "", message); }
總結(jié)
交換機(jī)的作用是什么?
- 接收publisher發(fā)送的消息
- 將消息按照規(guī)則路由到與之綁定的隊(duì)列
- 不能緩存消息,路由失敗,消息丟失
FanoutExchange的會(huì)將消息路由到每個(gè)綁定的隊(duì)列
聲明隊(duì)列,交換機(jī),綁定關(guān)系的Bean是什么?
- Queue
- FanoutExchange
- Binding
發(fā)布,訂閱模型-Direct
發(fā)布訂閱-DirectExchange
Direct Exchange會(huì)將接收到的消息根據(jù)規(guī)則路由指定的Queue,因此稱為路由模式(routes)。
案例:利用SpringAMQP演示DirectExchange的使用
實(shí)現(xiàn)思路如下:
實(shí)現(xiàn)思路如下:
1.利用@RabbitListener聲明Exchange,Queue,RoutingKey
- 在consumer服務(wù)中,編寫兩個(gè)消費(fèi)者方法,分別監(jiān)聽direct.queue1和direct.queue2
- 并利用@RabbitListener聲明Exchange,Queue,RoutingKey
2.在consumer服務(wù)中,編寫兩個(gè)消費(fèi)則方法,分別監(jiān)聽direct.queue1和direct.queue2
@RabbitListener(bindings = @QueueBinding( //隊(duì)列 value = @Queue(name = "direct.queue1"), //交換機(jī) exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT), //邦定機(jī)置 key = {"red","blue"} )) public void listenDirectQueue1(String msg){ System.out.println("消費(fèi)者接收到direct.queue1的消息:{"+msg+"}"); } @RabbitListener(bindings = @QueueBinding( //隊(duì)列 value = @Queue(name = "direct.queue2"), //交換機(jī) exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT), //邦定機(jī)置 key = {"red","yellow"} )) public void listenDirectQueue2(String msg){ System.out.println("消費(fèi)者接收到direct.queue2的消息:{"+msg+"}"); }
3.在publisher中編寫測(cè)試方法,向itcast.direct發(fā)送消息
@Test public void testSendDirectExchange(){ //交換機(jī)名稱 String exchangeName = "itcast.direct"; //消息 String message = "hello,blue one!"; //發(fā)送消息,參數(shù)分別是:交換機(jī)名稱,RoutingKey("red"),消息 rabbitTemplate.convertAndSend(exchangeName, "red", message); }
總結(jié)
描述下Direct交換機(jī)與Fanout交換機(jī)的差異?
- Fanout交換機(jī)將消息路由給每一個(gè)與之綁定的隊(duì)列
- Direct交換機(jī)根據(jù)RoutingKey判斷路由給哪個(gè)隊(duì)列
- 如果多個(gè)隊(duì)列具有相同的RoutingKey,則與Fanout功能類似
基于@RabbitListener注解聲明隊(duì)列和交換機(jī)有哪些常見的注解?
- @Queue
- @Exchange
發(fā)布,訂閱模型-Topic
發(fā)布訂閱-TopicExchange
TopicExchange與DirectExchange類似,區(qū)別在于routingKey必須是多個(gè)單純的列表,并且以 . 分割。
例如:
china.news:代表有中國新聞消息;
china.weather:代表中國的天氣消息;
japan.news:代表日本新聞;
japan.weather:代表日本的天氣消息;
案例:利用SpringAMQP演示TopicExchange的使用
實(shí)現(xiàn)思路如下:
- 利用@RabbitListener聲明Exchange,Queue,RoutingKey
- 在consumer服務(wù)中,編寫兩個(gè)消費(fèi)則方法,分別監(jiān)聽topic.queue1和topic.queue2
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC), key = "china.#" )) public void listenTopicQueue1(String msg){ System.out.println("消費(fèi)者接收到topic.queue1的消息:{"+msg+"}"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC), key = "#.news" )) public void listenTopicQueue2(String msg){ System.out.println("消費(fèi)者接收到topic.queue2的消息:{"+msg+"}"); }
在publisher中編寫測(cè)試方法,向itcast.topic發(fā)送消息
@Test public void testSendTopicExchange(){ //交換機(jī)名稱 String exchangeName = "itcast.topic"; //消息 String message = "中國的新聞!"; //發(fā)送消息,參數(shù)分別是:交換機(jī)名稱,RoutingKey,消息 rabbitTemplate.convertAndSend(exchangeName, "china.news", message); }
總結(jié)
描述下Direct交換機(jī)與Topic交換機(jī)的差異?
自己總結(jié)
消息轉(zhuǎn)換器
測(cè)試發(fā)送Object類型消息
說明:在SpringAMQP的發(fā)送方法中,接收消息的類型是Object,也就是說我們可以發(fā)送任意對(duì)象類型的消息,SpringAMQP會(huì)幫我們序列化為字節(jié)后發(fā)送。(也就是是對(duì)象會(huì)序列化為字節(jié))
修改序列化(推薦JSON序列化)
發(fā)送消息
我們?cè)趐ublisher服務(wù)引入依賴
<!--rabbitmq使用json序列化--> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
我們?cè)趐ublisher服務(wù)聲明MessageConverter:
@Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); }
發(fā)送消息
@Test public void testSendObjectQueue(){ Map<String,Object> msg = new HashMap<>(); msg.put("name","留言"); msg.put("age",21); rabbitTemplate.convertAndSend("object.queue",msg); }
接送消息
我們?cè)赾onsumer服務(wù)引入Jackson依賴:
<!--rabbitmq使用json序列化--> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
我們?cè)赾onsumer服務(wù)定義MessageConverter:
@Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); }
然后定義一個(gè)消費(fèi)者,監(jiān)聽object.queue隊(duì)列并消費(fèi)信息:
@RabbitListener(queues = "object.queue") public void listenObjectQueue(Map<String,Object> msg){ System.out.println("消費(fèi)者接收到object.queue的消息:{"+msg+"}"); }
總結(jié)
SpringAMQP中消息的序列化和反序列化是怎么實(shí)現(xiàn)的?
- 利用MessageConverter實(shí)現(xiàn)的,默認(rèn)是JDK的序列化
- 注意發(fā)送方與接收方必須使用相同的MessageConverter
到此這篇關(guān)于RabbitMQ使用案例詳解的文章就介紹到這了,更多相關(guān)RabbitMQ使用內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
idea創(chuàng)建springboot項(xiàng)目和springcloud項(xiàng)目的詳細(xì)教程
這篇文章主要介紹了idea創(chuàng)建springboot項(xiàng)目和springcloud項(xiàng)目方法,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-10-10Java?離線中文語音文字識(shí)別功能的實(shí)現(xiàn)代碼
這篇文章主要介紹了Java?離線中文語音文字識(shí)別,本次使用springboot?+maven實(shí)現(xiàn),官方demo為springboot+gradle,結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-07-07Spring Boot連接超時(shí)導(dǎo)致502錯(cuò)誤的實(shí)戰(zhàn)案例
這篇文章主要給大家介紹了關(guān)于Spring Boot連接超時(shí)導(dǎo)致502錯(cuò)誤的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09SpringBoot實(shí)現(xiàn)RabbitMQ監(jiān)聽消息的四種方式
本文介紹了在Spring Boot中實(shí)現(xiàn)RabbitMQ監(jiān)聽消息的幾種方式,包括使用@RabbitListener注解、MessageListenerAdapter、配置連接工廠和隊(duì)列等方式,感興趣的可以了解一下2024-07-07SpringBoot進(jìn)行參數(shù)校驗(yàn)的方法詳解
在日常的接口開發(fā)中,為了防止非法參數(shù)對(duì)業(yè)務(wù)造成影響,經(jīng)常需要對(duì)接口的參數(shù)進(jìn)行校驗(yàn)。本文通過示例詳細(xì)講解了SpringBoot如何進(jìn)行參數(shù)校驗(yàn)的,感興趣的可以學(xué)習(xí)一下2022-04-04SpringSecurity整合springBoot、redis實(shí)現(xiàn)登錄互踢功能
這篇文章主要介紹了SpringSecurity整合springBoot、redis實(shí)現(xiàn)登錄互踢,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-05-05SpringBoot 對(duì)象存儲(chǔ) MinIO的詳細(xì)過程
MinIO 是一個(gè)基于 Go 實(shí)現(xiàn)的高性能、兼容 S3 協(xié)議的對(duì)象存儲(chǔ),它適合存儲(chǔ)海量的非結(jié)構(gòu)化的數(shù)據(jù),這篇文章主要介紹了SpringBoot 對(duì)象存儲(chǔ) MinIO,需要的朋友可以參考下2023-07-07