SpringBoot整合RabbitMQ實(shí)戰(zhàn)教程附死信交換機(jī)
前言
使用springboot,實(shí)現(xiàn)以下功能,有兩個(gè)隊(duì)列1、2,往里面發(fā)送消息,如果處理失敗發(fā)生異常,可以重試3次,重試3次均失敗,那么就將消息發(fā)送到死信隊(duì)列進(jìn)行統(tǒng)一處理,例如記錄數(shù)據(jù)庫(kù)、報(bào)警等
完整demo項(xiàng)目代碼https://gitee.com/daenmax/rabbit-mq-demo
環(huán)境
Windows10,IDEA,otp_win64_25.0,rabbitmq-server-3.10.4
1.雙擊C:\Program Files\RabbitMQ Server\rabbitmq_server-3.10.4\sbin\rabbitmq-server.bat啟動(dòng)MQ服務(wù)
2.然后訪問(wèn)http://localhost:15672/,默認(rèn)賬號(hào)密碼均為guest,
3.手動(dòng)添加一個(gè)虛擬主機(jī)為admin_host,手動(dòng)創(chuàng)建一個(gè)用戶賬號(hào)密碼均為admin
pom.xml
<!-- RabbitMQ --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.7.0</version> </dependency>
配置
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: admin password: admin virtual-host: admin_host publisher-confirm-type: correlated publisher-returns: true listener: simple: acknowledge-mode: manual retry: enabled: true #開(kāi)啟失敗重試 max-attempts: 3 #最大重試次數(shù) initial-interval: 1000 #重試間隔時(shí)間 毫秒
配置文件
RabbitConfig
package com.example.rabitmqdemo.mydemo.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; /** * Broker:它提供一種傳輸服務(wù),它的角色就是維護(hù)一條從生產(chǎn)者到消費(fèi)者的路線,保證數(shù)據(jù)能按照指定的方式進(jìn)行傳輸, * Exchange:消息交換機(jī),它指定消息按什么規(guī)則,路由到哪個(gè)隊(duì)列。 * Queue:消息的載體,每個(gè)消息都會(huì)被投到一個(gè)或多個(gè)隊(duì)列。 * Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來(lái). * Routing Key:路由關(guān)鍵字,exchange根據(jù)這個(gè)關(guān)鍵字進(jìn)行消息投遞。 * vhost:虛擬主機(jī),一個(gè)broker里可以有多個(gè)vhost,用作不同用戶的權(quán)限分離。 * Producer:消息生產(chǎn)者,就是投遞消息的程序. * Consumer:消息消費(fèi)者,就是接受消息的程序. * Channel:消息通道,在客戶端的每個(gè)連接里,可建立多個(gè)channel. */ @Slf4j @Component public class RabbitConfig { //業(yè)務(wù)交換機(jī) public static final String EXCHANGE_PHCP = "phcp"; //業(yè)務(wù)隊(duì)列1 public static final String QUEUE_COMPANY = "company"; //業(yè)務(wù)隊(duì)列1的key public static final String ROUTINGKEY_COMPANY = "companyKey"; //業(yè)務(wù)隊(duì)列2 public static final String QUEUE_PROJECT = "project"; //業(yè)務(wù)隊(duì)列2的key public static final String ROUTINGKEY_PROJECT = "projectKey"; //死信交換機(jī) public static final String EXCHANGE_PHCP_DEAD = "phcp_dead"; //死信隊(duì)列1 public static final String QUEUE_COMPANY_DEAD = "company_dead"; //死信隊(duì)列2 public static final String QUEUE_PROJECT_DEAD = "project_dead"; //死信隊(duì)列1的key public static final String ROUTINGKEY_COMPANY_DEAD = "companyKey_dead"; //死信隊(duì)列2的key public static final String ROUTINGKEY_PROJECT_DEAD = "projectKey_dead"; // /** // * 解決重復(fù)確認(rèn)報(bào)錯(cuò)問(wèn)題,如果沒(méi)有報(bào)錯(cuò)的話,就不用啟用這個(gè) // * // * @param connectionFactory // * @return // */ // @Bean // public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { // SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); // factory.setConnectionFactory(connectionFactory); // factory.setMessageConverter(new Jackson2JsonMessageConverter()); // factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // return factory; // } /** * 聲明業(yè)務(wù)交換機(jī) * 1. 設(shè)置交換機(jī)類(lèi)型 * 2. 將隊(duì)列綁定到交換機(jī) * FanoutExchange: 將消息分發(fā)到所有的綁定隊(duì)列,無(wú)routingkey的概念 * HeadersExchange :通過(guò)添加屬性key-value匹配 * DirectExchange:按照routingkey分發(fā)到指定隊(duì)列 * TopicExchange:多關(guān)鍵字匹配 */ @Bean("exchangePhcp") public DirectExchange exchangePhcp() { return new DirectExchange(EXCHANGE_PHCP); } * 聲明死信交換機(jī) @Bean("exchangePhcpDead") public DirectExchange exchangePhcpDead() { return new DirectExchange(EXCHANGE_PHCP_DEAD); * 聲明業(yè)務(wù)隊(duì)列1 * * @return @Bean("queueCompany") public Queue queueCompany() { Map<String,Object> arguments = new HashMap<>(2); arguments.put("x-dead-letter-exchange",EXCHANGE_PHCP_DEAD); //綁定該隊(duì)列到死信交換機(jī)的隊(duì)列1 arguments.put("x-dead-letter-routing-key",ROUTINGKEY_COMPANY_DEAD); return QueueBuilder.durable(QUEUE_COMPANY).withArguments(arguments).build(); * 聲明業(yè)務(wù)隊(duì)列2 @Bean("queueProject") public Queue queueProject() { //綁定該隊(duì)列到死信交換機(jī)的隊(duì)列2 arguments.put("x-dead-letter-routing-key",ROUTINGKEY_PROJECT_DEAD); return QueueBuilder.durable(QUEUE_PROJECT).withArguments(arguments).build(); * 聲明死信隊(duì)列1 @Bean("queueCompanyDead") public Queue queueCompanyDead() { return new Queue(QUEUE_COMPANY_DEAD); * 聲明死信隊(duì)列2 @Bean("queueProjectDead") public Queue queueProjectDead() { return new Queue(QUEUE_PROJECT_DEAD); * 綁定業(yè)務(wù)隊(duì)列1和業(yè)務(wù)交換機(jī) * @param queue * @param directExchange @Bean public Binding bindingQueueCompany(@Qualifier("queueCompany") Queue queue, @Qualifier("exchangePhcp") DirectExchange directExchange) { return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_COMPANY); * 綁定業(yè)務(wù)隊(duì)列2和業(yè)務(wù)交換機(jī) public Binding bindingQueueProject(@Qualifier("queueProject") Queue queue, @Qualifier("exchangePhcp") DirectExchange directExchange) { return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_PROJECT); * 綁定死信隊(duì)列1和死信交換機(jī) public Binding bindingQueueCompanyDead(@Qualifier("queueCompanyDead") Queue queue, @Qualifier("exchangePhcpDead") DirectExchange directExchange) { return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_COMPANY_DEAD); * 綁定死信隊(duì)列2和死信交換機(jī) public Binding bindingQueueProjectDead(@Qualifier("queueProjectDead") Queue queue, @Qualifier("exchangePhcpDead") DirectExchange directExchange) { return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_PROJECT_DEAD); }
生產(chǎn)者
RabbltProducer
package com.example.rabitmqdemo.mydemo.producer; import com.example.rabitmqdemo.mydemo.config.RabbitConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.nio.charset.StandardCharsets; import java.util.UUID; @Component @Slf4j public class RabbltProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{ @Resource private RabbitTemplate rabbitTemplate; /** * 初始化消息確認(rèn)函數(shù) */ @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnsCallback(this); rabbitTemplate.setMandatory(true); } /** * 發(fā)送消息服務(wù)器確認(rèn)函數(shù) * @param correlationData * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { System.out.println("消息發(fā)送成功" + correlationData); } else { System.out.println("消息發(fā)送失敗:" + cause); } } /** * 消息發(fā)送失敗,消息回調(diào)函數(shù) * @param returnedMessage */ @Override public void returnedMessage(ReturnedMessage returnedMessage) { String str = new String(returnedMessage.getMessage().getBody()); System.out.println("消息發(fā)送失?。? + str); } /** * 處理消息發(fā)送到隊(duì)列1 * @param str */ public void sendCompany(String str){ MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("application/json"); Message message = new Message(str.getBytes(StandardCharsets.UTF_8),messageProperties); CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_COMPANY,message,correlationData); //也可以用下面的方式 //CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); //this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_COMPANY,str,correlationData); } /** * 處理消息發(fā)送到隊(duì)列2 * @param str */ public void sendProject(String str){ MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("application/json"); Message message = new Message(str.getBytes(StandardCharsets.UTF_8),messageProperties); CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_PROJECT,message,correlationData); //也可以用下面的方式 //CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); //this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_PROJECT,str,correlationData); } }
業(yè)務(wù)消費(fèi)者
RabbitConsumer
package com.example.rabitmqdemo.mydemo.consumer; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * 監(jiān)聽(tīng)業(yè)務(wù)交換機(jī) * @author JeWang */ @Component @Slf4j public class RabbitConsumer { /** * 監(jiān)聽(tīng)業(yè)務(wù)隊(duì)列1 * @param message * @param channel * @throws IOException */ @RabbitListener(queues = "company") public void company(Message message, Channel channel) throws IOException { try{ System.out.println("次數(shù)" + message.getMessageProperties().getDeliveryTag()); channel.basicQos(1); Thread.sleep(2000); String s = new String(message.getBody()); log.info("處理消息"+s); //下面兩行是嘗試手動(dòng)拋出異常,用來(lái)測(cè)試重試次數(shù)和發(fā)送到死信交換機(jī) //String str = null; //str.split("1"); //處理成功,確認(rèn)應(yīng)答 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }catch (Exception e){ log.error("處理消息時(shí)發(fā)生異常:"+e.getMessage()); Boolean redelivered = message.getMessageProperties().getRedelivered(); if(redelivered){ log.error("異常重試次數(shù)已到達(dá)設(shè)置次數(shù),將發(fā)送到死信交換機(jī)"); channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); }else { log.error("消息即將返回隊(duì)列處理重試"); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } } /** * 監(jiān)聽(tīng)業(yè)務(wù)隊(duì)列2 * @param message * @param channel * @throws IOException */ @RabbitListener(queues = "project") public void project(Message message, Channel channel) throws IOException { try{ System.out.println("次數(shù)" + message.getMessageProperties().getDeliveryTag()); channel.basicQos(1); Thread.sleep(2000); String s = new String(message.getBody()); log.info("處理消息"+s); //下面兩行是嘗試手動(dòng)拋出異常,用來(lái)測(cè)試重試次數(shù)和發(fā)送到死信交換機(jī) //String str = null; //str.split("1"); //處理成功,確認(rèn)應(yīng)答 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }catch (Exception e){ log.error("處理消息時(shí)發(fā)生異常:"+e.getMessage()); Boolean redelivered = message.getMessageProperties().getRedelivered(); if(redelivered){ log.error("異常重試次數(shù)已到達(dá)設(shè)置次數(shù),將發(fā)送到死信交換機(jī)"); channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); }else { log.error("消息即將返回隊(duì)列處理重試"); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } } }
死信消費(fèi)者
RabbitConsumer
package com.example.rabitmqdemo.mydemo.consumer; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * 監(jiān)聽(tīng)死信交換機(jī) * @author JeWang */ @Component @Slf4j public class RabbitConsumerDead { /** * 處理死信隊(duì)列1 * @param message * @param channel * @throws IOException */ @RabbitListener(queues = "company_dead") public void company_dead(Message message, Channel channel) throws IOException { try{ channel.basicQos(1); String s = new String(message.getBody()); log.info("處理死信"+s); //在此處記錄到數(shù)據(jù)庫(kù)、報(bào)警之類(lèi)的操作 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }catch (Exception e){ log.error("接收異常:"+e.getMessage()); } } /** * 處理死信隊(duì)列2 * @param message * @param channel * @throws IOException */ @RabbitListener(queues = "project_dead") public void project_dead(Message message, Channel channel) throws IOException { try{ channel.basicQos(1); String s = new String(message.getBody()); log.info("處理死信"+s); //在此處記錄到數(shù)據(jù)庫(kù)、報(bào)警之類(lèi)的操作 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }catch (Exception e){ log.error("接收異常:"+e.getMessage()); } } }
測(cè)試
MqController
package com.example.rabitmqdemo.mydemo.controller; import com.example.rabitmqdemo.mydemo.producer.RabbltProducer; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; @RequestMapping("/def") @RestController @Slf4j public class MsgController { @Resource private RabbltProducer rabbltProducer; @RequestMapping("/handleCompany") public void handleCompany(@RequestBody String jsonStr){ rabbltProducer.sendCompany(jsonStr); } }
到此這篇關(guān)于SpringBoot整合RabbitMQ實(shí)戰(zhàn)附加死信交換機(jī)的文章就介紹到這了,更多相關(guān)SpringBoot整合RabbitMQ死信交換機(jī)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java實(shí)現(xiàn)二維碼QRCode的編碼和解碼與示例解析
本文主要介紹Java實(shí)現(xiàn)二維碼QRCode的編碼和解碼,這里給大家一個(gè)小示例以便理解,有需要的小伙伴可以參考下2016-08-08解決springboot啟動(dòng)報(bào)錯(cuò)bean找不到的問(wèn)題
這篇文章主要介紹了解決springboot啟動(dòng)報(bào)錯(cuò)bean找不到原因,本文給大家分享完美解決方案,通過(guò)圖文相結(jié)合給大家介紹的非常詳細(xì),需要的朋友可以參考下2023-03-03Java構(gòu)建JDBC應(yīng)用程序的實(shí)例操作
在本篇文章里小編給大家整理了一篇關(guān)于Java構(gòu)建JDBC應(yīng)用程序的實(shí)例操作,有興趣的朋友們可以學(xué)習(xí)參考下。2021-03-03java多線程通過(guò)CompletableFuture組裝異步計(jì)算單元
這篇文章主要為大家介紹了java多線程通過(guò)CompletableFuture組裝異步計(jì)算單元,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-04-04java并發(fā)編程中的SynchronousQueue實(shí)現(xiàn)原理解析
這篇文章主要介紹了java并發(fā)編程中的SynchronousQueue實(shí)現(xiàn)原理解析,SynchronousQueue是一個(gè)比較特別的隊(duì)列,此隊(duì)列源碼中充斥著大量的CAS語(yǔ)句,理解起來(lái)是有些難度的,為了方便日后回顧,本篇文章會(huì)以簡(jiǎn)潔的圖形化方式展示該隊(duì)列底層的實(shí)現(xiàn)原理,需要的朋友可以參考下2023-12-12IDEA安裝lombok插件設(shè)置Enable Annotation Processing后編譯依然報(bào)錯(cuò)解決方法
這篇文章主要介紹了IDEA安裝lombok插件設(shè)置Enable Annotation Processing后編譯依然報(bào)錯(cuò)解決方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-04-04Java mutable對(duì)象和immutable對(duì)象的區(qū)別說(shuō)明
這篇文章主要介紹了Java mutable對(duì)象和immutable對(duì)象的區(qū)別,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06java字符串的替換replace、replaceAll、replaceFirst的區(qū)別說(shuō)明
這篇文章主要介紹了java字符串的替換replace、replaceAll、replaceFirst的區(qū)別說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03java常用工具類(lèi) Reflect反射工具類(lèi)、String字符串工具類(lèi)
這篇文章主要為大家詳細(xì)介紹了java常用工具類(lèi),包括Reflect反射工具類(lèi)、String字符串工具類(lèi),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-05-05