欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot整合rabbitMq自定義消息轉換方式

 更新時間:2023年09月28日 08:39:05   作者:YoungMirror  
這篇文章主要介紹了SpringBoot整合rabbitMq自定義消息轉換方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教

SpringBoot整合rabbitMq自定義消息轉換

存入rabbitMq時轉為string,取回來時轉為對應的類

package com.medi.hk.conf;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class RabbitMqConfig {
    private final String mqConvertClassName = "className";
    /**
     * mq消息轉換
     * @return
     */
    @Bean
    public MessageConverter messageConverter() {
        return new AbstractMessageConverter() {
            /**
             * 接收消息時轉換
             * @param message
             * @return
             * @throws MessageConversionException
             */
            @Override
            public Object fromMessage(Message message) throws MessageConversionException {
                byte[] body = message.getBody();
                MessageProperties messageProperties = message.getMessageProperties();
                //需要轉為對應類的類名
                String className = (String) messageProperties.getHeaders().get(mqConvertClassName);
                try {
                    Boolean aBoolean = (Boolean) messageProperties.getHeaders().get(isArray);
                    if (aBoolean != null && aBoolean) {
                        return JSONArray.parseArray(new String(body), Class.forName(className));
                    } else {
                        return JSON.parseObject(body, Class.forName(className));
                    }
                } catch (ClassNotFoundException e) {
                    log.error("mq轉換錯誤: message ==> {} body ==> {}", JSON.toJSONString(message), new String(body), e);
                    throw new RuntimeException(e);
                }
            }
            /**
             * 發(fā)送消息時轉換
             * @param object
             * @param messageProperties
             * @return
             */
            @Override
            protected Message createMessage(Object object, MessageProperties messageProperties) {
                messageProperties.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE);
                //標記時什么類轉換過來的
                messageProperties.setHeader(mqConvertClassName, object.getClass().getName());
                //如果是數(shù)組,則設置數(shù)組成員的類型(取第一個,只允許類型相同的數(shù)組)
                if (object instanceof Collection) {
                    String className;
                    Collection coll = (Collection) object;
                    if (coll.size() > 0) {
                        Object next = coll.iterator().next();
                        className = next.getClass().getName();
                    } else {
                        className = Object.class.getName();
                    }
                    messageProperties.setHeader(mqConvertClassName, className);
                    messageProperties.setHeader(isArray, true);
                } else {
                    messageProperties.setHeader(mqConvertClassName, object.getClass().getName());
                }
                return new Message(JSON.toJSONBytes(object), messageProperties);
            }
        };
    }
}

springboot整合rabbitmq注意事項

隨著springboot愈發(fā)廣泛使用,各種中間件也需要和框架進行整合。

提示:以下是本篇文章正文內(nèi)容,下面案例可供參考

記錄本人在springboot下使用rabbitmq踩坑點。

在微服務架構下,生產(chǎn)者和消費者不在一個module下。消費者需要監(jiān)聽生產(chǎn)者發(fā)出的隊列消息。

在github上clone的項目中,運行消費者模塊時總是報錯:隊列404。

可是明明在生產(chǎn)者模塊已經(jīng)創(chuàng)建過。經(jīng)過查閱資料,原因是生產(chǎn)者和消費者不在同一模塊下導致。

解決方式有2種:

1.生產(chǎn)者消費者在同一模塊下,即可避免

即:交換機、隊列、綁定關系在同一模塊下就可。

如圖所示:

在這里插入圖片描述

ExchangeConfig.java
package com.space.rbq.order.config;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * 消息交換機配置  可以配置多個
 * @author zhuzhe
 * @date 2018/5/25 15:40
 * @email 1529949535@qq.com
 */
@Configuration
public class ExchangeConfig {
    /** 消息交換機1的名字*/
    public final String EXCHANGE_01 = "first_exchange";
    /**
     *   1.定義direct exchange,綁定first_exchange
     *   2.durable="true" 持久化交換機, rabbitmq重啟的時候不需要創(chuàng)建新的交換機
     *   3.direct交換器相對來說比較簡單,匹配規(guī)則為:如果路由鍵匹配,消息就被投送到相關的隊列
     *     fanout交換器中沒有路由鍵的概念,他會把消息發(fā)送到所有綁定在此交換器上面的隊列中。
     *     topic交換器你采用模糊匹配路由鍵的原則進行轉發(fā)消息到隊列中
     */
    @Bean
    public DirectExchange directExchange(){
        DirectExchange directExchange = new DirectExchange(EXCHANGE_01,true,false);
        return directExchange;
    }
}
QueueConfig
package com.space.rbq.order.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * 隊列配置  可以配置多個隊列
 * @author zhuzhe
 * @date 2018/5/25 13:25
 * @email 1529949535@qq.com
 */
