SpringBoot+RabbitMq具體使用的幾種姿勢(shì)
目前主流的消息中間件有activemq,rabbitmq,rocketmq,kafka,我們要根據(jù)實(shí)際的業(yè)務(wù)場(chǎng)景來選擇一款合適的消息中間件,關(guān)注的主要指標(biāo)有,消息投遞的可靠性,可維護(hù)性,吞吐量以及中間件的特色等重要指標(biāo)來選擇,大數(shù)據(jù)領(lǐng)域肯定是kafka,那么傳統(tǒng)的業(yè)務(wù)場(chǎng)景就是解耦,異步,削峰。那么就在剩下的3款產(chǎn)品中選擇一款,從吞吐量,社區(qū)的活躍度,消息的可靠性出發(fā),一般的中小型公司選擇rabbitmq來說可能更為合適。那么我們就來看看如何使用它吧。
環(huán)境準(zhǔn)備
本案例基于springboot集成rabbitmq,本案例主要側(cè)重要實(shí)際的code,對(duì)于基礎(chǔ)理論知識(shí)請(qǐng)自行百度。
jdk-version:1.8
rabbitmq-version:3.7
springboot-version:2.1.4.RELEASE
pom文件
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
yml配置文件
spring: rabbitmq: password: guest username: guest port: 5672 addresses: 127.0.0.1 #開啟發(fā)送失敗返回 publisher-returns: true #開啟發(fā)送確認(rèn) publisher-confirms: true listener: simple: #指定最小的消費(fèi)者數(shù)量. concurrency: 2 #指定最大的消費(fèi)者數(shù)量. max-concurrency: 2 #開啟ack acknowledge-mode: auto #開啟ack direct: acknowledge-mode: auto #支持消息的確認(rèn)與返回 template: mandatory: true
配置rabbitMq的姿勢(shì)
姿勢(shì)一
基于javaconfig
package com.lly.order.message; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @ClassName RabbitMqConfig * @Description rabbitMq配置類 * @Author lly * @Date 2019-05-13 15:05 * @Version 1.0 **/ @Configuration public class RabbitMqConfig { public final static String DIRECT_QUEUE = "directQueue"; public final static String TOPIC_QUEUE_ONE = "topic_queue_one"; public final static String TOPIC_QUEUE_TWO = "topic_queue_two"; public final static String FANOUT_QUEUE_ONE = "fanout_queue_one"; public final static String FANOUT_QUEUE_TWO = "fanout_queue_two"; public final static String TOPIC_EXCHANGE = "topic_exchange"; public final static String FANOUT_EXCHANGE = "fanout_exchange"; public final static String TOPIC_ROUTINGKEY_ONE = "common_key"; public final static String TOPIC_ROUTINGKEY_TWO = "*.key"; // direct模式隊(duì)列 @Bean public Queue directQueue() { return new Queue(DIRECT_QUEUE, true); } // topic 訂閱者模式隊(duì)列 @Bean public Queue topicQueueOne() { return new Queue(TOPIC_QUEUE_ONE, true); } @Bean public Queue topicQueueTwo() { return new Queue(TOPIC_QUEUE_TWO, true); } // fanout 廣播者模式隊(duì)列 @Bean public Queue fanoutQueueOne() { return new Queue(FANOUT_QUEUE_ONE, true); } @Bean public Queue fanoutQueueTwo() { return new Queue(FANOUT_QUEUE_TWO, true); } // topic 交換器 @Bean public TopicExchange topExchange() { return new TopicExchange(TOPIC_EXCHANGE); } // fanout 交換器 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(FANOUT_EXCHANGE); } // 訂閱者模式綁定 @Bean public Binding topExchangeBingingOne() { return BindingBuilder.bind(topicQueueOne()).to(topExchange()).with(TOPIC_ROUTINGKEY_ONE); } @Bean public Binding topicExchangeBingingTwo() { return BindingBuilder.bind(topicQueueTwo()).to(topExchange()).with(TOPIC_ROUTINGKEY_TWO); } // 廣播模式綁定 @Bean public Binding fanoutExchangeBingingOne() { return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange()); } @Bean public Binding fanoutExchangeBingingTwo() { return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange()); } }
姿勢(shì)二
基于注解
package com.lly.order.message; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.io.IOException; import java.time.LocalTime; import java.util.UUID; /** * @ClassName MQTest * @Description 消息隊(duì)列測(cè)試 * @Author lly * @Date 2019-05-13 10:50 * @Version 1.0 **/ @Component @Slf4j public class MQTest implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { private final static String QUEUE = "test_queue"; @Autowired private AmqpTemplate amqpTemplate; @Autowired private RabbitTemplate rabbitTemplate; public MQTest(RabbitTemplate rabbitTemplate) { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } public void sendMq() { rabbitTemplate.convertAndSend("test_queue", "test_queue" + LocalTime.now()); log.info("發(fā)送消息:{}", "test_queue" + LocalTime.now()); } public void sendMqRabbit() { //回調(diào)id CorrelationData cId = new CorrelationData(UUID.randomUUID().toString()); // rabbitTemplate.convertAndSend(RabbitMqConfig.FANOUT_EXCHANGE, "", "廣播者模式測(cè)試",cId); Object object = rabbitTemplate.convertSendAndReceive(RabbitMqConfig.FANOUT_EXCHANGE, "", "廣播者模式測(cè)試", cId); log.info("發(fā)送消息:{},object:{}", "廣播者模式測(cè)試" + LocalTime.now(), object); } //發(fā)送訂閱者模式 public void sendMqExchange() { CorrelationData cId = new CorrelationData(UUID.randomUUID().toString()); CorrelationData cId01 = new CorrelationData(UUID.randomUUID().toString()); log.info("訂閱者模式->發(fā)送消息:routing_key_one"); rabbitTemplate.convertSendAndReceive("topic_exchange", "routing_key_one", "routing_key_one" + LocalTime.now(), cId); log.info("訂閱者模式->發(fā)送消息routing_key_two"); rabbitTemplate.convertSendAndReceive("topic_exchange", "routing_key_two", "routing_key_two" + LocalTime.now(), cId01); } //如果不存在,自動(dòng)創(chuàng)建隊(duì)列 @RabbitListener(queuesToDeclare = @Queue("test_queue")) public void receiverMq(String msg) { log.info("接收到隊(duì)列消息:{}", msg); } //如果不存在,自動(dòng)創(chuàng)建隊(duì)列和交換器并且綁定 @RabbitListener(bindings = { @QueueBinding(value = @Queue(value = "topic_queue01", durable = "true"), exchange = @Exchange(value = "topic_exchange", type = ExchangeTypes.TOPIC), key = "routing_key_one")}) public void receiverMqExchage(String msg, Channel channel, Message message) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { log.info("接收到topic_routing_key_one消息:{}", msg); //發(fā)生異常 log.error("發(fā)生異常"); int i = 1 / 0; //告訴服務(wù)器收到這條消息 已經(jīng)被我消費(fèi)了 可以在隊(duì)列刪掉 這樣以后就不會(huì)再發(fā)了 否則消息服務(wù)器以為這條消息沒處理掉 后續(xù)還會(huì)在發(fā) channel.basicAck(deliveryTag, false); } catch (Exception e) { log.error("接收消息失敗,重新放回隊(duì)列"); //requeu,為true,代表重新放入隊(duì)列多次失敗重新放回會(huì)導(dǎo)致隊(duì)列堵塞或死循環(huán)問題, // 解決方案,剔除此消息,然后記錄到db中去補(bǔ)償 //channel.basicNack(deliveryTag, false, true); //拒絕消息 //channel.basicReject(deliveryTag, true); } } @RabbitListener(bindings = { @QueueBinding(value = @Queue(value = "topic_queue02", durable = "true"), exchange = @Exchange(value = "topic_exchange", type = ExchangeTypes.TOPIC), key = "routing_key_two")}) public void receiverMqExchageTwo(String msg) { log.info("接收到topic_routing_key_two消息:{}", msg); } @RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE_ONE) public void receiverMqFanout(String msg, Channel channel, Message message) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { log.info("接收到隊(duì)列fanout_queue_one消息:{}", msg); channel.basicAck(deliveryTag, false); } catch (Exception e) { e.printStackTrace(); //多次失敗重新放回會(huì)導(dǎo)致隊(duì)列堵塞或死循環(huán)問題 丟棄這條消息 // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); log.error("接收消息失敗"); } } @RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE_TWO) public void receiverMqFanoutTwo(String msg) { log.info("接收到隊(duì)列fanout_queue_two消息:{}", msg); } /** * @return * @Author lly * @Description 確認(rèn)消息是否發(fā)送到exchange * @Date 2019-05-14 15:36 * @Param [correlationData, ack, cause] **/ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("消息唯一標(biāo)識(shí)id:{}", correlationData); log.info("消息確認(rèn)結(jié)果!"); log.error("消息失敗原因,cause:{}", cause); } /** * @return * @Author lly * @Description 消息消費(fèi)發(fā)生異常時(shí)返回 * @Date 2019-05-14 16:22 * @Param [message, replyCode, replyText, exchange, routingKey] **/ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息發(fā)送失敗id:{}", message.getMessageProperties().getCorrelationId()); log.info("消息主體 message : ", message); log.info("消息主體 message : ", replyCode); log.info("描述:" + replyText); log.info("消息使用的交換器 exchange : ", exchange); log.info("消息使用的路由鍵 routing : ", routingKey); } }
rabbitMq消息確認(rèn)的三種方式
# 發(fā)送消息后直接確認(rèn)消息 acknowledge-mode:none # 根據(jù)消息消費(fèi)的情況,智能判定消息的確認(rèn)情況 acknowledge-mode:auto # 手動(dòng)確認(rèn)消息的情況 acknowledge-mode:manual
我們以topic模式來試驗(yàn)下消息的ack
自動(dòng)確認(rèn)消息模式
手動(dòng)確認(rèn)消息模式
然后我們?cè)俅蜗M(fèi)消息,發(fā)現(xiàn)消息是沒有被確認(rèn)的,所以可以被再次消費(fèi)
發(fā)現(xiàn)同樣的消息還是存在的沒有被隊(duì)列刪除,必須手動(dòng)去ack,我們修改隊(duì)列1的手動(dòng)ack看看效果
channel.basicAck(deliveryTag, false);
重啟項(xiàng)目再次消費(fèi)消息
再次查看隊(duì)列里的消息,發(fā)現(xiàn)隊(duì)列01里的消息被刪除了,隊(duì)列02的還是存在。
消費(fèi)消息發(fā)生異常的情況,修改代碼 模擬發(fā)生異常的情況下發(fā)生了什么, 異常發(fā)生了,消息被重放進(jìn)了隊(duì)列
但是會(huì)導(dǎo)致消息不停的循環(huán)消費(fèi),然后失敗,致死循環(huán)調(diào)用大量服務(wù)器資源
所以我們正確的處理方式是,發(fā)生異常,將消息記錄到db,再通過補(bǔ)償機(jī)制來補(bǔ)償消息,或者記錄消息的重復(fù)次數(shù),進(jìn)行重試,超過幾次后再放到db中。
總結(jié)
通過實(shí)際的code我們了解的rabbitmq在項(xiàng)目的具體的整合情況,消息ack的幾種情況,方便在實(shí)際的場(chǎng)景中選擇合適的方案來使用。如有不足,還望不吝賜教。希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- SpringBoot 整合 RabbitMQ 的使用方式(代碼示例)
- RabbitMQ在Spring Boot中的使用步驟
- Springboot RabbitMQ 消息隊(duì)列使用示例詳解
- Spring Boot中RabbitMQ自動(dòng)配置的介紹、原理和使用方法
- 詳解SpringBoot中使用RabbitMQ的RPC功能
- SpringMVC和rabbitmq集成的使用案例
- 詳解Spring Cloud Stream使用延遲消息實(shí)現(xiàn)定時(shí)任務(wù)(RabbitMQ)
- SpringBoot之RabbitMQ的使用方法
- spring boot使用RabbitMQ實(shí)現(xiàn)topic 主題
- Spring3?中?RabbitMQ?的使用與常見場(chǎng)景分析
相關(guān)文章
SpringBoot實(shí)現(xiàn)單元測(cè)試示例詳解
單元測(cè)試(unit testing),是指對(duì)軟件中的最小可測(cè)試單元進(jìn)行檢查和驗(yàn)證。這篇文章主要為大家介紹了C語言實(shí)現(xiàn)單元測(cè)試的方法,需要的可以參考一下2022-11-11SpringBoot之使用Feign實(shí)現(xiàn)微服務(wù)間的交互
這篇文章主要介紹了SpringBoot中使用Feign實(shí)現(xiàn)微服務(wù)間的交互,對(duì)微服務(wù)這方面感興趣的小伙伴可以參考閱讀本文2023-03-03用java的spring實(shí)現(xiàn)一個(gè)簡(jiǎn)單的IOC容器示例代碼
本篇文章主要介紹了用java實(shí)現(xiàn)一個(gè)簡(jiǎn)單的IOC容器示例代碼,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-03-03