欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

springboot rabbitmq整合rabbitmq之消息持久化存儲問題

 更新時間:2023年09月28日 08:50:08   作者:weixin_43831204  
這篇文章主要介紹了springboot rabbitmq整合rabbitmq之消息持久化存儲問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教

rabbitmq消息持久化存儲包含三個方面

  • 1、exchange的持久化
  • 2、queue的持久化
  • 3、message的持久化

exchange的持久化

在申明exchange的時候,有個參數(shù):durable。

當該參數(shù)為true,則對該exchange做持久化,重啟rabbitmq服務(wù)器,該exchange不會消失。

durable的默認值為true

public class DirectExchange extends AbstractExchange {
    public static final DirectExchange DEFAULT = new DirectExchange("");
    public DirectExchange(String name) {
        super(name);
    }
    public DirectExchange(String name, boolean durable, boolean autoDelete) {
        super(name, durable, autoDelete);
    }
    public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
        super(name, durable, autoDelete, arguments);
    }
    public final String getType() {
        return "direct";
    }
}
public abstract class AbstractExchange extends AbstractDeclarable implements Exchange {
    private final String name;
    private final boolean durable;
    private final boolean autoDelete;
    private final Map<String, Object> arguments;
    private volatile boolean delayed;
    private boolean internal;
    public AbstractExchange(String name) {
        this(name, true, false);
    }
    public AbstractExchange(String name, boolean durable, boolean autoDelete) {
        this(name, durable, autoDelete, (Map)null);
    }
    public AbstractExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
        this.name = name;
        this.durable = durable;
        this.autoDelete = autoDelete;
        if (arguments != null) {
            this.arguments = arguments;
        } else {
            this.arguments = new HashMap();
        }
    }

queue的持久化

申明隊列時也有個參數(shù):durable。

當該參數(shù)為true,則對該queue做持久化,重啟rabbitmq服務(wù)器,該queue不會消失。

durable的默認值為true

public Queue(String name) {
        this(name, true, false, false);
    }
    public Queue(String name, boolean durable) {
        this(name, durable, false, false, (Map)null);
    }
    public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
        this(name, durable, exclusive, autoDelete, (Map)null);
    }
    public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) {
        Assert.notNull(name, "'name' cannot be null");
        this.name = name;
        this.actualName = StringUtils.hasText(name) ? name : Base64UrlNamingStrategy.DEFAULT.generateName() + "_awaiting_declaration";
        this.durable = durable;
        this.exclusive = exclusive;
        this.autoDelete = autoDelete;
        this.arguments = (Map)(arguments != null ? arguments : new HashMap());
    }

message的持久化

前面我們已經(jīng)講到exchange與queue的持久化,那么message如何持久化呢?

我們在使用rabbit-client做消息持久化時,設(shè)置了BasicProperties的deliveryMode為2,做消息的持久化。

AMQP.BasicProperties properties = new AMQP.BasicProperties.
                Builder().
                deliveryMode(2).
                build();
        channel.basicPublish("ex.pc", "key.pc",  properties, "hello world".getBytes());

那么整合了spring boot,使用RabbitTemplate如何做持久化?

首先,我們來到經(jīng)常的使用的消息發(fā)送方法:RabbitTemplate類下的convertAndSend

@Override
    public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException {
        convertAndSend(exchange, routingKey, object, (CorrelationData) null);
    }

然后調(diào)用了該類下的重載方法:convertAndSend。該方法中將object 轉(zhuǎn)換成了message

@Override
    public void convertAndSend(String exchange, String routingKey, final Object object,
            @Nullable CorrelationData correlationData) throws AmqpException {
        send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
    }

在做消息轉(zhuǎn)換的時候,我們注意到,傳入了一個MessageProperties對象

protected Message convertMessageIfNecessary(final Object object) {
        if (object instanceof Message) {
            return (Message) object;
        }
        return getRequiredMessageConverter().toMessage(object, new MessageProperties());
    }

在MessageProperties中,有個deliveryMode屬性,該屬性默認值為:MessageDeliveryMode.PERSISTENT(持久化的)

 public MessageProperties() {
        this.deliveryMode = DEFAULT_DELIVERY_MODE;
        this.priority = DEFAULT_PRIORITY;
    }
static {
    DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
    DEFAULT_PRIORITY = 0;
}

消息轉(zhuǎn)換完成后,調(diào)用時同類方法的send方法