@Configuration.java
public class QueueConfig {
    /*對列名稱*/
    public static final String QUEUE_NAME1 = "first-queue";
    public static final String QUEUE_NAME2 = "second-queue";
    public static final String QUEUE_NAME3 = "third-queue";
    @Bean
    public Queue firstQueue() {
        /**
         durable="true" 持久化消息隊列 , rabbitmq重啟的時候不需要創(chuàng)建新的隊列
         auto-delete 表示消息隊列沒有在使用時將被自動刪除 默認是false
         exclusive  表示該消息隊列是否只在當前connection生效,默認是false
         */
        return new Queue(QUEUE_NAME1,true,false,false);
    }
    @Bean
    public Queue secondQueue() {
        return new Queue(QUEUE_NAME2,true,false,false);
    }
    @Bean
    public Queue thirdQueue() {
        // 配置 自動刪除
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl", 60000);//60秒自動刪除
        return new Queue(QUEUE_NAME3,true,false,true,arguments);
    }
}
RabbitMqConfig.java
package com.space.rbq.order.config;
import com.space.rbq.order.mqcallback.MsgSendConfirmCallBack;
import com.space.rbq.order.mqcallback.MsgSendReturnCallback;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
/**
 * RabbitMq配置
 * @author zhuzhe
 * @date 2018/5/25 13:37
 * @email 1529949535@qq.com
 */
@Configuration
public class RabbitMqConfig {
    /**
     * key: queue在該direct-exchange中的key值,當消息發(fā)送給direct-exchange中指定key為設置值時,
     * 消息將會轉發(fā)給queue參數(shù)指定的消息隊列
     */
    /** 隊列key1*/
    public static final String ROUTING_KEY_1 = "queue_one_key1";
    public static final String ROUTING_KEY_2 = "queue_one_key2";
    @Autowired
    private QueueConfig queueConfig;
    @Autowired
    private ExchangeConfig exchangeConfig;
    @Autowired
    private ConnectionFactory connectionFactory;
//    @Autowired
//    RabbitTemplate rabbitTemplate;
//    @PostConstruct
//    public void init() {
//        rabbitTemplate.setConfirmCallback(msgSendConfirmCallBack());
//        rabbitTemplate.setReturnCallback(msgSendReturnCallback());
//    }
    /**
     * 將消息隊列1和交換機1進行綁定,指定隊列key1
     */
    @Bean
    public Binding binding_one() {
        return BindingBuilder.bind(queueConfig.firstQueue()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTING_KEY_1);
    }
    /**
     * 將消息隊列2和交換機1進行綁定,指定隊列key2
     */
    @Bean
    public Binding binding_two() {
        return BindingBuilder.bind(queueConfig.secondQueue()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTING_KEY_2);
    }
    /**
     * 定義rabbit template用于數(shù)據(jù)的接收和發(fā)送
     * 可以設置消息確認機制和回調
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // template.setMessageConverter(); 可以自定義消息轉換器  默認使用的JDK的,所以消息對象需要實現(xiàn)Serializable
        // template.setMessageConverter(new Jackson2JsonMessageConverter());
        /**若使用confirm-callback或return-callback,
         * 必須要配置publisherConfirms或publisherReturns為true
         * 每個rabbitTemplate只能有一個confirm-callback和return-callback
         */
        template.setConfirmCallback(msgSendConfirmCallBack());
        /**
         * 使用return-callback時必須設置mandatory為true,或者在配置中設置mandatory-expression的值為true,
         * 可針對每次請求的消息去確定'mandatory'的boolean值,
         * 只能在提供'return -callback'時使用,與mandatory互斥
         */
        template.setReturnCallback(msgSendReturnCallback());
        template.setMandatory(true);
        return template;
    }
    /**
     * 關于 msgSendConfirmCallBack 和 msgSendReturnCallback 的回調說明:
     * 1.如果消息沒有到exchange,則confirm回調,ack=false
     * 2.如果消息到達exchange,則confirm回調,ack=true
     * 3.exchange到queue成功,則不回調return
     * 4.exchange到queue失敗,則回調return(需設置mandatory=true,否則不回調,消息就丟了)
     */
    /**
     * 消息確認機制
     * Confirms給客戶端一種輕量級的方式,能夠跟蹤哪些消息被broker處理,
     * 哪些可能因為broker宕掉或者網(wǎng)絡失敗的情況而重新發(fā)布。
     * 確認并且保證消息被送達,提供了兩種方式:發(fā)布確認和事務。(兩者不可同時使用)
     * 在channel為事務時,不可引入確認模式;同樣channel為確認模式下,不可使用事務。
     * @return
     */
    @Bean
    public MsgSendConfirmCallBack msgSendConfirmCallBack(){
        return new MsgSendConfirmCallBack();
    }
    @Bean
    public MsgSendReturnCallback msgSendReturnCallback(){
        return new MsgSendReturnCallback();
    }
}

消費者監(jiān)聽器配置:

