springboot-rabbitmq-reply?消息直接回復(fù)模式詳情
一、使用場景
MQ的作用包括了解耦、異步等。
通常生產(chǎn)者只負(fù)責(zé)生產(chǎn)消息,而不關(guān)心消息誰去獲取,或者消費(fèi)結(jié)果如何;消費(fèi)者只負(fù)責(zé)接收指定的消息進(jìn)行業(yè)務(wù)處理而不關(guān)心消息從哪里來一級回復(fù)業(yè)務(wù)處理情況。但我們項(xiàng)目中有特殊的業(yè)務(wù)存在,我們作為消息生產(chǎn)者在生產(chǎn)消息后需要接收消費(fèi)者的響應(yīng)結(jié)果(說白了就是類似同步調(diào)用 請求響應(yīng)的MQ使用),經(jīng)過研究,MQ的Reply模式(直接回復(fù)模式)就是為此種業(yè)務(wù)模式而產(chǎn)生。
二、Reply實(shí)戰(zhàn)
(1)依賴與YML配置
依賴:
我這里只列出最核心的rabbitMq所需依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
配置:
無其余特殊配置,因?yàn)閞eply就是rabbitmq的一種交互方式而已
spring: rabbitmq: host: 10.50.40.116 port: 5673 username: admin password: admin
(2)RabbitMq bean配置
package com.leilei.demo; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author lei * @create 2022-09-19 21:44 * @desc mq配置 **/ @Configuration public class RabbitMqConfig { @Bean public Queue bizQueue() { return new Queue("bizQueue"); } @Bean public Queue replyQueue() { return new Queue("replyQueue"); } @Bean FanoutExchange bizExchange() { return new FanoutExchange("bizExchange"); } }
業(yè)務(wù)類:
@Data @NoArgsConstructor @AllArgsConstructor public class Vehicle implements Serializable { private Integer id; private String name; }
(3)消息生產(chǎn)端
消息生產(chǎn)端需要做的事情:有生產(chǎn)消息、接受消息消費(fèi)響應(yīng)
(1)生產(chǎn)消息
- 1、生產(chǎn)消息,看業(yè)務(wù)場景選擇是否生成全局唯一自定義的消息ID
- 2、指定消息消費(fèi)后響應(yīng)的隊(duì)列(Reply)
/** * 生產(chǎn)消息 * * @param * @return void * @author lei * @date 2022-09-19 21:59:18 */ public void replySend() { MessageProperties messageProperties = new MessageProperties(); messageProperties.setReplyTo("replyQueue"); //todo 根據(jù)業(yè)務(wù),做一個(gè)嚴(yán)謹(jǐn)?shù)娜治ㄒ籌D,我這里暫時(shí)用UUID String correlationId = UUID.randomUUID().toString(); // 我這里指定了唯一消息ID,看業(yè)務(wù)場景,消費(fèi)者消費(fèi)響應(yīng)后,生產(chǎn)者端可根據(jù)消息ID做業(yè)務(wù)處理 messageProperties.setCorrelationId(correlationId); Vehicle vehicle = new Vehicle(1, "川A0001"); Message message = new Message(JSON.toJSONString(vehicle).getBytes(), messageProperties); rabbitTemplate.convertAndSend("bizExchange","",message); System.out.println("生產(chǎn)者發(fā)送消息,自定義消息ID為:" + correlationId); }
(2)接受Reply響應(yīng)
消費(fèi)者消費(fèi)消息后會(huì)將處理結(jié)果進(jìn)行發(fā)送到一個(gè)隊(duì)列,我們讀取這里隊(duì)列就可以拿到對應(yīng)消息的響應(yīng)結(jié)果進(jìn)行業(yè)務(wù)處理了
/** * 接收消息響應(yīng) * * @param message * @return void * @author lei * @date 2022-09-19 21:59:27 */ @RabbitListener(queues = "replyQueue") public void replyResponse(Message message) { String s = new String(message.getBody()); String correlationId = message.getMessageProperties().getCorrelationId(); System.out.println("收到客戶端響應(yīng)消息ID:" + correlationId); //todo 根據(jù)消息ID可判斷這是哪一個(gè)消息的響應(yīng),我們就可做業(yè)務(wù)操作 System.out.println("收到客戶端響應(yīng)消息:" + s); }
(4)消息消費(fèi)端
消息消費(fèi)端需要做的事有:接受消息然后進(jìn)行業(yè)務(wù)處理、響應(yīng)消息
(1)方法一:sendTo注解+方法返回值
一般來說,我們mq消費(fèi)者監(jiān)聽方法不需要返回值,我們這里使用sendTo注解,則需要將要響應(yīng)的消息定義為返回值,sendTo注解中指定要響應(yīng)到哪個(gè)隊(duì)列
重點(diǎn):
- 1、sendTo注解指定要相應(yīng)的隊(duì)列(注意和生產(chǎn)端保持一致)
- 2、方法定義的返回值內(nèi)容就是要響應(yīng)的消息,最終會(huì)發(fā)送到sendTo注解指定要相應(yīng)的隊(duì)列
- 3、這種方法的缺點(diǎn)是消費(fèi)端的主關(guān)性很高,因?yàn)閟endTo指定的目標(biāo)隊(duì)列可以自己瞎寫,導(dǎo)致生產(chǎn)者端無法正確收到消息響應(yīng),但我相信一般項(xiàng)目中也不會(huì)這么干
/** * 方式1 SendTo指定響應(yīng)隊(duì)列 * * @param message * @return String * @author lei * @date 2022-09-19 16:17:52 */ @RabbitListener(queues ="bizQueue") @SendTo("replyQueue") public String handleEmailMessage(Message message) { try { String msg=new String(message.getBody(), StandardCharsets.UTF_8); log.info("---consumer接收到消息----{}",msg); return "客戶端響應(yīng)消息:"+msg+"處理完成!"; } catch (Exception e) { log.error("處理業(yè)務(wù)消息失敗",e); } return null; }
(2)方法二:讀取生產(chǎn)端的消息使用模板發(fā)送
與普通的消費(fèi)者方法一樣,只需要RabbitListener注解監(jiān)聽業(yè)務(wù)隊(duì)列;但還需要根據(jù)消息獲取出ReplyTo地址,然后自己消費(fèi)者方法內(nèi)部手動(dòng)發(fā)送消息
- 1、優(yōu)點(diǎn),更強(qiáng)烈的感受到消息請求 響應(yīng)的交互性,流程看起來更清晰
- 2、缺點(diǎn),代碼不雅
/** * 方式2 message消息獲取內(nèi)部reply rabbitmq手動(dòng)發(fā)送 * * @param message * @return String * @author lei * @date 2022-09-19 16:17:52 */ @RabbitListener(queues = "bizQueue") public void handleEmailMessage2(Message message) { try { String msg = new String(message.getBody(), StandardCharsets.UTF_8); log.info("---consumer接收到消息----{}", msg); String replyTo = message.getMessageProperties().getReplyTo(); System.out.println("接收到的reply:" + replyTo); rabbitTemplate.convertAndSend(replyTo, "客戶端響應(yīng)消息:" + msg + "處理完成!", x -> { x.getMessageProperties().setCorrelationId(message.getMessageProperties().getCorrelationId()); return x; }); } catch (Exception e) { log.error("處理業(yè)務(wù)消息失敗",e); } }
(3)方法三:方法返回值
這種方式與1其實(shí)是一致的,但我經(jīng)過測試,因?yàn)樯a(chǎn)者消息指定了ReplyTo的地址,消費(fèi)者端無需自己再次手動(dòng)指定,即生產(chǎn)消息到哪里,是否響應(yīng)以及響應(yīng)消息發(fā)送到哪里全由生產(chǎn)端自己空,消費(fèi)者只需要處理自身業(yè)務(wù)以及返回結(jié)果
/** * 方式三 方法有返回值,返回要響應(yīng)的數(shù)據(jù) (reply 由生產(chǎn)者發(fā)送消息時(shí)指定,消費(fèi)者不做任何處理) * * @param message * @return String * @author lei * @date 2022-09-19 23:17:47 */ @RabbitListener(queues ="bizQueue") public String handleEmailMessage3(Message message) { try { String msg=new String(message.getBody(), StandardCharsets.UTF_8); log.info("---consumer接收到消息----{}",msg); return "客戶端響應(yīng)消息:"+msg+"處理完成!"; } catch (Exception e) { log.error("處理業(yè)務(wù)消息失敗",e); } return null; }
(4)測試
生產(chǎn)消息:
消費(fèi)消息與響應(yīng):
收到的響應(yīng):
鏈路:
如此,MQ版本的請求響應(yīng)模式就完成了,其實(shí)很多大佬使用MQ來實(shí)現(xiàn)RPC就是用的ReplyTo啦!
到此這篇關(guān)于springboot-rabbitmq-reply 消息直接回復(fù)模式詳情的文章就介紹到這了,更多相關(guān)springboot-rabbitmq-reply 內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
解決SpringCloud Gateway配置自定義路由404的坑
這篇文章主要介紹了解決SpringCloud Gateway配置自定義路由404的坑,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09Java后臺(tái)Controller實(shí)現(xiàn)文件下載操作
這篇文章主要介紹了Java后臺(tái)Controller實(shí)現(xiàn)文件下載操作,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-10-10shuffle的關(guān)鍵階段sort(Map端和Reduce端)源碼分析
今天小編就為大家分享一篇關(guān)于shuffle的關(guān)鍵階段sort(Map端和Reduce端)源碼分析,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧2019-01-01Java?SE判斷兩個(gè)文件內(nèi)容是否相同的多種方法代碼
昨天因?yàn)橐獛蛶熜值拿λ钥戳艘幌氯绾闻袛鄡蓚€(gè)文件內(nèi)容是否相同,這里給大家總結(jié)下,這篇文章主要給大家介紹了關(guān)于Java?SE判斷兩個(gè)文件內(nèi)容是否相同的多種方法,需要的朋友可以參考下2023-11-11spring+Jpa多數(shù)據(jù)源配置的方法示例
這篇文章主要介紹了spring+Jpa多數(shù)據(jù)源配置的方法示例,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-08-08Java Comparable及Comparator接口區(qū)別詳解
這篇文章主要介紹了Java Comparable及Comparator接口區(qū)別詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-07-07Java中TreeSet、HashSet、Collection重寫比較器的實(shí)現(xiàn)
比較器是一種可以對集合或數(shù)組中的元素按照自定義的方式進(jìn)行排序的對象,本文主要介紹了Java中TreeSet、HashSet、Collection重寫比較器的實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2023-08-08Java底層基于鏈表實(shí)現(xiàn)集合和映射--集合Set操作詳解
這篇文章主要介紹了Java底層基于鏈表實(shí)現(xiàn)集合和映射集合Set操作,結(jié)合實(shí)例形式詳細(xì)分析了Java使用鏈表實(shí)現(xiàn)集合和映射相關(guān)原理、操作技巧與注意事項(xiàng),需要的朋友可以參考下2020-03-03