欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

springboot+rabbitmq實(shí)現(xiàn)指定消費(fèi)者才能消費(fèi)的方法

 更新時(shí)間:2021年11月01日 11:27:13   作者:buguge  
當(dāng)項(xiàng)目部署到測試環(huán)境后,QA測試過程中,總是“莫名其妙”的發(fā)現(xiàn)所保存的用戶付款單數(shù)據(jù)有問題。這篇文章主要介紹了springboot+rabbitmq實(shí)現(xiàn)指定消費(fèi)者才能消費(fèi),需要的朋友可以參考下

如何保證mq隊(duì)列里的消息只被測試服務(wù)器上的consumer消費(fèi),避免本地環(huán)境誤消費(fèi)?

程序里有一個(gè)應(yīng)用場景使用到了rabbitmq——當(dāng)財(cái)務(wù)確認(rèn)收到企業(yè)的打款金額后,系統(tǒng)會(huì)把企業(yè)訂單生成用戶付款單。由于訂單記錄數(shù)據(jù)量大,改為通過mq來異步實(shí)現(xiàn)。即財(cái)務(wù)確認(rèn)收款操作后,將企業(yè)訂單數(shù)據(jù)放入mq,另一端監(jiān)聽mq消息隊(duì)列,將收到的企業(yè)訂單加工轉(zhuǎn)換成用戶付款單,并做持久化。

本地開發(fā)環(huán)境與測試環(huán)境共用一套rabbitmq。當(dāng)項(xiàng)目部署到測試環(huán)境后,QA測試過程中,總是“莫名其妙”的發(fā)現(xiàn)所保存的用戶付款單數(shù)據(jù)有問題。

當(dāng)然,首先要排查程序,檢查Consumer的數(shù)據(jù)處理的邏輯是否有bug。單元測試后,發(fā)現(xiàn)并不存在測試環(huán)境的bug。

原來,消息隊(duì)列被“非正?!毕M(fèi)了!

Q: 什么情況?

A: 幾個(gè)伙伴一起參與的項(xiàng)目,大家總是要調(diào)試自己的程序的。而如果碰巧本地程序監(jiān)聽到消息隊(duì)列里有消息,那么,消息就被本地程序消費(fèi)掉了。問題正是出現(xiàn)在這里!————團(tuán)隊(duì)開發(fā),大家并不會(huì)及時(shí)檢出git上最新的程序版本。如果本地的程序版本不是最新的正確的版本,勢必會(huì)出現(xiàn)bug。

那么,怎么辦?

每次你改了邏輯,告訴大家獲取最新?

不現(xiàn)實(shí),約定的東西往往不奏效的。

如何保證mq隊(duì)列里的消息只被測試服務(wù)器上的consumer消費(fèi),避免本地環(huán)境誤消費(fèi)? 或者說,如何實(shí)現(xiàn)消息的定向消費(fèi)呢?

只要肯琢磨,辦法總比困難多!百思可得解!

我們知道,rabbitmq手動(dòng)ack模式。這還不夠,因?yàn)槲覀冊趺醋宑onsumer來決定是否消費(fèi)呢? 所以,我們需要一個(gè)標(biāo)識(shí)————producer設(shè)定一個(gè)標(biāo)識(shí),consumer如果匹配這個(gè)標(biāo)識(shí),則消費(fèi),否則予以reject放回消息隊(duì)列。

通過查看spring-rabbit/spring-amqp的代碼,發(fā)現(xiàn)可以在spring-amqp里的MessageProperties上做文章。生產(chǎn)者與消費(fèi)者每次消息傳輸都會(huì)攜帶一個(gè)MessageProperties,通常我們是不指定的,走M(jìn)essageProperties的默認(rèn)設(shè)置值。

我的策略:MessageProperties有一個(gè)屬性叫AppId。我們程序所部署的測試機(jī)器就一臺(tái),即消息Producer和消息Consumer在一臺(tái)機(jī)器上。那么,我就可以利用機(jī)器的IP來識(shí)別消息。只有Producer與Consumer的IP匹配,才消費(fèi)消息。程序員本機(jī)IP與測試服務(wù)器IP不一樣,就會(huì)拒絕接收消息,會(huì)把消息重新放回消息隊(duì)列,等待測試服務(wù)器的Consumer消費(fèi)。

