欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot整合rabbitMq自定義消息轉(zhuǎn)換方式

 更新時(shí)間:2023年09月28日 08:39:05   作者:YoungMirror  
這篇文章主要介紹了SpringBoot整合rabbitMq自定義消息轉(zhuǎn)換方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教

SpringBoot整合rabbitMq自定義消息轉(zhuǎn)換

存入rabbitMq時(shí)轉(zhuǎn)為string,取回來(lái)時(shí)轉(zhuǎn)為對(duì)應(yīng)的類(lèi)

package com.medi.hk.conf;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class RabbitMqConfig {
    private final String mqConvertClassName = "className";
    /**
     * mq消息轉(zhuǎn)換
     * @return
     */
    @Bean
    public MessageConverter messageConverter() {
        return new AbstractMessageConverter() {
            /**
             * 接收消息時(shí)轉(zhuǎn)換
             * @param message
             * @return
             * @throws MessageConversionException
             */
            @Override
            public Object fromMessage(Message message) throws MessageConversionException {
                byte[] body = message.getBody();
                MessageProperties messageProperties = message.getMessageProperties();
                //需要轉(zhuǎn)為對(duì)應(yīng)類(lèi)的類(lèi)名
                String className = (String) messageProperties.getHeaders().get(mqConvertClassName);
                try {
                    Boolean aBoolean = (Boolean) messageProperties.getHeaders().get(isArray);
                    if (aBoolean != null && aBoolean) {
                        return JSONArray.parseArray(new String(body), Class.forName(className));
                    } else {
                        return JSON.parseObject(body, Class.forName(className));
                    }
                } catch (ClassNotFoundException e) {
                    log.error("mq轉(zhuǎn)換錯(cuò)誤: message ==> {} body ==> {}", JSON.toJSONString(message), new String(body), e);
                    throw new RuntimeException(e);
                }
            }
            /**
             * 發(fā)送消息時(shí)轉(zhuǎn)換
             * @param object
             * @param messageProperties
             * @return
             */
            @Override
            protected Message createMessage(Object object, MessageProperties messageProperties) {
                messageProperties.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE);
                //標(biāo)記時(shí)什么類(lèi)轉(zhuǎn)換過(guò)來(lái)的
                messageProperties.setHeader(mqConvertClassName, object.getClass().getName());
                //如果是數(shù)組,則設(shè)置數(shù)組成員的類(lèi)型(取第一個(gè),只允許類(lèi)型相同的數(shù)組)
                if (object instanceof Collection) {
                    String className;
                    Collection coll = (Collection) object;
                    if (coll.size() > 0) {
                        Object next = coll.iterator().next();
                        className = next.getClass().getName();
                    } else {
                        className = Object.class.getName();
                    }
                    messageProperties.setHeader(mqConvertClassName, className);
                    messageProperties.setHeader(isArray, true);
                } else {
                    messageProperties.setHeader(mqConvertClassName, object.getClass().getName());
                }
                return new Message(JSON.toJSONBytes(object), messageProperties);
            }
        };
    }
}

springboot整合rabbitmq注意事項(xiàng)

隨著springboot愈發(fā)廣泛使用,各種中間件也需要和框架進(jìn)行整合。

提示:以下是本篇文章正文內(nèi)容,下面案例可供參考

記錄本人在springboot下使用rabbitmq踩坑點(diǎn)。

在微服務(wù)架構(gòu)下,生產(chǎn)者和消費(fèi)者不在一個(gè)module下。消費(fèi)者需要監(jiān)聽(tīng)生產(chǎn)者發(fā)出的隊(duì)列消息。

在github上clone的項(xiàng)目中,運(yùn)行消費(fèi)者模塊時(shí)總是報(bào)錯(cuò):隊(duì)列404。

可是明明在生產(chǎn)者模塊已經(jīng)創(chuàng)建過(guò)。經(jīng)過(guò)查閱資料,原因是生產(chǎn)者和消費(fèi)者不在同一模塊下導(dǎo)致。

解決方式有2種:

1.生產(chǎn)者消費(fèi)者在同一模塊下,即可避免

即:交換機(jī)、隊(duì)列、綁定關(guān)系在同一模塊下就可。

如圖所示:

在這里插入圖片描述

ExchangeConfig.java
package com.space.rbq.order.config;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * 消息交換機(jī)配置  可以配置多個(gè)
 * @author zhuzhe
 * @date 2018/5/25 15:40
 * @email 1529949535@qq.com
 */
