詳解SpringBoot中使用RabbitMQ的RPC功能
一、RabbitMQ的RPC簡介
實際業(yè)務(wù)中,有的時候我們還需要等待消費者返回結(jié)果給我們,或者是說我們需要消費者上的一個功能、一個方法或是一個接口返回給我們相應(yīng)的值,而往往大型的系統(tǒng)軟件,生產(chǎn)者跟消費者之間都是相互獨立的兩個系統(tǒng),部署在兩個不同的電腦上,不能通過直接對象.方法的形式獲取想要的結(jié)果,這時候我們就需要用到RPC(Remote Procedure Call)遠程過程調(diào)用方式。
RabbitMQ實現(xiàn)RPC的方式很簡單,生產(chǎn)者發(fā)送一條帶有標簽(消息ID(correlation_id)+回調(diào)隊列名稱)的消息到發(fā)送隊列,消費者(也稱RPC服務(wù)端)從發(fā)送隊列獲取消息并處理業(yè)務(wù),解析標簽的信息將業(yè)務(wù)結(jié)果發(fā)送到指定的回調(diào)隊列,生產(chǎn)者從回調(diào)隊列中根據(jù)標簽的信息獲取發(fā)送消息的返回結(jié)果。
如圖,客戶端C發(fā)送消息,指定消息的ID=rpc_id,回調(diào)響應(yīng)的隊列名稱為rpc_resp,消息從C發(fā)送到rpc_request隊列,服務(wù)端S獲取消息業(yè)務(wù)處理之后,將correlation_id附加到響應(yīng)的結(jié)果發(fā)送到指定的回調(diào)隊列rpc_resp中,客戶端從回調(diào)隊列獲取消息,匹配與發(fā)送消息的correlation_id相同的值為消息應(yīng)答結(jié)果。
二、SpringBoot中使用RabbitMQ的RPC功能
注意:springboot中使用的時候,correlation_id為系統(tǒng)自動生成的,reply_to在加載AmqpTemplate實例的時候設(shè)置的。
實例:
說明:隊列1為發(fā)送隊列,隊列2為返回隊列
1.先配置rabbitmq
package com.ws.common; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /* * rabbitMQ配置類 */ @Configuration public class RabbitMQConfig { public static final String TOPIC_QUEUE1 = "topic.queue1"; public static final String TOPIC_QUEUE2 = "topic.queue2"; public static final String TOPIC_EXCHANGE = "topic.exchange"; @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private int port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Autowired ConnectionFactory connectionFactory; @Bean(name = "connectionFactory") public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost("/"); return connectionFactory; } @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); //設(shè)置reply_to(返回隊列,只能在這設(shè)置) rabbitTemplate.setReplyAddress(TOPIC_QUEUE2); rabbitTemplate.setReplyTimeout(60000); return rabbitTemplate; } //返回隊列監(jiān)聽器(必須有) @Bean(name="replyMessageListenerContainer") public SimpleMessageListenerContainer createReplyListenerContainer() { SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(); listenerContainer.setConnectionFactory(connectionFactory); listenerContainer.setQueueNames(TOPIC_QUEUE2); listenerContainer.setMessageListener(rabbitTemplate()); return listenerContainer; } //創(chuàng)建隊列 @Bean public Queue topicQueue1() { return new Queue(TOPIC_QUEUE1); } @Bean public Queue topicQueue2() { return new Queue(TOPIC_QUEUE2); } //創(chuàng)建交換機 @Bean public TopicExchange topicExchange() { return new TopicExchange(TOPIC_EXCHANGE); } //交換機與隊列進行綁定 @Bean public Binding topicBinding1() { return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(TOPIC_QUEUE1); } @Bean public Binding topicBinding2() { return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(TOPIC_QUEUE2); } }
2.發(fā)送消息并同步等待返回值
@Autowired private RabbitTemplate rabbitTemplate; //報文body String sss = "報文的內(nèi)容"; //封裝Message Message msg = this.con(sss); log.info("客戶端--------------------"+msg.toString()); //使用sendAndReceive方法完成rpc調(diào)用 Message message=rabbitTemplate.sendAndReceive(RabbitMQConfig.TOPIC_EXCHANGE, RabbitMQConfig.TOPIC_QUEUE1, msg); //提取rpc回應(yīng)內(nèi)容body String response = new String(message.getBody()); log.info("回應(yīng):" + response); log.info("rpc完成---------------------------------------------"); public Message con(String s) { MessageProperties mp = new MessageProperties(); byte[] src = s.getBytes(Charset.forName("UTF-8")); //mp.setReplyTo("adsdas"); 加載AmqpTemplate時設(shè)置,這里設(shè)置沒用 //mp.setCorrelationId("2222"); 系統(tǒng)生成,這里設(shè)置沒用 mp.setContentType("application/json"); mp.setContentEncoding("UTF-8"); mp.setContentLength((long)s.length()); return new Message(src, mp); }
3.寫消費者
package com.ws.listener.mq; import java.nio.charset.Charset; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.ws.common.RabbitMQConfig; import lombok.extern.slf4j.Slf4j; @Slf4j @Component public class Receiver { @Autowired private RabbitTemplate rabbitTemplate; @RabbitListener(queues=RabbitMQConfig.TOPIC_QUEUE1) public void receiveTopic1(Message msg) { log.info("隊列1:"+msg.toString()); String msgBody = new String(msg.getBody()); //數(shù)據(jù)處理,返回的Message Message repMsg = con(msgBody+"返回了", msg.getMessageProperties().getCorrelationId()); rabbitTemplate.send(RabbitMQConfig.TOPIC_EXCHANGE, RabbitMQConfig.TOPIC_QUEUE2, repMsg); } @RabbitListener(queues=RabbitMQConfig.TOPIC_QUEUE2) public void receiveTopic2(Message msg) { log.info("隊列2:"+msg.toString()); } public Message con(String s, String id) { MessageProperties mp = new MessageProperties(); byte[] src = s.getBytes(Charset.forName("UTF-8")); mp.setContentType("application/json"); mp.setContentEncoding("UTF-8"); mp.setCorrelationId(id); return new Message(src, mp); } }
日志打?。?/p>
2019-06-26 17:11:16.607 [http-nio-8080-exec-4] INFO com.ws.controller.UserController - 客戶端--------------------(Body:‘報文的內(nèi)容' MessageProperties [headers={}, contentType=application/json, contentEncoding=UTF-8, contentLength=5, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])
2019-06-26 17:11:16.618 [SimpleAsyncTaskExecutor-1] INFO com.ws.listener.mq.Receiver - 隊列1:(Body:‘報文的內(nèi)容' MessageProperties [headers={}, correlationId=1, replyTo=topic.queue2, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topic.exchange, receivedRoutingKey=topic.queue1, deliveryTag=1, consumerTag=amq.ctag-8IzlhblYmTebqUYd-uferw, consumerQueue=topic.queue1])
2019-06-26 17:11:16.623 [http-nio-8080-exec-4] INFO com.ws.controller.UserController - 回應(yīng):報文的內(nèi)容返回了
2019-06-26 17:11:16.623 [http-nio-8080-exec-4] INFO com.ws.controller.UserController - rpc完成---------------------------------------------
到此這篇關(guān)于SpringBoot中使用RabbitMQ的RPC功能的文章就介紹到這了,更多相關(guān)SpringBoot使用RabbitMQ內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot 整合 RabbitMQ 的使用方式(代碼示例)
- RabbitMQ在Spring Boot中的使用步驟
- Springboot RabbitMQ 消息隊列使用示例詳解
- Spring Boot中RabbitMQ自動配置的介紹、原理和使用方法
- SpringMVC和rabbitmq集成的使用案例
- SpringBoot+RabbitMq具體使用的幾種姿勢
- 詳解Spring Cloud Stream使用延遲消息實現(xiàn)定時任務(wù)(RabbitMQ)
- SpringBoot之RabbitMQ的使用方法
- spring boot使用RabbitMQ實現(xiàn)topic 主題
- Spring3?中?RabbitMQ?的使用與常見場景分析
相關(guān)文章
SpringBoot集成minio實現(xiàn)文件上傳和刪除功能
這篇文章主要介紹了SpringBoot集成minio實現(xiàn)文件上傳和刪除功能,詳細介紹每個功能的實現(xiàn)步驟和代碼示例,具有一定的參考價值,感興趣的可以了解一下2023-11-11RocketMQ特性Broker存儲事務(wù)消息實現(xiàn)
這篇文章主要為大家介紹了RocketMQ特性Broker存儲事務(wù)消息實現(xiàn)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-08-08Spring Cloud Stream微服務(wù)消息框架原理及實例解析
這篇文章主要介紹了Spring Cloud Stream微服務(wù)消息框架原理及實例解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-06-06淺談Java數(shù)值類型的轉(zhuǎn)換與強制轉(zhuǎn)換
這篇文章主要介紹了Java數(shù)值類型的轉(zhuǎn)換與強制轉(zhuǎn)換,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-04-04Java數(shù)據(jù)結(jié)構(gòu)二叉樹難點解析
樹是一種重要的非線性數(shù)據(jù)結(jié)構(gòu),直觀地看,它是數(shù)據(jù)元素(在樹中稱為結(jié)點)按分支關(guān)系組織起來的結(jié)構(gòu),很象自然界中的樹那樣。樹結(jié)構(gòu)在客觀世界中廣泛存在,如人類社會的族譜和各種社會組織機構(gòu)都可用樹形象表示2021-10-10