@Override
    public void send(final String exchange, final String routingKey,
            final Message message, @Nullable final CorrelationData correlationData)
            throws AmqpException {
        execute(channel -> {
            doSend(channel, exchange, routingKey, message,
                    (RabbitTemplate.this.returnCallback != null
                            || (correlationData != null && StringUtils.hasText(correlationData.getId())))
                            && RabbitTemplate.this.mandatoryExpression.getValue(
                                    RabbitTemplate.this.evaluationContext, message, Boolean.class),
                    correlationData);
            return null;
        }, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
    }

該方法又調(diào)用了doSend方法

public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message, // NOSONAR complexity
            boolean mandatory, @Nullable CorrelationData correlationData)
                    throws Exception { // NOSONAR TODO: change to IOException in 2.2.
        String exch = exchangeArg;
        String rKey = routingKeyArg;
        if (exch == null) {
            exch = this.exchange;
        }
        if (rKey == null) {
            rKey = this.routingKey;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Publishing message " + message
                    + "on exchange [" + exch + "], routingKey = [" + rKey + "]");
        }
        Message messageToUse = message;
        MessageProperties messageProperties = messageToUse.getMessageProperties();
        if (mandatory) {
            messageProperties.getHeaders().put(PublisherCallbackChannel.RETURN_LISTENER_CORRELATION_KEY, this.uuid);
        }
        if (this.beforePublishPostProcessors != null) {
            for (MessagePostProcessor processor : this.beforePublishPostProcessors) {
                messageToUse = processor.postProcessMessage(messageToUse, correlationData);
            }
        }
        setupConfirm(channel, messageToUse, correlationData);
        if (this.userIdExpression != null && messageProperties.getUserId() == null) {
            String userId = this.userIdExpression.getValue(this.evaluationContext, messageToUse, String.class);
            if (userId != null) {
                messageProperties.setUserId(userId);
            }
        }
        sendToRabbit(channel, exch, rKey, mandatory, messageToUse);
        // Check if commit needed
        if (isChannelLocallyTransacted(channel)) {
            // Transacted channel created by this template -> commit.
            RabbitUtils.commitIfNecessary(channel);
        }
    }

在該方法中我們終于看到了發(fā)送消息到rabbitmq的操作:sendToRabbit。

該方法將MessageProperties對象轉(zhuǎn)換成了BasicProperties。

至此,我們終于了解了,spring rabbit 中如何實現(xiàn)messge的持久化。

默認的message就是持久化的

protected void sendToRabbit(Channel channel, String exchange, String routingKey, boolean mandatory,
            Message message) throws IOException {
        BasicProperties convertedMessageProperties = this.messagePropertiesConverter
                .fromMessageProperties(message.getMessageProperties(), this.encoding);
        channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());
    }

如何改變message的持久化屬性?

根據(jù)上面的源碼分析,spring中默認的message就是持久化的,如何改變持久化屬性?

1、使用send方法,發(fā)送message。設(shè)置message中MessageProperties的屬性deliveryMode

2、自定義MessageConverter,在消息轉(zhuǎn)換時,設(shè)置MessageProperties的屬性deliveryMode

3、自定MessagePropertiesConverter,在MessageProperties對象轉(zhuǎn)換成BasicProperties時,設(shè)置deliveryMode

總結(jié)

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • Java中l(wèi)ength,length(),size()詳解及區(qū)別

    Java中l(wèi)ength,length(),size()詳解及區(qū)別

    這篇文章主要介紹了Java中l(wèi)ength,length(),size()詳解及區(qū)別的相關(guān)資料,需要的朋友可以參考下
    2016-11-11
  • Spring如何處理注解的深入理解

    Spring如何處理注解的深入理解

    這篇文章主要給大家介紹了關(guān)于Spring如何處理注解的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家學(xué)習(xí)或者使用java中的注解具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2018-11-11
  • spring boot靜態(tài)變量注入配置文件詳解

    spring boot靜態(tài)變量注入配置文件詳解

    這篇文章主要為大家詳細介紹了spring boot靜態(tài)變量注入配置文件的相關(guān)資料,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2018-04-04
  • Spring Boot整合JWT的實現(xiàn)步驟

    Spring Boot整合JWT的實現(xiàn)步驟

    本文主要介紹了Spring Boot整合JWT,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-08-08
  • Spring學(xué)習(xí)之Bean的裝配多種方法

    Spring學(xué)習(xí)之Bean的裝配多種方法

    本篇文章主要介紹了Spring學(xué)習(xí)之Bean的裝配三種方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-07-07
  • java 從服務(wù)器下載文件并保存到本地的示例

    java 從服務(wù)器下載文件并保存到本地的示例

    本篇文章主要介紹了java 從服務(wù)器下載文件并保存到本地的示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-02-02
  • 淺析java中的取整(/)和求余(%)

    淺析java中的取整(/)和求余(%)

    這篇文章主要介紹了淺析java中的取整(/)和求余(%),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-08-08
  • Java響應(yīng)式編程之handle用法解析

    Java響應(yīng)式編程之handle用法解析

    這篇文章主要介紹了Java響應(yīng)式編程之handle用法解析,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-10-10
  • elasticsearch中term與match的區(qū)別講解

    elasticsearch中term與match的區(qū)別講解

    今天小編就為大家分享一篇關(guān)于elasticsearch中term與match的區(qū)別講解,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧
    2019-02-02
  • Java實現(xiàn)List去重的方法詳解

    Java實現(xiàn)List去重的方法詳解

    本文用示例介紹Java的List(ArrayList、LinkedList等)的去重的方法。List去重的常用方法一般是:JDK8的stream的distinct、轉(zhuǎn)為HashSet、轉(zhuǎn)為TreeSet等,感興趣的可以了解一下
    2022-05-05

最新評論