SpringBoot整合rabbitMq自定義消息轉換方式
SpringBoot整合rabbitMq自定義消息轉換
存入rabbitMq時轉為string,取回來時轉為對應的類
package com.medi.hk.conf; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.support.converter.AbstractMessageConverter; import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.http.MediaType; import org.springframework.stereotype.Component; @Component @Slf4j public class RabbitMqConfig { private final String mqConvertClassName = "className"; /** * mq消息轉換 * @return */ @Bean public MessageConverter messageConverter() { return new AbstractMessageConverter() { /** * 接收消息時轉換 * @param message * @return * @throws MessageConversionException */ @Override public Object fromMessage(Message message) throws MessageConversionException { byte[] body = message.getBody(); MessageProperties messageProperties = message.getMessageProperties(); //需要轉為對應類的類名 String className = (String) messageProperties.getHeaders().get(mqConvertClassName); try { Boolean aBoolean = (Boolean) messageProperties.getHeaders().get(isArray); if (aBoolean != null && aBoolean) { return JSONArray.parseArray(new String(body), Class.forName(className)); } else { return JSON.parseObject(body, Class.forName(className)); } } catch (ClassNotFoundException e) { log.error("mq轉換錯誤: message ==> {} body ==> {}", JSON.toJSONString(message), new String(body), e); throw new RuntimeException(e); } } /** * 發(fā)送消息時轉換 * @param object * @param messageProperties * @return */ @Override protected Message createMessage(Object object, MessageProperties messageProperties) { messageProperties.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE); //標記時什么類轉換過來的 messageProperties.setHeader(mqConvertClassName, object.getClass().getName()); //如果是數(shù)組,則設置數(shù)組成員的類型(取第一個,只允許類型相同的數(shù)組) if (object instanceof Collection) { String className; Collection coll = (Collection) object; if (coll.size() > 0) { Object next = coll.iterator().next(); className = next.getClass().getName(); } else { className = Object.class.getName(); } messageProperties.setHeader(mqConvertClassName, className); messageProperties.setHeader(isArray, true); } else { messageProperties.setHeader(mqConvertClassName, object.getClass().getName()); } return new Message(JSON.toJSONBytes(object), messageProperties); } }; } }
springboot整合rabbitmq注意事項
隨著springboot愈發(fā)廣泛使用,各種中間件也需要和框架進行整合。
提示:以下是本篇文章正文內(nèi)容,下面案例可供參考
記錄本人在springboot下使用rabbitmq踩坑點。
在微服務架構下,生產(chǎn)者和消費者不在一個module下。消費者需要監(jiān)聽生產(chǎn)者發(fā)出的隊列消息。
在github上clone的項目中,運行消費者模塊時總是報錯:隊列404。
可是明明在生產(chǎn)者模塊已經(jīng)創(chuàng)建過。經(jīng)過查閱資料,原因是生產(chǎn)者和消費者不在同一模塊下導致。
解決方式有2種:
1.生產(chǎn)者消費者在同一模塊下,即可避免
即:交換機、隊列、綁定關系在同一模塊下就可。
如圖所示:
ExchangeConfig.java package com.space.rbq.order.config; import org.springframework.amqp.core.DirectExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 消息交換機配置 可以配置多個 * @author zhuzhe * @date 2018/5/25 15:40 * @email 1529949535@qq.com */ @Configuration public class ExchangeConfig { /** 消息交換機1的名字*/ public final String EXCHANGE_01 = "first_exchange"; /** * 1.定義direct exchange,綁定first_exchange * 2.durable="true" 持久化交換機, rabbitmq重啟的時候不需要創(chuàng)建新的交換機 * 3.direct交換器相對來說比較簡單,匹配規(guī)則為:如果路由鍵匹配,消息就被投送到相關的隊列 * fanout交換器中沒有路由鍵的概念,他會把消息發(fā)送到所有綁定在此交換器上面的隊列中。 * topic交換器你采用模糊匹配路由鍵的原則進行轉發(fā)消息到隊列中 */ @Bean public DirectExchange directExchange(){ DirectExchange directExchange = new DirectExchange(EXCHANGE_01,true,false); return directExchange; } } QueueConfig package com.space.rbq.order.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * 隊列配置 可以配置多個隊列 * @author zhuzhe * @date 2018/5/25 13:25 * @email 1529949535@qq.com */ @Configuration.java public class QueueConfig { /*對列名稱*/ public static final String QUEUE_NAME1 = "first-queue"; public static final String QUEUE_NAME2 = "second-queue"; public static final String QUEUE_NAME3 = "third-queue"; @Bean public Queue firstQueue() { /** durable="true" 持久化消息隊列 , rabbitmq重啟的時候不需要創(chuàng)建新的隊列 auto-delete 表示消息隊列沒有在使用時將被自動刪除 默認是false exclusive 表示該消息隊列是否只在當前connection生效,默認是false */ return new Queue(QUEUE_NAME1,true,false,false); } @Bean public Queue secondQueue() { return new Queue(QUEUE_NAME2,true,false,false); } @Bean public Queue thirdQueue() { // 配置 自動刪除 Map<String, Object> arguments = new HashMap<>(); arguments.put("x-message-ttl", 60000);//60秒自動刪除 return new Queue(QUEUE_NAME3,true,false,true,arguments); } } RabbitMqConfig.java package com.space.rbq.order.config; import com.space.rbq.order.mqcallback.MsgSendConfirmCallBack; import com.space.rbq.order.mqcallback.MsgSendReturnCallback; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; /** * RabbitMq配置 * @author zhuzhe * @date 2018/5/25 13:37 * @email 1529949535@qq.com */ @Configuration public class RabbitMqConfig { /** * key: queue在該direct-exchange中的key值,當消息發(fā)送給direct-exchange中指定key為設置值時, * 消息將會轉發(fā)給queue參數(shù)指定的消息隊列 */ /** 隊列key1*/ public static final String ROUTING_KEY_1 = "queue_one_key1"; public static final String ROUTING_KEY_2 = "queue_one_key2"; @Autowired private QueueConfig queueConfig; @Autowired private ExchangeConfig exchangeConfig; @Autowired private ConnectionFactory connectionFactory; // @Autowired // RabbitTemplate rabbitTemplate; // @PostConstruct // public void init() { // rabbitTemplate.setConfirmCallback(msgSendConfirmCallBack()); // rabbitTemplate.setReturnCallback(msgSendReturnCallback()); // } /** * 將消息隊列1和交換機1進行綁定,指定隊列key1 */ @Bean public Binding binding_one() { return BindingBuilder.bind(queueConfig.firstQueue()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTING_KEY_1); } /** * 將消息隊列2和交換機1進行綁定,指定隊列key2 */ @Bean public Binding binding_two() { return BindingBuilder.bind(queueConfig.secondQueue()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTING_KEY_2); } /** * 定義rabbit template用于數(shù)據(jù)的接收和發(fā)送 * 可以設置消息確認機制和回調 * @return */ @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory); // template.setMessageConverter(); 可以自定義消息轉換器 默認使用的JDK的,所以消息對象需要實現(xiàn)Serializable // template.setMessageConverter(new Jackson2JsonMessageConverter()); /**若使用confirm-callback或return-callback, * 必須要配置publisherConfirms或publisherReturns為true * 每個rabbitTemplate只能有一個confirm-callback和return-callback */ template.setConfirmCallback(msgSendConfirmCallBack()); /** * 使用return-callback時必須設置mandatory為true,或者在配置中設置mandatory-expression的值為true, * 可針對每次請求的消息去確定'mandatory'的boolean值, * 只能在提供'return -callback'時使用,與mandatory互斥 */ template.setReturnCallback(msgSendReturnCallback()); template.setMandatory(true); return template; } /** * 關于 msgSendConfirmCallBack 和 msgSendReturnCallback 的回調說明: * 1.如果消息沒有到exchange,則confirm回調,ack=false * 2.如果消息到達exchange,則confirm回調,ack=true * 3.exchange到queue成功,則不回調return * 4.exchange到queue失敗,則回調return(需設置mandatory=true,否則不回調,消息就丟了) */ /** * 消息確認機制 * Confirms給客戶端一種輕量級的方式,能夠跟蹤哪些消息被broker處理, * 哪些可能因為broker宕掉或者網(wǎng)絡失敗的情況而重新發(fā)布。 * 確認并且保證消息被送達,提供了兩種方式:發(fā)布確認和事務。(兩者不可同時使用) * 在channel為事務時,不可引入確認模式;同樣channel為確認模式下,不可使用事務。 * @return */ @Bean public MsgSendConfirmCallBack msgSendConfirmCallBack(){ return new MsgSendConfirmCallBack(); } @Bean public MsgSendReturnCallback msgSendReturnCallback(){ return new MsgSendReturnCallback(); } }
消費者監(jiān)聽器配置:
/** * queues 指定從哪個隊列(queue)訂閱消息 * @param message * @param channel */ @RabbitListener(queues = {QUEUE_NAME1}) public void handleMessage(Message message,Channel channel) throws IOException { try { // 處理消息 System.out.println("OrderConsumer {} handleMessage :"+message); // 執(zhí)行減庫存操作 // storeService.update(new Gson().fromJson(new String(message.getBody()),Order.class)); /** * 第一個參數(shù) deliveryTag:就是接受的消息的deliveryTag,可以通過msg.getMessageProperties().getDeliveryTag()獲得 * 第二個參數(shù) multiple:如果為true,確認之前接受到的消息;如果為false,只確認當前消息。 * 如果為true就表示連續(xù)取得多條消息才發(fā)會確認,和計算機網(wǎng)絡的中tcp協(xié)議接受分組的累積確認十分相似, * 能夠提高效率。 * * 同樣的,如果要nack或者拒絕消息(reject)的時候, * 也是調用channel里面的basicXXX方法就可以了(要指定tagId)。 * * 注意:如果拋異?;騨ack(并且requeue為true),消息會重新入隊列, * 并且會造成消費者不斷從隊列中讀取同一條消息的假象。 */ // 確認消息 // 如果 channel.basicAck channel.basicNack channel.basicReject 這三個方法都不執(zhí)行,消息也會被確認 【這個其實并沒有在官方看到,不過自己測試的確是這樣哈】 // 所以,正常情況下一般不需要執(zhí)行 channel.basicAck // channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); }catch (Exception e){ log.error("OrderConsumer handleMessage {} , error:",message,e); // 處理消息失敗,將消息重新放回隊列 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true); } }
2.使用@RabbitListener 動態(tài)綁定
RabbitListener(bindings = @QueueBinding( value = @Queue(value = QUEUE_NAME1, durable = "true", autoDelete = "false"), exchange = @Exchange(value = "first_exchange", type = ExchangeTypes.DIRECT), key = "queue_one_key1"))
注意:
在微服務架構下嗎,如果消息發(fā)送方和消費方不在一個模塊下,需要使用以上兩種方式。
總結
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
MyBatisPlus唯一索引批量新增或修改的實現(xiàn)方法
本文主要介紹了MyBatisPlus唯一索引批量新增或修改的實現(xiàn)方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2023-03-03詳解rabbitmq使用springboot實現(xiàn)fanout模式
這篇文章主要介紹了rabbitmq使用springboot實現(xiàn)fanout模式,Fanout特點是發(fā)布與訂閱模式,是一種廣播機制,它是沒有路由key的模式,需要的朋友可以參考下2023-07-07