RabbitMQ使用案例詳解
初識MQ
同步通訊和異步通訊
同步通訊
微服務間基于Feign的調用就屬于同步方式,存在一些問題
同步調用存在的問題
- 耦合度高:每次加入新需求,都要修改原來的代碼
- 性能下降:調用者需要等待服務提供者響應,如果調用鏈過長則響應時間等于每次調用時間之和
- 浪費資源:調用鏈中的每個服務在等待響應過程中,不能釋放請求占用的資源,高并發(fā)場景下回極度浪費系統(tǒng)資源
- 級聯(lián)失敗:如果服務提供者出現(xiàn)問題,所有調用方都會跟著出問題,如同多米諾骨牌一樣,迅速導致鎮(zhèn)整個微服務群故障
總結
同步調用優(yōu)點:
- 時效性較強,可以立即得到結果
同步調用的問題:
- 耦合度高
- 性能和吞吐能力下降
- 有額外的資源消耗
- 有級聯(lián)失敗問題
異步通訊
異步調用常見實現(xiàn)就是事件驅動模式
事件驅動優(yōu)勢
優(yōu)勢一:服務解耦
優(yōu)勢二:新能提示,吞吐量提高
優(yōu)勢三:服務沒有強依賴,不擔心級聯(lián)失敗問題
優(yōu)勢四:流浪削峰
總結
異步通信的優(yōu)點:
- 耦合度低
- 吞吐量提示
- 故障隔離
- 流量削峰
異步通訊的缺點:
- 依賴于Broker的可靠性,安全性,吞吐能力
- 架構復雜了,業(yè)務沒有明顯的流程線,不好追蹤管理
什么是MQ
MQ(MessageQueue),中文解釋是消息隊列,字面來看就是存放消息的隊列,也就是時間驅動架構中的Broker。
RabbitMQ快速入門
RabbitMQ概述和安裝
RabbitMQ是基于Erlang語言開發(fā)的開源的消息中間件,官網(wǎng)地址:RabbitMQ: easy to use, flexible messaging and streaming | RabbitMQ
安裝RabbitMQ,參考課前資料
Docker運行RabbitMQ:
docker run \ -e RABBITMQ_DEFAULT_USER=itcast \ -e RABBITMQ_DEFAULT_PASS=123321 \ --name mq \ --hostname mq1 \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3-management
輸入端口,輸入密碼,進入RabbitMQ
RabbitMQ結構和概念
總結
RabbitMQ中的幾個概念:
- channel:操作MQ的工具
- exchange:路由消息到隊列中
- queue:緩存消息
cirtual host:虛擬主機,是對queue,exchange等資源的邏輯分組
常見消息模型
MQ的官方文檔中給出了5個MQ的Demo示例,對應了幾種不同的用法:
- 基本消息隊列(BasicQueue)
- 工作消息隊列(WorkQueue)
- 發(fā)布訂閱(Publish,Subscribe),又根據(jù)交換機類型不同分為三種:
1.Fanout Exchange:廣播
2.Diect Exchange:路由
3.Topic Exchange:主題
不同消息模型
HelloWorld案例
官方的HelloWorld是基于最基礎的消息隊列模型來實現(xiàn)的,只包括三個角色:
- publisher:消息發(fā)布者,將消息發(fā)送到隊列queue
- queue:消息隊列,負責接收并緩存消息
- consumer:訂閱列隊,處理隊列中的消息
案例:完成官方Demo中Hello world案例
實現(xiàn)步驟:
- 導入課前資料中的demo工程
- 運行publisher服務中測試類publisherTest中的測試方法testSendMessage()
- 查看RabbitMQ控制臺的消息
- 啟動consumer服務,查看是否能接收消息
總結
基本消息隊列的消息發(fā)送流程:
- 創(chuàng)建connection
- 創(chuàng)建channel
- 利用channel聲明隊列
- 利用channel向隊列發(fā)送消息
基本消息隊列的消息接收流程:
- 創(chuàng)建connection
- 創(chuàng)建channel
- 利用channel聲明隊列
- 定義consumer的消費行為handleDelivery()
- 利用channel將消費者與隊列綁定
SpringAMQP
什么是AMQP
Advanced Message Queuing Protocol,適用于在應用程序或之間傳遞業(yè)務消息的開放標準。該協(xié)議與語言和平臺無關,更符合微服務中獨立性的要求。
什么是SprngAMQP
Spring AMQP是基于AMQP協(xié)議定義的一套API規(guī)范,提供了模板來發(fā)送和接收消息。包含兩部分,其中Spring-AMPQ是基礎抽象,Spring-rabbit是底層的默認實現(xiàn)
SpringAmqp的官方地址:Spring AMQP
Basic Queue 簡單隊列模型
案例:利用SpringAMQP實現(xiàn)Helloworld中的基礎消息隊列功能
發(fā)送消息
流程如下:
1.在父工程中引入spring-adqp的依賴
<!-- AMQP依賴,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.在publisher服務中利用RabbitTemplate發(fā)送消息到simple.queue這個隊列
在publisher服務中編寫application.yml,添加mq連接信息:
spring: rabbitmq: host: ip #(填寫自己的ip) port: 5672 username: itcast password: 123321 virtual-host: /
在publisher服務中新建一個測試類,編寫測試方法:
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { ? @Autowired private RabbitTemplate rabbitTemplate; ? @Test public void testSendMessage2SimpleQueue(){ String queueName = "simple.queue"; String message = "Hello World!spring amqp!!"; rabbitTemplate.convertAndSend(queueName, message); } }
在consumer服務中編寫消費邏輯,綁定simple.queue這個隊列
String queueName = "simple.queue"; String message = "Hello World!spring amqp!!"; rabbitTemplate.convertAndSend(queueName, message);
總結
什么是AMQP?
應用間消息通信的一種協(xié)議,與語言和平臺無關。
SpringAMQP如何發(fā)送消息?
- 引入amqp的starter依賴
- 配置RabbitMQ地址
- 利用RabbitTemplate的convertAndSend方法
接收消息
在consumer中編寫消費邏輯,監(jiān)聽simple.queue
在consumer服務中編寫application.yml,添加mq連接信息:
spring: rabbitmq: host: ip port: 5672 username: itcast password: 123321 virtual-host: /
在consumer服務中新建一個類,編寫消費邏輯:
@Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenSimpleQueue(String msg){ System.out.println("消費者接收到simple.queue的消息:{"+msg+"}"); } }
總結
SpringAMQP如何接收消息?
- 映入amqp的starter依賴
- 配置RabbitMQ地址
- 定義類,添加@Component注解
類中聲明方法,添加@RabbitListener注解,方法參數(shù)就時消息
Work Queue 工作隊列模型
案例:模擬Work Queue,實現(xiàn)一個隊列綁定多個消費者
基本思路如下:
在publisher服務中定義測試方法,每秒產(chǎn)生50條消息,發(fā)送到simple.queue
@Test public void testSendMessage2WorkQueue() throws InterruptedException { String queueName = "simple.queue"; String message = "Hello World!spring amqp!!__"; for (int i = 1; i < 50; i++) { rabbitTemplate.convertAndSend(queueName, message+i); Thread.sleep(20); } }
在consumer服務中定義兩個消息監(jiān)聽者,都監(jiān)聽simple.queue隊列
@RabbitListener(queues = "simple.queue") public void listenWorkQueue1(String msg) throws InterruptedException { System.out.println("消費者1接收到simple.queue的消息:{"+msg+"}"+ LocalTime.now()); Thread.sleep(20); } @RabbitListener(queues = "simple.queue") public void listenWorkQueue2(String msg) throws InterruptedException { System.out.println("消費者2接收到simple.queue的消息:{"+msg+"}"+ LocalTime.now()); Thread.sleep(50); }
消費者 1 每秒處理50條消息,消費者 2 每秒處理10條消息
總結
Work模型的使用:
- 多個消費者綁定到一個隊列,同一條信息只會被一個消費者處理
- 通過設置prefetch來控制消費者預取的消息數(shù)量
發(fā)布,訂閱模型-Fanout
發(fā)布訂閱與之前案例的區(qū)別就是允許將同一消息發(fā)送給多個消費者。實現(xiàn)方式是加入exchange(交換機)。
案例:利用SpringAMQP演示FanoutExchange的使用
實現(xiàn)思路
在consumer服務中,利用代碼聲明隊列,交換機,將兩者綁定
在consumer服務聲明Exchang,Queue,Bingding
SpringAMQP提供了交換機,隊列,綁定關系的API,例如:
@Configuration public class FanoutConfig { //itcast.fanout(交換機) @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("itcast.fanout"); } //itcast.queue1(隊列一) @Bean public Queue fanoutQueue1(){ return new Queue("fanout.queue1"); } //綁定隊列1到交換機 @Bean public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){ return BindingBuilder .bind(fanoutQueue1). to(fanoutExchange); } //itcast.queue2(隊列二) @Bean public Queue fanoutQueue2(){ return new Queue("fanout.queue2"); } //綁定隊列2到交換機 @Bean public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){ return BindingBuilder .bind(fanoutQueue2). to(fanoutExchange); } }
在consumer服務中,編寫兩個消費者方法,分別監(jiān)聽fanout.queue1和fanout.queue2
@RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg){ System.out.println("消費者2接收到fanout.queue1的消息:{"+msg+"}"); } @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg){ System.out.println("消費者2接收到fanout.queue2的消息:{"+msg+"}"); }
在publisher中編寫測試方法,向itcast.fanout發(fā)送消息
@Test public void testSendFanoutExchange(){ //交換機名稱 String exchangeName = "itcast.fanout"; //消息 String message = "hello,every one!"; //發(fā)送消息,參數(shù)分別是:交換機名稱,RoutingKey(暫時為空),消息 rabbitTemplate.convertAndSend(exchangeName, "", message); }
總結
交換機的作用是什么?
- 接收publisher發(fā)送的消息
- 將消息按照規(guī)則路由到與之綁定的隊列
- 不能緩存消息,路由失敗,消息丟失
FanoutExchange的會將消息路由到每個綁定的隊列
聲明隊列,交換機,綁定關系的Bean是什么?
- Queue
- FanoutExchange
- Binding
發(fā)布,訂閱模型-Direct
發(fā)布訂閱-DirectExchange
Direct Exchange會將接收到的消息根據(jù)規(guī)則路由指定的Queue,因此稱為路由模式(routes)。
案例:利用SpringAMQP演示DirectExchange的使用
實現(xiàn)思路如下:
實現(xiàn)思路如下:
1.利用@RabbitListener聲明Exchange,Queue,RoutingKey
- 在consumer服務中,編寫兩個消費者方法,分別監(jiān)聽direct.queue1和direct.queue2
- 并利用@RabbitListener聲明Exchange,Queue,RoutingKey
2.在consumer服務中,編寫兩個消費則方法,分別監(jiān)聽direct.queue1和direct.queue2
@RabbitListener(bindings = @QueueBinding( //隊列 value = @Queue(name = "direct.queue1"), //交換機 exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT), //邦定機置 key = {"red","blue"} )) public void listenDirectQueue1(String msg){ System.out.println("消費者接收到direct.queue1的消息:{"+msg+"}"); } @RabbitListener(bindings = @QueueBinding( //隊列 value = @Queue(name = "direct.queue2"), //交換機 exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT), //邦定機置 key = {"red","yellow"} )) public void listenDirectQueue2(String msg){ System.out.println("消費者接收到direct.queue2的消息:{"+msg+"}"); }
3.在publisher中編寫測試方法,向itcast.direct發(fā)送消息
@Test public void testSendDirectExchange(){ //交換機名稱 String exchangeName = "itcast.direct"; //消息 String message = "hello,blue one!"; //發(fā)送消息,參數(shù)分別是:交換機名稱,RoutingKey("red"),消息 rabbitTemplate.convertAndSend(exchangeName, "red", message); }
總結
描述下Direct交換機與Fanout交換機的差異?
- Fanout交換機將消息路由給每一個與之綁定的隊列
- Direct交換機根據(jù)RoutingKey判斷路由給哪個隊列
- 如果多個隊列具有相同的RoutingKey,則與Fanout功能類似
基于@RabbitListener注解聲明隊列和交換機有哪些常見的注解?
- @Queue
- @Exchange
發(fā)布,訂閱模型-Topic
發(fā)布訂閱-TopicExchange
TopicExchange與DirectExchange類似,區(qū)別在于routingKey必須是多個單純的列表,并且以 . 分割。
例如:
china.news:代表有中國新聞消息;
china.weather:代表中國的天氣消息;
japan.news:代表日本新聞;
japan.weather:代表日本的天氣消息;
案例:利用SpringAMQP演示TopicExchange的使用
實現(xiàn)思路如下:
- 利用@RabbitListener聲明Exchange,Queue,RoutingKey
- 在consumer服務中,編寫兩個消費則方法,分別監(jiān)聽topic.queue1和topic.queue2
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC), key = "china.#" )) public void listenTopicQueue1(String msg){ System.out.println("消費者接收到topic.queue1的消息:{"+msg+"}"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC), key = "#.news" )) public void listenTopicQueue2(String msg){ System.out.println("消費者接收到topic.queue2的消息:{"+msg+"}"); }
在publisher中編寫測試方法,向itcast.topic發(fā)送消息
@Test public void testSendTopicExchange(){ //交換機名稱 String exchangeName = "itcast.topic"; //消息 String message = "中國的新聞!"; //發(fā)送消息,參數(shù)分別是:交換機名稱,RoutingKey,消息 rabbitTemplate.convertAndSend(exchangeName, "china.news", message); }
總結
描述下Direct交換機與Topic交換機的差異?
自己總結
消息轉換器
測試發(fā)送Object類型消息
說明:在SpringAMQP的發(fā)送方法中,接收消息的類型是Object,也就是說我們可以發(fā)送任意對象類型的消息,SpringAMQP會幫我們序列化為字節(jié)后發(fā)送。(也就是是對象會序列化為字節(jié))
修改序列化(推薦JSON序列化)
發(fā)送消息
我們在publisher服務引入依賴
<!--rabbitmq使用json序列化--> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
我們在publisher服務聲明MessageConverter:
@Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); }
發(fā)送消息
@Test public void testSendObjectQueue(){ Map<String,Object> msg = new HashMap<>(); msg.put("name","留言"); msg.put("age",21); rabbitTemplate.convertAndSend("object.queue",msg); }
接送消息
我們在consumer服務引入Jackson依賴:
<!--rabbitmq使用json序列化--> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
我們在consumer服務定義MessageConverter:
@Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); }
然后定義一個消費者,監(jiān)聽object.queue隊列并消費信息:
@RabbitListener(queues = "object.queue") public void listenObjectQueue(Map<String,Object> msg){ System.out.println("消費者接收到object.queue的消息:{"+msg+"}"); }
總結
SpringAMQP中消息的序列化和反序列化是怎么實現(xiàn)的?
- 利用MessageConverter實現(xiàn)的,默認是JDK的序列化
- 注意發(fā)送方與接收方必須使用相同的MessageConverter
到此這篇關于RabbitMQ使用案例詳解的文章就介紹到這了,更多相關RabbitMQ使用內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
idea創(chuàng)建springboot項目和springcloud項目的詳細教程
這篇文章主要介紹了idea創(chuàng)建springboot項目和springcloud項目方法,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-10-10Spring Boot連接超時導致502錯誤的實戰(zhàn)案例
這篇文章主要給大家介紹了關于Spring Boot連接超時導致502錯誤的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-09-09SpringBoot實現(xiàn)RabbitMQ監(jiān)聽消息的四種方式
本文介紹了在Spring Boot中實現(xiàn)RabbitMQ監(jiān)聽消息的幾種方式,包括使用@RabbitListener注解、MessageListenerAdapter、配置連接工廠和隊列等方式,感興趣的可以了解一下2024-07-07SpringSecurity整合springBoot、redis實現(xiàn)登錄互踢功能
這篇文章主要介紹了SpringSecurity整合springBoot、redis實現(xiàn)登錄互踢,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-05-05