SpringBoot+RabbitMq具體使用的幾種姿勢
目前主流的消息中間件有activemq,rabbitmq,rocketmq,kafka,我們要根據(jù)實際的業(yè)務場景來選擇一款合適的消息中間件,關注的主要指標有,消息投遞的可靠性,可維護性,吞吐量以及中間件的特色等重要指標來選擇,大數(shù)據(jù)領域肯定是kafka,那么傳統(tǒng)的業(yè)務場景就是解耦,異步,削峰。那么就在剩下的3款產品中選擇一款,從吞吐量,社區(qū)的活躍度,消息的可靠性出發(fā),一般的中小型公司選擇rabbitmq來說可能更為合適。那么我們就來看看如何使用它吧。
環(huán)境準備
本案例基于springboot集成rabbitmq,本案例主要側重要實際的code,對于基礎理論知識請自行百度。
jdk-version:1.8
rabbitmq-version:3.7
springboot-version:2.1.4.RELEASE
pom文件
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
yml配置文件
spring: rabbitmq: password: guest username: guest port: 5672 addresses: 127.0.0.1 #開啟發(fā)送失敗返回 publisher-returns: true #開啟發(fā)送確認 publisher-confirms: true listener: simple: #指定最小的消費者數(shù)量. concurrency: 2 #指定最大的消費者數(shù)量. max-concurrency: 2 #開啟ack acknowledge-mode: auto #開啟ack direct: acknowledge-mode: auto #支持消息的確認與返回 template: mandatory: true
配置rabbitMq的姿勢
姿勢一
基于javaconfig
package com.lly.order.message;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @ClassName RabbitMqConfig
* @Description rabbitMq配置類
* @Author lly
* @Date 2019-05-13 15:05
* @Version 1.0
**/
@Configuration
public class RabbitMqConfig {
public final static String DIRECT_QUEUE = "directQueue";
public final static String TOPIC_QUEUE_ONE = "topic_queue_one";
public final static String TOPIC_QUEUE_TWO = "topic_queue_two";
public final static String FANOUT_QUEUE_ONE = "fanout_queue_one";
public final static String FANOUT_QUEUE_TWO = "fanout_queue_two";
public final static String TOPIC_EXCHANGE = "topic_exchange";
public final static String FANOUT_EXCHANGE = "fanout_exchange";
public final static String TOPIC_ROUTINGKEY_ONE = "common_key";
public final static String TOPIC_ROUTINGKEY_TWO = "*.key";
// direct模式隊列
@Bean
public Queue directQueue() {
return new Queue(DIRECT_QUEUE, true);
}
// topic 訂閱者模式隊列
@Bean
public Queue topicQueueOne() {
return new Queue(TOPIC_QUEUE_ONE, true);
}
@Bean
public Queue topicQueueTwo() {
return new Queue(TOPIC_QUEUE_TWO, true);
}
// fanout 廣播者模式隊列
@Bean
public Queue fanoutQueueOne() {
return new Queue(FANOUT_QUEUE_ONE, true);
}
@Bean
public Queue fanoutQueueTwo() {
return new Queue(FANOUT_QUEUE_TWO, true);
}
// topic 交換器
@Bean
public TopicExchange topExchange() {
return new TopicExchange(TOPIC_EXCHANGE);
}
// fanout 交換器
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE);
}
// 訂閱者模式綁定
@Bean
public Binding topExchangeBingingOne() {
return BindingBuilder.bind(topicQueueOne()).to(topExchange()).with(TOPIC_ROUTINGKEY_ONE);
}
@Bean
public Binding topicExchangeBingingTwo() {
return BindingBuilder.bind(topicQueueTwo()).to(topExchange()).with(TOPIC_ROUTINGKEY_TWO);
}
// 廣播模式綁定
@Bean
public Binding fanoutExchangeBingingOne() {
return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());
}
@Bean
public Binding fanoutExchangeBingingTwo() {
return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());
}
}
姿勢二
基于注解
package com.lly.order.message;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.time.LocalTime;
import java.util.UUID;
/**
* @ClassName MQTest
* @Description 消息隊列測試
* @Author lly
* @Date 2019-05-13 10:50
* @Version 1.0
**/
@Component
@Slf4j
public class MQTest implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
private final static String QUEUE = "test_queue";
@Autowired
private AmqpTemplate amqpTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
public MQTest(RabbitTemplate rabbitTemplate) {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
public void sendMq() {
rabbitTemplate.convertAndSend("test_queue", "test_queue" + LocalTime.now());
log.info("發(fā)送消息:{}", "test_queue" + LocalTime.now());
}
public void sendMqRabbit() {
//回調id
CorrelationData cId = new CorrelationData(UUID.randomUUID().toString());
// rabbitTemplate.convertAndSend(RabbitMqConfig.FANOUT_EXCHANGE, "", "廣播者模式測試",cId);
Object object = rabbitTemplate.convertSendAndReceive(RabbitMqConfig.FANOUT_EXCHANGE, "", "廣播者模式測試", cId);
log.info("發(fā)送消息:{},object:{}", "廣播者模式測試" + LocalTime.now(), object);
}
//發(fā)送訂閱者模式
public void sendMqExchange() {
CorrelationData cId = new CorrelationData(UUID.randomUUID().toString());
CorrelationData cId01 = new CorrelationData(UUID.randomUUID().toString());
log.info("訂閱者模式->發(fā)送消息:routing_key_one");
rabbitTemplate.convertSendAndReceive("topic_exchange", "routing_key_one", "routing_key_one" + LocalTime.now(), cId);
log.info("訂閱者模式->發(fā)送消息routing_key_two");
rabbitTemplate.convertSendAndReceive("topic_exchange", "routing_key_two", "routing_key_two" + LocalTime.now(), cId01);
}
//如果不存在,自動創(chuàng)建隊列
@RabbitListener(queuesToDeclare = @Queue("test_queue"))
public void receiverMq(String msg) {
log.info("接收到隊列消息:{}", msg);
}
//如果不存在,自動創(chuàng)建隊列和交換器并且綁定
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = "topic_queue01", durable = "true"),
exchange = @Exchange(value = "topic_exchange", type = ExchangeTypes.TOPIC),
key = "routing_key_one")})
public void receiverMqExchage(String msg, Channel channel, Message message) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("接收到topic_routing_key_one消息:{}", msg);
//發(fā)生異常
log.error("發(fā)生異常");
int i = 1 / 0;
//告訴服務器收到這條消息 已經被我消費了 可以在隊列刪掉 這樣以后就不會再發(fā)了 否則消息服務器以為這條消息沒處理掉 后續(xù)還會在發(fā)
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("接收消息失敗,重新放回隊列");
//requeu,為true,代表重新放入隊列多次失敗重新放回會導致隊列堵塞或死循環(huán)問題,
// 解決方案,剔除此消息,然后記錄到db中去補償
//channel.basicNack(deliveryTag, false, true);
//拒絕消息
//channel.basicReject(deliveryTag, true);
}
}
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = "topic_queue02", durable = "true"),
exchange = @Exchange(value = "topic_exchange", type = ExchangeTypes.TOPIC),
key = "routing_key_two")})
public void receiverMqExchageTwo(String msg) {
log.info("接收到topic_routing_key_two消息:{}", msg);
}
@RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE_ONE)
public void receiverMqFanout(String msg, Channel channel, Message message) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("接收到隊列fanout_queue_one消息:{}", msg);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
e.printStackTrace();
//多次失敗重新放回會導致隊列堵塞或死循環(huán)問題 丟棄這條消息
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
log.error("接收消息失敗");
}
}
@RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE_TWO)
public void receiverMqFanoutTwo(String msg) {
log.info("接收到隊列fanout_queue_two消息:{}", msg);
}
/**
* @return
* @Author lly
* @Description 確認消息是否發(fā)送到exchange
* @Date 2019-05-14 15:36
* @Param [correlationData, ack, cause]
**/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息唯一標識id:{}", correlationData);
log.info("消息確認結果!");
log.error("消息失敗原因,cause:{}", cause);
}
/**
* @return
* @Author lly
* @Description 消息消費發(fā)生異常時返回
* @Date 2019-05-14 16:22
* @Param [message, replyCode, replyText, exchange, routingKey]
**/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息發(fā)送失敗id:{}", message.getMessageProperties().getCorrelationId());
log.info("消息主體 message : ", message);
log.info("消息主體 message : ", replyCode);
log.info("描述:" + replyText);
log.info("消息使用的交換器 exchange : ", exchange);
log.info("消息使用的路由鍵 routing : ", routingKey);
}
}
rabbitMq消息確認的三種方式
# 發(fā)送消息后直接確認消息 acknowledge-mode:none # 根據(jù)消息消費的情況,智能判定消息的確認情況 acknowledge-mode:auto # 手動確認消息的情況 acknowledge-mode:manual
我們以topic模式來試驗下消息的ack

