springboot rabbitmq整合rabbitmq之消息持久化存儲問題
rabbitmq消息持久化存儲包含三個(gè)方面
- 1、exchange的持久化
- 2、queue的持久化
- 3、message的持久化
exchange的持久化
在申明exchange的時(shí)候,有個(gè)參數(shù):durable。
當(dāng)該參數(shù)為true,則對該exchange做持久化,重啟rabbitmq服務(wù)器,該exchange不會消失。
durable的默認(rèn)值為true
public class DirectExchange extends AbstractExchange {
public static final DirectExchange DEFAULT = new DirectExchange("");
public DirectExchange(String name) {
super(name);
}
public DirectExchange(String name, boolean durable, boolean autoDelete) {
super(name, durable, autoDelete);
}
public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
super(name, durable, autoDelete, arguments);
}
public final String getType() {
return "direct";
}
}public abstract class AbstractExchange extends AbstractDeclarable implements Exchange {
private final String name;
private final boolean durable;
private final boolean autoDelete;
private final Map<String, Object> arguments;
private volatile boolean delayed;
private boolean internal;
public AbstractExchange(String name) {
this(name, true, false);
}
public AbstractExchange(String name, boolean durable, boolean autoDelete) {
this(name, durable, autoDelete, (Map)null);
}
public AbstractExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
this.name = name;
this.durable = durable;
this.autoDelete = autoDelete;
if (arguments != null) {
this.arguments = arguments;
} else {
this.arguments = new HashMap();
}
}queue的持久化
申明隊(duì)列時(shí)也有個(gè)參數(shù):durable。
當(dāng)該參數(shù)為true,則對該queue做持久化,重啟rabbitmq服務(wù)器,該queue不會消失。
durable的默認(rèn)值為true
public Queue(String name) {
this(name, true, false, false);
}
public Queue(String name, boolean durable) {
this(name, durable, false, false, (Map)null);
}
public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
this(name, durable, exclusive, autoDelete, (Map)null);
}
public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) {
Assert.notNull(name, "'name' cannot be null");
this.name = name;
this.actualName = StringUtils.hasText(name) ? name : Base64UrlNamingStrategy.DEFAULT.generateName() + "_awaiting_declaration";
this.durable = durable;
this.exclusive = exclusive;
this.autoDelete = autoDelete;
this.arguments = (Map)(arguments != null ? arguments : new HashMap());
}message的持久化
前面我們已經(jīng)講到exchange與queue的持久化,那么message如何持久化呢?
我們在使用rabbit-client做消息持久化時(shí),設(shè)置了BasicProperties的deliveryMode為2,做消息的持久化。
AMQP.BasicProperties properties = new AMQP.BasicProperties.
Builder().
deliveryMode(2).
build();
channel.basicPublish("ex.pc", "key.pc", properties, "hello world".getBytes());那么整合了spring boot,使用RabbitTemplate如何做持久化?
首先,我們來到經(jīng)常的使用的消息發(fā)送方法:RabbitTemplate類下的convertAndSend
@Override
public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException {
convertAndSend(exchange, routingKey, object, (CorrelationData) null);
}然后調(diào)用了該類下的重載方法:convertAndSend。該方法中將object 轉(zhuǎn)換成了message
@Override
public void convertAndSend(String exchange, String routingKey, final Object object,
@Nullable CorrelationData correlationData) throws AmqpException {
send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
}在做消息轉(zhuǎn)換的時(shí)候,我們注意到,傳入了一個(gè)MessageProperties對象
protected Message convertMessageIfNecessary(final Object object) {
if (object instanceof Message) {
return (Message) object;
}
return getRequiredMessageConverter().toMessage(object, new MessageProperties());
}在MessageProperties中,有個(gè)deliveryMode屬性,該屬性默認(rèn)值為:MessageDeliveryMode.PERSISTENT(持久化的)
public MessageProperties() {
this.deliveryMode = DEFAULT_DELIVERY_MODE;
this.priority = DEFAULT_PRIORITY;
}
static {
DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
DEFAULT_PRIORITY = 0;
}消息轉(zhuǎn)換完成后,調(diào)用時(shí)同類方法的send方法
@Override
public void send(final String exchange, final String routingKey,
final Message message, @Nullable final CorrelationData correlationData)
throws AmqpException {
execute(channel -> {
doSend(channel, exchange, routingKey, message,
(RabbitTemplate.this.returnCallback != null
|| (correlationData != null && StringUtils.hasText(correlationData.getId())))
&& RabbitTemplate.this.mandatoryExpression.getValue(
RabbitTemplate.this.evaluationContext, message, Boolean.class),
correlationData);
return null;
}, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
}該方法又調(diào)用了doSend方法
public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message, // NOSONAR complexity
boolean mandatory, @Nullable CorrelationData correlationData)
throws Exception { // NOSONAR TODO: change to IOException in 2.2.
String exch = exchangeArg;
String rKey = routingKeyArg;
if (exch == null) {
exch = this.exchange;
}
if (rKey == null) {
rKey = this.routingKey;
}
if (logger.isDebugEnabled()) {
logger.debug("Publishing message " + message
+ "on exchange [" + exch + "], routingKey = [" + rKey + "]");
}
Message messageToUse = message;
MessageProperties messageProperties = messageToUse.getMessageProperties();
if (mandatory) {
messageProperties.getHeaders().put(PublisherCallbackChannel.RETURN_LISTENER_CORRELATION_KEY, this.uuid);
}
if (this.beforePublishPostProcessors != null) {
for (MessagePostProcessor processor : this.beforePublishPostProcessors) {
messageToUse = processor.postProcessMessage(messageToUse, correlationData);
}
}
setupConfirm(channel, messageToUse, correlationData);
if (this.userIdExpression != null && messageProperties.getUserId() == null) {
String userId = this.userIdExpression.getValue(this.evaluationContext, messageToUse, String.class);
if (userId != null) {
messageProperties.setUserId(userId);
}
}
sendToRabbit(channel, exch, rKey, mandatory, messageToUse);
// Check if commit needed
if (isChannelLocallyTransacted(channel)) {
// Transacted channel created by this template -> commit.
RabbitUtils.commitIfNecessary(channel);
}
}在該方法中我們終于看到了發(fā)送消息到rabbitmq的操作:sendToRabbit。
該方法將MessageProperties對象轉(zhuǎn)換成了BasicProperties。
至此,我們終于了解了,spring rabbit 中如何實(shí)現(xiàn)messge的持久化。
默認(rèn)的message就是持久化的
protected void sendToRabbit(Channel channel, String exchange, String routingKey, boolean mandatory,
Message message) throws IOException {
BasicProperties convertedMessageProperties = this.messagePropertiesConverter
.fromMessageProperties(message.getMessageProperties(), this.encoding);
channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());
}如何改變message的持久化屬性?
根據(jù)上面的源碼分析,spring中默認(rèn)的message就是持久化的,如何改變持久化屬性?
1、使用send方法,發(fā)送message。設(shè)置message中MessageProperties的屬性deliveryMode
2、自定義MessageConverter,在消息轉(zhuǎn)換時(shí),設(shè)置MessageProperties的屬性deliveryMode
3、自定MessagePropertiesConverter,在MessageProperties對象轉(zhuǎn)換成BasicProperties時(shí),設(shè)置deliveryMode
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Java中l(wèi)ength,length(),size()詳解及區(qū)別
這篇文章主要介紹了Java中l(wèi)ength,length(),size()詳解及區(qū)別的相關(guān)資料,需要的朋友可以參考下2016-11-11
Spring Boot整合JWT的實(shí)現(xiàn)步驟
本文主要介紹了Spring Boot整合JWT,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-08-08
elasticsearch中term與match的區(qū)別講解
今天小編就為大家分享一篇關(guān)于elasticsearch中term與match的區(qū)別講解,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧2019-02-02