在這里插入圖片描述

 /**
     * queues  指定從哪個隊列(queue)訂閱消息
     * @param message
     * @param channel
     */
    @RabbitListener(queues = {QUEUE_NAME1})
    public void handleMessage(Message message,Channel channel) throws IOException {
        try {
            // 處理消息
            System.out.println("OrderConsumer {} handleMessage :"+message);
            // 執(zhí)行減庫存操作
//            storeService.update(new Gson().fromJson(new String(message.getBody()),Order.class));
           /**
            * 第一個參數(shù) deliveryTag:就是接受的消息的deliveryTag,可以通過msg.getMessageProperties().getDeliveryTag()獲得
            * 第二個參數(shù) multiple:如果為true,確認之前接受到的消息;如果為false,只確認當前消息。
            * 如果為true就表示連續(xù)取得多條消息才發(fā)會確認,和計算機網(wǎng)絡的中tcp協(xié)議接受分組的累積確認十分相似,
            * 能夠提高效率。
            *
            * 同樣的,如果要nack或者拒絕消息(reject)的時候,
            * 也是調用channel里面的basicXXX方法就可以了(要指定tagId)。
            *
            * 注意:如果拋異?;騨ack(并且requeue為true),消息會重新入隊列,
            * 并且會造成消費者不斷從隊列中讀取同一條消息的假象。
            */
            // 確認消息
            // 如果 channel.basicAck   channel.basicNack  channel.basicReject 這三個方法都不執(zhí)行,消息也會被確認 【這個其實并沒有在官方看到,不過自己測試的確是這樣哈】
            // 所以,正常情況下一般不需要執(zhí)行 channel.basicAck
            // channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        }catch (Exception e){
            log.error("OrderConsumer  handleMessage {} , error:",message,e);
            // 處理消息失敗,將消息重新放回隊列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
        }
    }

2.使用@RabbitListener 動態(tài)綁定

RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = QUEUE_NAME1, durable = "true", autoDelete = "false"),
            exchange = @Exchange(value = "first_exchange", type = ExchangeTypes.DIRECT),
            key = "queue_one_key1"))

注意:

在微服務架構下嗎,如果消息發(fā)送方和消費方不在一個模塊下,需要使用以上兩種方式。

總結

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關文章

  • HelloSpringMVC注解版實現(xiàn)步驟解析

    HelloSpringMVC注解版實現(xiàn)步驟解析

    這篇文章主要介紹了HelloSpringMVC注解版實現(xiàn)步驟解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-09-09
  • 深入淺出講解Java中的枚舉類

    深入淺出講解Java中的枚舉類

    這篇文章主要介紹了深入淺出講解Java中的枚舉類,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-09-09
  • Java與Python兩種編程語言的比較與應用舉例詳解

    Java與Python兩種編程語言的比較與應用舉例詳解

    這篇文章主要介紹了Java與Python兩種編程語言比較與應用的相關資料,Java和Python各有特點,Java適用于企業(yè)級應用開發(fā),Python則在數(shù)據(jù)科學和機器學習領域占優(yōu)勢,兩者在語法、應用領域、性能、開發(fā)效率等方面存在差異,需要的朋友可以參考下
    2025-02-02
  • Java自定義注解對枚舉類型參數(shù)的校驗方法

    Java自定義注解對枚舉類型參數(shù)的校驗方法

    文章介紹了如何使用Java注解對枚舉類型參數(shù)進行校驗,通過自定義注解和注解校驗類實現(xiàn)參數(shù)的靈活性校驗,感興趣的朋友一起看看吧
    2025-01-01
  • MyBatisPlus唯一索引批量新增或修改的實現(xiàn)方法

    MyBatisPlus唯一索引批量新增或修改的實現(xiàn)方法

    本文主要介紹了MyBatisPlus唯一索引批量新增或修改的實現(xiàn)方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-03-03
  • 深入理解Java中的裝箱和拆箱

    深入理解Java中的裝箱和拆箱

    這篇文章主要介紹了深入理解Java中的裝箱和拆箱,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2019-09-09
  • 使用java打印心型、圓形圖案的實現(xiàn)代碼

    使用java打印心型、圓形圖案的實現(xiàn)代碼

    這篇文章主要介紹了使用java打印心型、圓形圖案的實現(xiàn)代碼,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-12-12
  • SpringMVC異常處理知識點總結

    SpringMVC異常處理知識點總結

    在本篇文章里小編給大家整理的是關于SpringMVC異常處理相關知識點內(nèi)容,需要的朋友們學習下。
    2019-10-10
  • 詳解rabbitmq使用springboot實現(xiàn)fanout模式

    詳解rabbitmq使用springboot實現(xiàn)fanout模式

    這篇文章主要介紹了rabbitmq使用springboot實現(xiàn)fanout模式,Fanout特點是發(fā)布與訂閱模式,是一種廣播機制,它是沒有路由key的模式,需要的朋友可以參考下
    2023-07-07
  • java進行error捕獲和處理示例(java異常捕獲)

    java進行error捕獲和處理示例(java異常捕獲)

    通常來說,大家都是對Java中的Exception進行捕獲和進行相應的處理,有些人說,error就無法捕獲了。其實,error也是可以捕獲的。Error和Exception都是Throwable的子類。既然可以catch Throwable,那么error也是可以catch的
    2014-01-01

最新評論