自動確認消息模式


手動確認消息模式


然后我們再次消費消息,發(fā)現(xiàn)消息是沒有被確認的,所以可以被再次消費

發(fā)現(xiàn)同樣的消息還是存在的沒有被隊列刪除,必須手動去ack,我們修改隊列1的手動ack看看效果
channel.basicAck(deliveryTag, false);
重啟項目再次消費消息

再次查看隊列里的消息,發(fā)現(xiàn)隊列01里的消息被刪除了,隊列02的還是存在。

消費消息發(fā)生異常的情況,修改代碼 模擬發(fā)生異常的情況下發(fā)生了什么, 異常發(fā)生了,消息被重放進了隊列

但是會導致消息不停的循環(huán)消費,然后失敗,致死循環(huán)調用大量服務器資源

所以我們正確的處理方式是,發(fā)生異常,將消息記錄到db,再通過補償機制來補償消息,或者記錄消息的重復次數(shù),進行重試,超過幾次后再放到db中。
總結
通過實際的code我們了解的rabbitmq在項目的具體的整合情況,消息ack的幾種情況,方便在實際的場景中選擇合適的方案來使用。如有不足,還望不吝賜教。希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關文章
SpringBoot多環(huán)境切換的配置實現(xiàn)
在日常的開發(fā)中,一般都會分好幾種環(huán)境,本文就來介紹一下SpringBoot多環(huán)境切換的配置實現(xiàn),具有一定的參考價值,感興趣的可以了解一下2024-03-03
解決SpringBoot webSocket 資源無法加載、tomcat啟動報錯的問題
這篇文章主要介紹了解決SpringBoot webSocket 資源無法加載、tomcat啟動報錯的問題,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-11-11
SpringBoot自定義starter啟動器的實現(xiàn)思路
這篇文章主要介紹了SpringBoot如何自定義starter啟動器,通過starter的自定義過程,能夠加深大家對SpringBoot自動配置原理的理解,需要的朋友可以參考下2022-10-10
JAVA統(tǒng)計字符串中某個字符出現(xiàn)次數(shù)的方法實現(xiàn)
本文主要介紹了JAVA統(tǒng)計字符串中某個字符出現(xiàn)次數(shù)的方法實現(xiàn),可以循環(huán)使用String的charAt(int index)函數(shù),具有一定的參考價值,感興趣的可以了解一下2023-11-11
springboot集成schedule實現(xiàn)定時任務
在項目開發(fā)過程中,我們經常需要執(zhí)行具有周期性的任務。通過定時任務可以很好的幫助我們實現(xiàn)。本篇文章主要介紹了springboot集成schedule實現(xiàn)定時任務,感興趣的小伙伴們可以參考一下2018-05-05

