SpringBoot+RabbitMQ方式收發(fā)消息的實(shí)現(xiàn)示例
本篇會(huì)和SpringBoot做整合,采用自動(dòng)配置的方式進(jìn)行開(kāi)發(fā),我們只需要聲明RabbitMQ地址就可以了,關(guān)于各種創(chuàng)建連接關(guān)閉連接的事都由Spring幫我們了~
交給Spring幫我們管理連接可以讓我們專注于業(yè)務(wù)邏輯,就像聲明式事務(wù)一樣易用,方便又高效。
祝有好收獲,先贊后看,快樂(lè)無(wú)限。
本文代碼:
https://gitee.com/he-erduo/spring-boot-learning-demo
https://github.com/he-erduo/spring-boot-learning-demo
1. 環(huán)境配置
第一節(jié)我們先來(lái)搞一下環(huán)境的配置,上一篇中我們已經(jīng)引入了自動(dòng)配置的包,我們既然使用了自動(dòng)配置的方式,那RabbitMQ的連接信息我們直接放在配置文件中就行了,就像我們需要用到JDBC連接的時(shí)候去配置一下DataSource一樣。
如圖所示,我們只需要指明一下連接的IP+端口號(hào)和用戶名密碼就行了,這里我用的是默認(rèn)的用戶名與密碼,不寫(xiě)的話默認(rèn)也都是guest,端口號(hào)也是默認(rèn)5672。
主要我們需要看一下手動(dòng)確認(rèn)消息的配置,需要配置成manual才是手動(dòng)確認(rèn),日后還會(huì)有其他的配置項(xiàng),眼下我們配置這一個(gè)就可以了。
接下來(lái)我們要配置一個(gè)Queue,上一篇中我們往一個(gè)名叫erduo的隊(duì)列中發(fā)送消息,當(dāng)時(shí)是我們手動(dòng)定義的此隊(duì)列,這里我們也需要手動(dòng)配置,聲明一個(gè)Bean就可以了。
@Configuration public class RabbitmqConfig { @Bean public Queue erduo() { // 其三個(gè)參數(shù):durable exclusive autoDelete // 一般只設(shè)置一下持久化即可 return new Queue("erduo",true); } }
就這么簡(jiǎn)單聲明一下就可以了,當(dāng)然了RabbitMQ畢竟是一個(gè)獨(dú)立的組件,如果你在RabbitMQ中通過(guò)其他方式已經(jīng)創(chuàng)建過(guò)一個(gè)名叫erduo的隊(duì)列了,你這里也可以不聲明,這里起到的一個(gè)效果就是如果你沒(méi)有這個(gè)隊(duì)列,會(huì)按照你聲明的方式幫你創(chuàng)建這個(gè)隊(duì)列。
配置完環(huán)境之后,我們就可以以SpringBoot的方式來(lái)編寫(xiě)生產(chǎn)者和消費(fèi)者了。
2. 生產(chǎn)者與RabbitTemplate
和上一篇的節(jié)奏一樣,我們先來(lái)編寫(xiě)生產(chǎn)者,不過(guò)這次我要引入一個(gè)新的工具:RabbitTemplate。
聽(tīng)它的這個(gè)名字就知道,又是一個(gè)拿來(lái)即用的工具類,Spring家族這點(diǎn)就很舒服,什么東西都給你封裝一遍,讓你用起來(lái)更方便更順手。
RabbitTemplate實(shí)現(xiàn)了標(biāo)準(zhǔn)AmqpTemplate接口,功能大致可以分為發(fā)送消息和接受消息。
我們這里是在生產(chǎn)者中來(lái)用,主要就是使用它的發(fā)送消息功能:send和convertAndSend方法。
// 發(fā)送消息到默認(rèn)的Exchange,使用默認(rèn)的routing key void send(Message message) throws AmqpException; // 使用指定的routing key發(fā)送消息到默認(rèn)的exchange void send(String routingKey, Message message) throws AmqpException; // 使用指定的routing key發(fā)送消息到指定的exchange void send(String exchange, String routingKey, Message message) throws AmqpException;
send方法是發(fā)送byte數(shù)組的數(shù)據(jù)的模式,這里代表消息內(nèi)容的對(duì)象是Message對(duì)象,它的構(gòu)造方法就是傳入byte數(shù)組數(shù)據(jù),所以我們需要把我們的數(shù)據(jù)轉(zhuǎn)成byte數(shù)組然后構(gòu)造成一個(gè)Message對(duì)象再進(jìn)行發(fā)送。
// Object類型,可以傳入POJO void convertAndSend(Object message) throws AmqpException; void convertAndSend(String routingKey, Object message) throws AmqpException; void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
convertAndSend方法是可以傳入POJO對(duì)象作為參數(shù),底層是有一個(gè)MessageConverter幫我們自動(dòng)將數(shù)據(jù)轉(zhuǎn)換成byte類型或String或序列化類型。
所以這里支持的傳入對(duì)象也只有三種:byte類型,String類型和實(shí)現(xiàn)了Serializable接口的POJO。
介紹完了,我們可以看一下代碼:
@Slf4j @Component("rabbitProduce") public class RabbitProduce { @Autowired private RabbitTemplate rabbitTemplate; public void send() { String message = "Hello 我是作者和耳朵,歡迎關(guān)注我。" + LocalDateTime.now().toString(); System.out.println("Message content : " + message); // 指定消息類型 MessageProperties props = MessagePropertiesBuilder.newInstance() .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).build(); rabbitTemplate.send(Producer.QUEUE_NAME,new Message(message.getBytes(StandardCharsets.UTF_8),props)); System.out.println("消息發(fā)送完畢。"); } public void convertAndSend() { User user = new User(); System.out.println("Message content : " + user); rabbitTemplate.convertAndSend(Producer.QUEUE_NAME,user); System.out.println("消息發(fā)送完畢。"); } }
這里我特意寫(xiě)明了兩個(gè)例子,一個(gè)用來(lái)測(cè)試send,另一個(gè)用來(lái)測(cè)試convertAndSend。
send方法里我們看下來(lái)和之前的代碼是幾乎一樣的,定義一個(gè)消息,然后直接send,但是這個(gè)構(gòu)造消息的構(gòu)造方法可能比我們想的要多一個(gè)參數(shù),我們?cè)瓉?lái)說(shuō)的只要把數(shù)據(jù)轉(zhuǎn)成二進(jìn)制數(shù)組放進(jìn)去即可,現(xiàn)在看來(lái)還要多放一個(gè)參數(shù)了。
MessageProperties,是的我們需要多放一個(gè)MessageProperties對(duì)象,從他的名字我們也可以看出它的功能就是附帶一些參數(shù),但是某些參數(shù)是少不了的,不帶不行。
比如我的代碼這里就是設(shè)置了一下消息的類型,消息的類型有很多種可以是二進(jìn)制類型,文本類型,或者序列化類型,JSON類型,我這里設(shè)置的就是文本類型,指定類型是必須的,也可以為我們拿到消息之后要將消息轉(zhuǎn)換成什么樣的對(duì)象提供一個(gè)參考。
convertAndSend方法就要簡(jiǎn)單太多,這里我放了一個(gè)User對(duì)象拿來(lái)測(cè)試用,直接指定隊(duì)列然后放入這個(gè)對(duì)象即可。
Tips:User必須實(shí)現(xiàn)Serializable接口,不然的話調(diào)用此方法的時(shí)候會(huì)拋出IllegalArgumentException異常。
代碼完成之后我們就可以調(diào)用了,這里我寫(xiě)一個(gè)測(cè)試類進(jìn)行調(diào)用:
@SpringBootTest public class RabbitProduceTest { @Autowired private RabbitProduce rabbitProduce; @Test public void sendSimpleMessage() { rabbitProduce.send(); rabbitProduce.convertAndSend(); } }
效果如下圖~
同時(shí)在控制臺(tái)使用命令rabbitmqctl.bat list_queues查看隊(duì)列-erduo現(xiàn)在的情況:
如此一來(lái),我們的生產(chǎn)者測(cè)試就算完成了,現(xiàn)在消息隊(duì)列里兩條消息了,而且消息類型肯定不一樣,一個(gè)是我們?cè)O(shè)置的文本類型,一個(gè)是自動(dòng)設(shè)置的序列化類型。
3. 消費(fèi)者與RabbitListener
既然隊(duì)列里面已經(jīng)有消息了,接下來(lái)我們就要看我們?cè)撊绾瓮ㄟ^(guò)新的方式拿到消息并消費(fèi)與確認(rèn)了。
消費(fèi)者這里我們要用到@RabbitListener來(lái)幫我們拿到指定隊(duì)列消息,它的用法很簡(jiǎn)單也很復(fù)雜,我們可以先來(lái)說(shuō)簡(jiǎn)單的方式,直接放到方法上,指定監(jiān)聽(tīng)的隊(duì)列就行了。
@Slf4j @Component("rabbitConsumer") public class RabbitConsumer { @RabbitListener(queues = Producer.QUEUE_NAME) public void onMessage(Message message, Channel channel) throws Exception { System.out.println("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); System.out.println("消息已確認(rèn)"); } }
這段代碼就代表onMessage方法會(huì)處理erduo(Producer.QUEUE_NAME是常量字符串"erduo")隊(duì)列中的消息。
我們可以看到這個(gè)方法里面有兩個(gè)參數(shù),Message和Channel,如果用不到Channel可以不寫(xiě)此參數(shù),但是Message消息一定是要的,它代表了消息本身。
我們可以想想,我們的程序從RabbitMQ之中拉回一條條消息之后,要以怎么樣的方式展示給我們呢?
沒(méi)錯(cuò),就是封裝為一個(gè)個(gè)Message對(duì)象,這里面放入了一條消息的所有信息,數(shù)據(jù)結(jié)構(gòu)是什么樣一會(huì)我一run你就能看到了。
同時(shí)這里我們使用Channel做一個(gè)消息確認(rèn)的操作,這里的DeliveryTag代表的是這個(gè)消息在隊(duì)列中的序號(hào),這個(gè)信息存放在MessageProperties中。
4. SpringBoot 啟動(dòng)!
編寫(xiě)完生產(chǎn)者和消費(fèi)者,同時(shí)已經(jīng)運(yùn)行過(guò)生產(chǎn)者往消息隊(duì)列里面放了兩條信息,接下來(lái)我們可以直接啟動(dòng)消息,查看消費(fèi)情況:
在我紅色框線標(biāo)記的地方可以看到,因?yàn)槲覀冇辛讼M(fèi)者所以項(xiàng)目啟動(dòng)后先和RabbitMQ建立了一個(gè)連接進(jìn)行監(jiān)聽(tīng)隊(duì)列。
隨后就開(kāi)始消費(fèi)我們隊(duì)列中的兩條消息:
第一條信息是contentType=text/plain類型,所以直接就在控制臺(tái)上打印出了具體內(nèi)容。
第二條信息是contentType=application/x-java-serialized-object,在打印的時(shí)候只打印了一個(gè)內(nèi)存地址+字節(jié)大小。
不管怎么說(shuō),數(shù)據(jù)我們是拿到了,也就是代表我們的消費(fèi)是沒(méi)有問(wèn)題的,同時(shí)也都進(jìn)行了消息確認(rèn)操作,從數(shù)據(jù)上看,整個(gè)消息可以分為兩部分:body和MessageProperties。
我們可以單獨(dú)使用一個(gè)注解拿到這個(gè)body的內(nèi)容 - @Payload
@RabbitListener(queues = Producer.QUEUE_NAME) public void onMessage(@Payload String body, Channel channel) throws Exception { System.out.println("Message content : " + body); }
也可以單獨(dú)使用一個(gè)注解拿到MessageProperties的headers屬性,headers屬性在截圖里也可以看到,只不過(guò)是個(gè)空的 - @Headers。
@RabbitListener(queues = Producer.QUEUE_NAME) public void onMessage(@Payload String body, @Headers Map<String,Object> headers) throws Exception { System.out.println("Message content : " + body); System.out.println("Message headers : " + headers); }
這兩個(gè)注解都算是擴(kuò)展知識(shí),我還是更喜歡直接拿到全部,全都要?。?!
上面我們已經(jīng)完成了消息的發(fā)送與消費(fèi),整個(gè)過(guò)程我們可以再次回想一下,一切都和我畫(huà)的這張圖上一樣的軌跡:
只不過(guò)我們一直沒(méi)有指定Exchage一直使用的默認(rèn)路由,希望大家好好記住這張圖。
5. @RabbitListener與@RabbitHandler
下面再來(lái)補(bǔ)一些知識(shí)點(diǎn),有關(guān)@RabbitListener與@RabbitHandler。
@RabbitListener上面我們已經(jīng)簡(jiǎn)單的進(jìn)行了使用,稍微擴(kuò)展一下它其實(shí)是可以監(jiān)聽(tīng)多個(gè)隊(duì)列的,就像這樣:
@RabbitListener(queues = { "queue1", "queue2" }) public void onMessage(Message message, Channel channel) throws Exception { System.out.println("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false) System.out.println("消息已確認(rèn)"); }
還有一些其他的特性如綁定之類的,這里不再贅述因?yàn)樘簿幋a了一般用不上。
下面來(lái)說(shuō)說(shuō)這節(jié)要主要講的一個(gè)特性:@RabbitListener和@RabbitHandler的搭配使用。
前面我們沒(méi)有提到,@RabbitListener注解其實(shí)是可以注解在類上的,這個(gè)注解在類上標(biāo)志著這個(gè)類監(jiān)聽(tīng)某個(gè)隊(duì)列或某些隊(duì)列。
這兩個(gè)注解的搭配使用就要讓@RabbitListener注解在類上,然后用@RabbitHandler注解在方法上,根據(jù)方法參數(shù)的不同自動(dòng)識(shí)別并去消費(fèi),寫(xiě)個(gè)例子給大家看一看更直觀一些。
@Slf4j @Component("rabbitConsumer") @RabbitListener(queues = Producer.QUEUE_NAME) public class RabbitConsumer { @RabbitHandler public void onMessage(@Payload String message){ System.out.println("Message content : " + message); } @RabbitHandler public void onMessage(@Payload User user) { System.out.println("Message content : " + user); } }
大家可以看看這個(gè)例子,我們先用@RabbitListener監(jiān)聽(tīng)erduo隊(duì)列中的消息,然后使用@RabbitHandler注解了兩個(gè)方法。
第一個(gè)方法的body類型是String類型,這就代表著這個(gè)方法只能處理文本類型的消息。 第二個(gè)方法的body類型是User類型,這就代表著這個(gè)方法只能處理序列化類型且為User類型的消息。
這兩個(gè)方法正好對(duì)應(yīng)著我們第二節(jié)中測(cè)試類會(huì)發(fā)送的兩種消息,所以我們往RabbitMQ中發(fā)送兩條測(cè)試消息,用來(lái)測(cè)試這段代碼,看看效果:
都在控制臺(tái)上如常打印了,如果@RabbitHandler注解的方法中沒(méi)有一個(gè)的類型可以和你消息的類型對(duì)的上,比如消息都是byte數(shù)組類型,這里沒(méi)有對(duì)應(yīng)的方法去接收,系統(tǒng)就會(huì)在控制臺(tái)不斷的報(bào)錯(cuò),如果你出現(xiàn)這個(gè)情況就證明你類型寫(xiě)的不正確。
假設(shè)你的erduo隊(duì)列中會(huì)出現(xiàn)三種類型的消息:byte,文本和序列化,那你就必須要有對(duì)應(yīng)的處理這三種消息的方法,不然消息發(fā)過(guò)來(lái)的時(shí)候就會(huì)因?yàn)闊o(wú)法正確轉(zhuǎn)換而報(bào)錯(cuò)。
而且使用了@RabbitHandler注解之后就不能再和之前一樣使用Message做接收類型。
@RabbitHandler public void onMessage(Message message, Channel channel) throws Exception { System.out.println("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); System.out.println("消息已確認(rèn)"); }
這樣寫(xiě)的話會(huì)報(bào)類型轉(zhuǎn)換異常的,所以二者選其一。
同時(shí)上文我的@RabbitHandler沒(méi)有進(jìn)行消息確認(rèn),大家可以自己試一下進(jìn)行消息確認(rèn)。
6. 消息的序列化轉(zhuǎn)換
通過(guò)上文我們已經(jīng)知道,能被自動(dòng)轉(zhuǎn)換的對(duì)象只有byte[]、String、java序列化對(duì)象(實(shí)現(xiàn)了Serializable接口的對(duì)象),但是并不是所有的Java對(duì)象都會(huì)去實(shí)現(xiàn)Serializable接口,而且序列化的過(guò)程中使用的是JDK自帶的序列化方法,效率低下。
所以我們更普遍的做法是:使用Jackson先將數(shù)據(jù)轉(zhuǎn)換成JSON格式發(fā)送給RabbitMQ,再接收消息的時(shí)候再用Jackson將數(shù)據(jù)反序列化出來(lái)。
這樣做可以完美解決上面的痛點(diǎn):消息對(duì)象既不必再去實(shí)現(xiàn)Serializable接口,也有比較高的效率(Jackson序列化效率業(yè)界應(yīng)該是最好的了)。
默認(rèn)的消息轉(zhuǎn)換方案是消息轉(zhuǎn)換頂層接口-MessageConverter的一個(gè)子類:SimpleMessageConverter,我們?nèi)绻獡Q到另一個(gè)消息轉(zhuǎn)換器只需要替換掉這個(gè)轉(zhuǎn)換器就行了。
上圖是MessageConverter結(jié)構(gòu)樹(shù)的結(jié)構(gòu)樹(shù),可以看到除了SimpleMessageConverter之外還有一個(gè)Jackson2JsonMessageConverter,我們只需要將它定義為Bean,就可以直接使用這個(gè)轉(zhuǎn)換器了。
@Bean public MessageConverter jackson2JsonMessageConverter() { return new Jackson2JsonMessageConverter(jacksonObjectMapper); }
這樣就可以了,這里的jacksonObjectMapper可以不傳入,但是默認(rèn)的ObjectMapper方案對(duì)JDK8的時(shí)間日期序列化會(huì)不太友好,具體可以參考我的上一篇文章:從LocalDateTime序列化探討全局一致性序列化,總的來(lái)說(shuō)就是定義了自己的ObjectMapper。
同時(shí)為了接下來(lái)測(cè)試方便,我又定義了一個(gè)專門測(cè)試JSON序列化的隊(duì)列:
@Bean public Queue erduoJson() { // 其三個(gè)參數(shù):durable exclusive autoDelete // 一般只設(shè)置一下持久化即可 return new Queue("erduo_json",true); }
如此之后就可以進(jìn)行測(cè)試了,先是生產(chǎn)者代碼:
public void sendObject() { Client client = new Client(); System.out.println("Message content : " + client); rabbitTemplate.convertAndSend(RabbitJsonConsumer.JSON_QUEUE,client); System.out.println("消息發(fā)送完畢。"); }
我又重新定義了一個(gè)Client對(duì)象,它和之前測(cè)試使用的User對(duì)象成員變量都是一樣的,不一樣的是它沒(méi)有實(shí)現(xiàn)Serializable接口。
同時(shí)為了保留之前的測(cè)試代碼,我又新建了一個(gè)RabbitJsonConsumer,用于測(cè)試JSON序列化的相關(guān)消費(fèi)代碼,里面定義了一個(gè)靜態(tài)變量:JSON_QUEUE = "erduo_json";
所以這段代碼是將Client對(duì)象作為消息發(fā)送到"erduo_json"隊(duì)列中去,隨后我們?cè)跍y(cè)試類中run一下進(jìn)行一次發(fā)送。
緊著是消費(fèi)者代碼:
@Slf4j @Component("rabbitJsonConsumer") @RabbitListener(queues = RabbitJsonConsumer.JSON_QUEUE) public class RabbitJsonConsumer { public static final String JSON_QUEUE = "erduo_json"; @RabbitHandler public void onMessage(Client client, @Headers Map<String,Object> headers, Channel channel) throws Exception { System.out.println("Message content : " + client); System.out.println("Message headers : " + headers); channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false); System.out.println("消息已確認(rèn)"); } }
有了上文的經(jīng)驗(yàn)之后,這段代碼理解起來(lái)也是很簡(jiǎn)單了吧,同時(shí)給出了上一節(jié)沒(méi)寫(xiě)的如何在@RabbitHandler模式下進(jìn)行消息簽收。
我們直接來(lái)看看效果:
在打印的Headers里面,往后翻可以看到contentType=application/json,這個(gè)contentType是表明了消息的類型,這里正是說(shuō)明我們新的消息轉(zhuǎn)換器生效了,將所有消息都轉(zhuǎn)換成了JSON類型。
后記
這兩篇講完了RabbitMQ的基本收發(fā)消息,包括手動(dòng)配置和自動(dòng)配置的兩種方式,這些大家仔細(xì)研讀之后應(yīng)該會(huì)對(duì)RabbitMQ收發(fā)消息沒(méi)什么疑問(wèn)了~
不過(guò)我們一直以來(lái)發(fā)消息時(shí)都是使用默認(rèn)的交換機(jī),下篇將會(huì)講述一下RabbitMQ的幾種交換機(jī)類型,以及其使用方式。
到此這篇關(guān)于SpringBoot+RabbitMQ方式收發(fā)消息的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)SpringBoot RabbitMQ 收發(fā)消息內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Springboot整合RabbitMQ消息中間件的使用
- SpringBoot整合rabbitMq自定義消息轉(zhuǎn)換方式
- springboot實(shí)現(xiàn)rabbitmq消息確認(rèn)的示例代碼
- SpringBoot整合消息隊(duì)列RabbitMQ
- springboot整合消息隊(duì)列RabbitMQ
- Springboot?整合?RabbitMQ?消息隊(duì)列?詳情
- SpringBoot整合RabbitMQ實(shí)現(xiàn)消息確認(rèn)機(jī)制
- SpringBoot整合RabbitMQ消息隊(duì)列的完整步驟
- Springboot集成RabbitMQ并驗(yàn)證五種消息模型
相關(guān)文章
Java實(shí)現(xiàn)廣度優(yōu)先遍歷的示例詳解
廣度優(yōu)先遍歷:廣度優(yōu)先遍歷是連通圖的一種遍歷策略,因?yàn)樗乃枷胧菑囊粋€(gè)頂點(diǎn)V0開(kāi)始,輻射狀地優(yōu)先遍歷其周圍較廣的區(qū)域故得名。本文詳細(xì)介紹了Java如何實(shí)現(xiàn)廣度優(yōu)先遍歷,感興趣的小伙伴可以學(xué)習(xí)一下2022-02-02Java的Tomcat和Servlet的運(yùn)行原理詳解
這篇文章主要為大家詳細(xì)介紹了Java的Tomcat和Servlet,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來(lái)幫助2022-03-03詳解Spring事件發(fā)布與監(jiān)聽(tīng)機(jī)制
Spring提供了ApplicationContext事件機(jī)制,可以發(fā)布和監(jiān)聽(tīng)事件,這個(gè)特性非常有用。Spring內(nèi)置了一些事件和監(jiān)聽(tīng)器,例如在Spring容器啟動(dòng)前,Spring容器啟動(dòng)后,應(yīng)用啟動(dòng)失敗后等事件發(fā)生后,監(jiān)聽(tīng)在這些事件上的監(jiān)聽(tīng)器會(huì)做出相應(yīng)的響應(yīng)處理2021-06-06關(guān)于Java中阻塞隊(duì)列BlockingQueue的詳解
這篇文章主要介紹了關(guān)于Java中阻塞隊(duì)列BlockingQueue的詳解,BlockingQueue是為了解決多線程中數(shù)據(jù)高效安全傳輸而提出的,從阻塞這個(gè)詞可以看出,在某些情況下對(duì)阻塞隊(duì)列的訪問(wèn)可能會(huì)造成阻塞,需要的朋友可以參考下2023-05-05