話不多說,上代碼吧,

生產(chǎn)者代碼:

package com.sboot.mq;

import org.junit.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import java.net.InetAddress;
import java.util.UUID;

public class MQProducerTest extends BaseTest {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void test() throws Exception {
        for (int i = 1; i <= 5; i++) {
            MessageProperties messageProperties = new MessageProperties();
            String ip = InetAddress.getLocalHost().getHostAddress();
            messageProperties.setAppId(ip);
//            messageProperties.setUserId(String.valueOf(i));
            MessageConverter messageConverter = new SimpleMessageConverter();
            String msg = UUID.randomUUID().toString();
//            System.out.println(msg);
            Message message1 = messageConverter.toMessage(msg, messageProperties);
            rabbitTemplate.send(MessageQueueConstant.USER_SETTLEMENT_EXCHANGE, "UserSettlementRouting", message1);
            System.out.println("入隊(duì)完成");
            Thread.sleep(500L);
        }
    }
}

消費(fèi)者手動(dòng)ACK,要實(shí)現(xiàn)ChannelAwareMessageListener接口,感知rabbitmq.client.Channel實(shí)例,調(diào)用channel的basicAck、basicReject等方法:

package com.sboot.mq;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

import java.net.InetAddress;

@Component
@Profile(value = "dev")
@Slf4j
public class UserSettlementDevConsumer implements ChannelAwareMessageListener {

    @RabbitHandler
    @RabbitListener(queues = MessageQueueConstant.USER_SETTLEMENT_QUEUE, ackMode = "MANUAL")
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        Thread.currentThread().setName(UserSettlementDevConsumer.class.getSimpleName() + System.currentTimeMillis());

        long tag = message.getMessageProperties().getDeliveryTag();
        String appId = message.getMessageProperties().getAppId();
        log.info("{}-{}, 消息出隊(duì)", tag, appId);
        String receiveMsg = "";
        try {
            //核對標(biāo)識(shí),決定是否消費(fèi)消息
            String ip = InetAddress.getLocalHost().getHostAddress();
            if (!ip.equals(appId)) {
                log.info("這不是我需要的消息。放回隊(duì)列。{}", receiveMsg);
//                channel.basicNack(tag, false, true);
                channel.basicReject(tag, true);
//                channel.basicRecover(true);
                return;
            }

            MessageConverter messageConverter = new SimpleMessageConverter();
            receiveMsg = String.valueOf(messageConverter.fromMessage(message));
            。。。。在這里消費(fèi)消息
            log.info("success " + receiveMsg);
            channel.basicAck(tag, false);

        } catch (Exception e) {
            log.error("receive message has an error, ", e);
            channel.basicNack(tag, false, true);
        }
    }

}

說明一下依賴的spring-rabbit包的版本,我的是2.2.0.RELEASE。如果是2.1.4版本里,@RabbitListener注解沒有ackMode。

解決本案問題過程中的花絮:

spring-rabbit-2.1.4.RELEASEspring-rabbit-2.2.0.RELEASE

@RabbitListener的ackMode的值見枚舉org.springframework.amqp.core.AcknowledgeMode

NONE-- no acks(自動(dòng)消費(fèi) autoAck)MANUAL --Manual acks - user must ack/nack via a channel aware listener.(手動(dòng)消費(fèi),Consumer端必須顯式調(diào)用ack或nack)AUTO --

設(shè)置了手動(dòng)消費(fèi),上文消費(fèi)端的deliveryTag會(huì)是不同的long值。自動(dòng)消費(fèi)的deliveryTag是重復(fù)的1和2這樣的。并且,自動(dòng)消費(fèi)時(shí),如果要使用channel的ack/nack,會(huì)報(bào)異常:

2020-06-19 22:26:54.586 [AMQP Connection 192.168.40.20:5672] ERROR o.s.a.rabbit.connection.CachingConnectionFactory:1468 - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
2020-06-19 22:26:54.599 [SimpleAsyncTaskExecutor-1] ERROR c.e.z.r.p.modules.mq.UserSettlementAckConsumer:49 -
org.springframework.amqp.AmqpException: PublisherCallbackChannel is closed
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1092)

到此這篇關(guān)于springboot+rabbitmq實(shí)現(xiàn)指定消費(fèi)者才能消費(fèi)的文章就介紹到這了,更多相關(guān)springboot rabbitmq消費(fèi)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Mybatis與Jpa的區(qū)別和性能對比總結(jié)

    Mybatis與Jpa的區(qū)別和性能對比總結(jié)

    mybatis和jpa兩個(gè)持久層框架,從底層到用法都不同,但是實(shí)現(xiàn)的功能是一樣的,所以說一直以來頗有爭議,所以下面這篇文章主要給大家介紹了關(guān)于Mybatis與Jpa的區(qū)別和性能對比的相關(guān)資料,需要的朋友可以參考下
    2021-06-06
  • Java令牌Token登錄與退出的實(shí)現(xiàn)

    Java令牌Token登錄與退出的實(shí)現(xiàn)

    這篇文章主要介紹了Java令牌Token登錄與退出的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-05-05
  • mybatis update更新字段的使用操作

    mybatis update更新字段的使用操作

    這篇文章主要介紹了mybatis update更新字段的使用操作,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-01-01
  • java基于嵌入式Tomcat的War包啟動(dòng)器

    java基于嵌入式Tomcat的War包啟動(dòng)器

    本文主要介紹了java基于嵌入式Tomcat的War包啟動(dòng)器,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-07-07
  • Java的堵塞隊(duì)列BlockingQueue詳解

    Java的堵塞隊(duì)列BlockingQueue詳解

    這篇文章主要介紹了Java的堵塞隊(duì)列BlockingQueue詳解,阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場景,生產(chǎn)者是向隊(duì)列里添加元素的線程,消費(fèi)者是從隊(duì)列里取元素的線程,需要的朋友可以參考下
    2023-12-12
  • 啟動(dòng)springboot應(yīng)用因未配置數(shù)據(jù)庫報(bào)錯(cuò)的解決方案

    啟動(dòng)springboot應(yīng)用因未配置數(shù)據(jù)庫報(bào)錯(cuò)的解決方案

    這篇文章主要介紹了啟動(dòng)springboot應(yīng)用因未配置數(shù)據(jù)庫報(bào)錯(cuò)的解決方案,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • mybatis中如何實(shí)現(xiàn)一個(gè)標(biāo)簽執(zhí)行多個(gè)sql語句

    mybatis中如何實(shí)現(xiàn)一個(gè)標(biāo)簽執(zhí)行多個(gè)sql語句

    這篇文章主要介紹了mybatis中如何實(shí)現(xiàn)一個(gè)標(biāo)簽執(zhí)行多個(gè)sql語句問題,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-04-04
  • Mybatis動(dòng)態(tài)查詢字段及表名的實(shí)現(xiàn)

    Mybatis動(dòng)態(tài)查詢字段及表名的實(shí)現(xiàn)

    本文主要介紹了Mybatis動(dòng)態(tài)查詢字段及表名的實(shí)現(xiàn),通過靈活運(yùn)用Mybatis提供的動(dòng)態(tài)SQL功能,我們可以構(gòu)建更加靈活、高效的查詢語句,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2024-01-01
  • MyBatis sql中test如何判斷Boolean

    MyBatis sql中test如何判斷Boolean

    這篇文章主要介紹了MyBatis sql中test如何判斷Boolean,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-01-01
  • Flink實(shí)現(xiàn)特定統(tǒng)計(jì)的歸約聚合reduce操作

    Flink實(shí)現(xiàn)特定統(tǒng)計(jì)的歸約聚合reduce操作

    這篇文章主要介紹了Flink實(shí)現(xiàn)特定統(tǒng)計(jì)的歸約聚合reduce操作,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧
    2023-02-02

最新評論