@Configuration
public class ExchangeConfig {
    /** 消息交換機(jī)1的名字*/
    public final String EXCHANGE_01 = "first_exchange";
    /**
     *   1.定義direct exchange,綁定first_exchange
     *   2.durable="true" 持久化交換機(jī), rabbitmq重啟的時(shí)候不需要?jiǎng)?chuàng)建新的交換機(jī)
     *   3.direct交換器相對(duì)來(lái)說(shuō)比較簡(jiǎn)單,匹配規(guī)則為:如果路由鍵匹配,消息就被投送到相關(guān)的隊(duì)列
     *     fanout交換器中沒(méi)有路由鍵的概念,他會(huì)把消息發(fā)送到所有綁定在此交換器上面的隊(duì)列中。
     *     topic交換器你采用模糊匹配路由鍵的原則進(jìn)行轉(zhuǎn)發(fā)消息到隊(duì)列中
     */
    @Bean
    public DirectExchange directExchange(){
        DirectExchange directExchange = new DirectExchange(EXCHANGE_01,true,false);
        return directExchange;
    }
}
QueueConfig
package com.space.rbq.order.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * 隊(duì)列配置  可以配置多個(gè)隊(duì)列
 * @author zhuzhe
 * @date 2018/5/25 13:25
 * @email 1529949535@qq.com
 */
@Configuration.java
public class QueueConfig {
    /*對(duì)列名稱(chēng)*/
    public static final String QUEUE_NAME1 = "first-queue";
    public static final String QUEUE_NAME2 = "second-queue";
    public static final String QUEUE_NAME3 = "third-queue";
    @Bean
    public Queue firstQueue() {
        /**
         durable="true" 持久化消息隊(duì)列 , rabbitmq重啟的時(shí)候不需要?jiǎng)?chuàng)建新的隊(duì)列
         auto-delete 表示消息隊(duì)列沒(méi)有在使用時(shí)將被自動(dòng)刪除 默認(rèn)是false
         exclusive  表示該消息隊(duì)列是否只在當(dāng)前connection生效,默認(rèn)是false
         */
        return new Queue(QUEUE_NAME1,true,false,false);
    }
    @Bean
    public Queue secondQueue() {
        return new Queue(QUEUE_NAME2,true,false,false);
    }
    @Bean
    public Queue thirdQueue() {
        // 配置 自動(dòng)刪除
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl", 60000);//60秒自動(dòng)刪除
        return new Queue(QUEUE_NAME3,true,false,true,arguments);
    }
}
RabbitMqConfig.java
package com.space.rbq.order.config;
import com.space.rbq.order.mqcallback.MsgSendConfirmCallBack;
import com.space.rbq.order.mqcallback.MsgSendReturnCallback;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
/**
 * RabbitMq配置
 * @author zhuzhe
 * @date 2018/5/25 13:37
 * @email 1529949535@qq.com
 */
@Configuration
public class RabbitMqConfig {
    /**
     * key: queue在該direct-exchange中的key值,當(dāng)消息發(fā)送給direct-exchange中指定key為設(shè)置值時(shí),
     * 消息將會(huì)轉(zhuǎn)發(fā)給queue參數(shù)指定的消息隊(duì)列
     */
    /** 隊(duì)列key1*/
    public static final String ROUTING_KEY_1 = "queue_one_key1";
    public static final String ROUTING_KEY_2 = "queue_one_key2";
    @Autowired
    private QueueConfig queueConfig;
    @Autowired
    private ExchangeConfig exchangeConfig;
    @Autowired
    private ConnectionFactory connectionFactory;
//    @Autowired
//    RabbitTemplate rabbitTemplate;
//    @PostConstruct
//    public void init() {
//        rabbitTemplate.setConfirmCallback(msgSendConfirmCallBack());
//        rabbitTemplate.setReturnCallback(msgSendReturnCallback());
//    }
    /**
     * 將消息隊(duì)列1和交換機(jī)1進(jìn)行綁定,指定隊(duì)列key1
     */
    @Bean
    public Binding binding_one() {
        return BindingBuilder.bind(queueConfig.firstQueue()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTING_KEY_1);
    }
    /**
     * 將消息隊(duì)列2和交換機(jī)1進(jìn)行綁定,指定隊(duì)列key2
     */
    @Bean
    public Binding binding_two() {
        return BindingBuilder.bind(queueConfig.secondQueue()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTING_KEY_2);
    }
    /**
     * 定義rabbit template用于數(shù)據(jù)的接收和發(fā)送
     * 可以設(shè)置消息確認(rèn)機(jī)制和回調(diào)
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // template.setMessageConverter(); 可以自定義消息轉(zhuǎn)換器  默認(rèn)使用的JDK的,所以消息對(duì)象需要實(shí)現(xiàn)Serializable
        // template.setMessageConverter(new Jackson2JsonMessageConverter());
        /**若使用confirm-callback或return-callback,
         * 必須要配置publisherConfirms或publisherReturns為true
         * 每個(gè)rabbitTemplate只能有一個(gè)confirm-callback和return-callback
         */
        template.setConfirmCallback(msgSendConfirmCallBack());
        /**
         * 使用return-callback時(shí)必須設(shè)置mandatory為true,或者在配置中設(shè)置mandatory-expression的值為true,
         * 可針對(duì)每次請(qǐng)求的消息去確定'mandatory'的boolean值,
         * 只能在提供'return -callback'時(shí)使用,與mandatory互斥
         */
        template.setReturnCallback(msgSendReturnCallback());
        template.setMandatory(true);
        return template;
    }
    /**
     * 關(guān)于 msgSendConfirmCallBack 和 msgSendReturnCallback 的回調(diào)說(shuō)明:
     * 1.如果消息沒(méi)有到exchange,則confirm回調(diào),ack=false
     * 2.如果消息到達(dá)exchange,則confirm回調(diào),ack=true
     * 3.exchange到queue成功,則不回調(diào)return
     * 4.exchange到queue失敗,則回調(diào)return(需設(shè)置mandatory=true,否則不回調(diào),消息就丟了)
     */
    /**
     * 消息確認(rèn)機(jī)制
     * Confirms給客戶(hù)端一種輕量級(jí)的方式,能夠跟蹤哪些消息被broker處理,
     * 哪些可能因?yàn)閎roker宕掉或者網(wǎng)絡(luò)失敗的情況而重新發(fā)布。
     * 確認(rèn)并且保證消息被送達(dá),提供了兩種方式:發(fā)布確認(rèn)和事務(wù)。(兩者不可同時(shí)使用)
     * 在channel為事務(wù)時(shí),不可引入確認(rèn)模式;同樣channel為確認(rèn)模式下,不可使用事務(wù)。
     * @return
     */
    @Bean
    public MsgSendConfirmCallBack msgSendConfirmCallBack(){
        return new MsgSendConfirmCallBack();
    }
    @Bean
    public MsgSendReturnCallback msgSendReturnCallback(){
        return new MsgSendReturnCallback();
    }
}

消費(fèi)者監(jiān)聽(tīng)器配置:

在這里插入圖片描述

 /**
     * queues  指定從哪個(gè)隊(duì)列(queue)訂閱消息
     * @param message
     * @param channel
     */
    @RabbitListener(queues = {QUEUE_NAME1})
    public void handleMessage(Message message,Channel channel) throws IOException {
        try {
            // 處理消息
            System.out.println("OrderConsumer {} handleMessage :"+message);
            // 執(zhí)行減庫(kù)存操作
//            storeService.update(new Gson().fromJson(new String(message.getBody()),Order.class));
           /**
            * 第一個(gè)參數(shù) deliveryTag:就是接受的消息的deliveryTag,可以通過(guò)msg.getMessageProperties().getDeliveryTag()獲得
            * 第二個(gè)參數(shù) multiple:如果為true,確認(rèn)之前接受到的消息;如果為false,只確認(rèn)當(dāng)前消息。
            * 如果為true就表示連續(xù)取得多條消息才發(fā)會(huì)確認(rèn),和計(jì)算機(jī)網(wǎng)絡(luò)的中tcp協(xié)議接受分組的累積確認(rèn)十分相似,
            * 能夠提高效率。
            *
            * 同樣的,如果要nack或者拒絕消息(reject)的時(shí)候,
            * 也是調(diào)用channel里面的basicXXX方法就可以了(要指定tagId)。
            *
            * 注意:如果拋異常或nack(并且requeue為true),消息會(huì)重新入隊(duì)列,
            * 并且會(huì)造成消費(fèi)者不斷從隊(duì)列中讀取同一條消息的假象。
            */
            // 確認(rèn)消息
            // 如果 channel.basicAck   channel.basicNack  channel.basicReject 這三個(gè)方法都不執(zhí)行,消息也會(huì)被確認(rèn) 【這個(gè)其實(shí)并沒(méi)有在官方看到,不過(guò)自己測(cè)試的確是這樣哈】
            // 所以,正常情況下一般不需要執(zhí)行 channel.basicAck
            // channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        }catch (Exception e){
            log.error("OrderConsumer  handleMessage {} , error:",message,e);
            // 處理消息失敗,將消息重新放回隊(duì)列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
        }
    }

2.使用@RabbitListener 動(dòng)態(tài)綁定

RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = QUEUE_NAME1, durable = "true", autoDelete = "false"),
            exchange = @Exchange(value = "first_exchange", type = ExchangeTypes.DIRECT),
            key = "queue_one_key1"))

注意:

在微服務(wù)架構(gòu)下嗎,如果消息發(fā)送方和消費(fèi)方不在一個(gè)模塊下,需要使用以上兩種方式。

總結(jié)

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • HelloSpringMVC注解版實(shí)現(xiàn)步驟解析

    HelloSpringMVC注解版實(shí)現(xiàn)步驟解析

    這篇文章主要介紹了HelloSpringMVC注解版實(shí)現(xiàn)步驟解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-09-09
  • 深入淺出講解Java中的枚舉類(lèi)

    深入淺出講解Java中的枚舉類(lèi)

    這篇文章主要介紹了深入淺出講解Java中的枚舉類(lèi),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-09-09
  • Java與Python兩種編程語(yǔ)言的比較與應(yīng)用舉例詳解

    Java與Python兩種編程語(yǔ)言的比較與應(yīng)用舉例詳解

    這篇文章主要介紹了Java與Python兩種編程語(yǔ)言比較與應(yīng)用的相關(guān)資料,Java和Python各有特點(diǎn),Java適用于企業(yè)級(jí)應(yīng)用開(kāi)發(fā),Python則在數(shù)據(jù)科學(xué)和機(jī)器學(xué)習(xí)領(lǐng)域占優(yōu)勢(shì),兩者在語(yǔ)法、應(yīng)用領(lǐng)域、性能、開(kāi)發(fā)效率等方面存在差異,需要的朋友可以參考下
    2025-02-02
  • Java自定義注解對(duì)枚舉類(lèi)型參數(shù)的校驗(yàn)方法

    Java自定義注解對(duì)枚舉類(lèi)型參數(shù)的校驗(yàn)方法

    文章介紹了如何使用Java注解對(duì)枚舉類(lèi)型參數(shù)進(jìn)行校驗(yàn),通過(guò)自定義注解和注解校驗(yàn)類(lèi)實(shí)現(xiàn)參數(shù)的靈活性校驗(yàn),感興趣的朋友一起看看吧
    2025-01-01
  • MyBatisPlus唯一索引批量新增或修改的實(shí)現(xiàn)方法

    MyBatisPlus唯一索引批量新增或修改的實(shí)現(xiàn)方法

    本文主要介紹了MyBatisPlus唯一索引批量新增或修改的實(shí)現(xiàn)方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-03-03
  • 深入理解Java中的裝箱和拆箱

    深入理解Java中的裝箱和拆箱

    這篇文章主要介紹了深入理解Java中的裝箱和拆箱,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-09-09
  • 使用java打印心型、圓形圖案的實(shí)現(xiàn)代碼

    使用java打印心型、圓形圖案的實(shí)現(xiàn)代碼

    這篇文章主要介紹了使用java打印心型、圓形圖案的實(shí)現(xiàn)代碼,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2020-12-12
  • SpringMVC異常處理知識(shí)點(diǎn)總結(jié)

    SpringMVC異常處理知識(shí)點(diǎn)總結(jié)

    在本篇文章里小編給大家整理的是關(guān)于SpringMVC異常處理相關(guān)知識(shí)點(diǎn)內(nèi)容,需要的朋友們學(xué)習(xí)下。
    2019-10-10
  • 詳解rabbitmq使用springboot實(shí)現(xiàn)fanout模式

    詳解rabbitmq使用springboot實(shí)現(xiàn)fanout模式

    這篇文章主要介紹了rabbitmq使用springboot實(shí)現(xiàn)fanout模式,Fanout特點(diǎn)是發(fā)布與訂閱模式,是一種廣播機(jī)制,它是沒(méi)有路由key的模式,需要的朋友可以參考下
    2023-07-07
  • java進(jìn)行error捕獲和處理示例(java異常捕獲)

    java進(jìn)行error捕獲和處理示例(java異常捕獲)

    通常來(lái)說(shuō),大家都是對(duì)Java中的Exception進(jìn)行捕獲和進(jìn)行相應(yīng)的處理,有些人說(shuō),error就無(wú)法捕獲了。其實(shí),error也是可以捕獲的。Error和Exception都是Throwable的子類(lèi)。既然可以catch Throwable,那么error也是可以catch的
    2014-01-01

最新評(píng)論