Rabbit消息重試機制問題記錄
消息重試機制
概述
消息重試機制就是在消息處理失敗之后重新發(fā)送,主要時為了解決消息發(fā)送過程可能會出現(xiàn)的問題,例如 網(wǎng)絡(luò)故障、服務(wù)臨時不可用 等.
Ps:如果時程序邏輯引起的錯誤,那么即使重試多少次都是沒有用的,但是可以通過配置重試次數(shù)來解決.
實現(xiàn)方式一:基于消息手動確認機制,返回 nack 實現(xiàn)
配置文件
spring: application: name: rabbitmq rabbitmq: host: env-base port: 5672 username: root password: 1111 listener: simple: acknowledge-mode: manual # 手動確認
交換機、隊列、綁定
@Bean("ackExchange") fun ackExchange() = DirectExchange(MQConst.ACK_EXCHANGE) @Bean("ackQueue") fun ackQueue() = Queue(MQConst.ACK_QUEUE) @Bean fun ackBinding( @Qualifier("ackExchange") exchange: DirectExchange, @Qualifier("ackQueue") queue: Queue, ): Binding { return BindingBuilder .bind(queue) .to(exchange) .with(MQConst.ACK_BINDING) }
生產(chǎn)者接口
@RestController @RequestMapping("/mq3") class MQ3Api( val rabbitTemplate: RabbitTemplate ) { @RequestMapping("/ack") fun ack(): String { rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1") return "ok" } }
消費者
import com.cyk.rabbitmq.constants.MQConst import com.rabbitmq.client.Channel //注意這里的依賴 import org.springframework.amqp.core.Message //注意這里的依賴 import org.springframework.amqp.rabbit.annotation.RabbitListener import org.springframework.stereotype.Component import java.nio.charset.Charset @Component class AckListener { @RabbitListener(queues = [MQConst.ACK_QUEUE]) fun handMessage( message: Message, channel: Channel, ) { val deliveryTag = message.messageProperties.deliveryTag try { println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, $deliveryTag") val a = 1 / 0 channel.basicAck(deliveryTag, false) } catch (e: Exception) { //通過返回 nack,并設(shè)置 requeue 為 ture 實現(xiàn)消息重新入隊,并進行重試 channel.basicNack(deliveryTag, false, true) } } }
演示和結(jié)論
deliverTag 自增的原因: 引發(fā)異常后,會返回 nack,并且參數(shù) requeue = true,表示重新入隊,然后進行重試,將隊列中的消息再次發(fā)送給生產(chǎn)者,因此 deliverTag 會自增.
缺點: 如果是由于程序邏輯異常引起的重試
,那么無論重試多少次都沒用,并且不斷重試會導(dǎo)致負載飆升,性能下降
.
實現(xiàn)方式二:基于重試配置實現(xiàn) 配置文件
spring: application: name: rabbitmq rabbitmq: host: env-base port: 5672 username: root password: 1111 listener: simple: acknowledge-mode: auto # 開啟重試機制,這里必須是 auto,否則不生效! retry: enabled: true # 開啟消費者失敗重試 initial-interval: 5000ms # 失敗等待時常 max-attempts: 5 # 最大重試次數(shù)(包括第一次消費)
Ps:開啟重試機制,acknowledge-mode 必須指定為 auto,否則不生效!
交換機、隊列、綁定
@Bean("ackExchange") fun ackExchange() = DirectExchange(MQConst.ACK_EXCHANGE) @Bean("ackQueue") fun ackQueue() = Queue(MQConst.ACK_QUEUE) @Bean fun ackBinding( @Qualifier("ackExchange") exchange: DirectExchange, @Qualifier("ackQueue") queue: Queue, ): Binding { return BindingBuilder .bind(queue) .to(exchange) .with(MQConst.ACK_BINDING) }
生產(chǎn)者接口
@RequestMapping("/ack") fun ack(): String { rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1") return "ok" }
消費者
import com.cyk.rabbitmq.constants.MQConst import com.rabbitmq.client.Channel //注意這里的依賴 import org.springframework.amqp.core.Message //注意這里的依賴 import org.springframework.amqp.rabbit.annotation.RabbitListener import org.springframework.stereotype.Component import java.nio.charset.Charset @Component class AckListener { @RabbitListener(queues = [MQConst.ACK_QUEUE]) fun handMessage( message: Message, channel: Channel, ) { println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, ${message.messageProperties.deliveryTag}") val a = 1 / 0 } }
演示和結(jié)論
deliverTag 不自增的原因: 因為是消息已經(jīng)發(fā)出去了,即使失敗了也不會重回隊列,而是直接重新發(fā)一遍消息.
好處: 不僅可以控制重試次數(shù)(防止類似于上面講到的確認應(yīng)答引起的無限重試),還可以控制每次重試的間隔時間(防止負載飆升).
到此這篇關(guān)于Rabbit高級特性 - 消息重試機制的文章就介紹到這了,更多相關(guān)Rabbit消息重試機制內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
解決Spring boot整合mybatis,xml資源文件放置及路徑配置問題
這篇文章主要介紹了解決Spring boot整合mybatis,xml資源文件放置及路徑配置問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12MyBatis實現(xiàn)表連接查詢寫法(三種對應(yīng)關(guān)系)的方法總結(jié)
這篇文章主要介紹了MyBatis實現(xiàn)表連接查詢寫法(一對一關(guān)系、一對多關(guān)系、多對多關(guān)系)的方法,文中的示例代碼講解詳細,感興趣的可以了解一下2023-01-01SpringBoot?將配置文件掛到?jar?包外面的操作方法
在 SpringBoot 中,可以將配置文件放在 jar 包外面,這樣可以方便地修改配置而不需要重新打包和部署,這篇文章主要介紹了SpringBoot?如何將配置文件掛到?jar?包外面,需要的朋友可以參考下2023-03-03Spring中的InitializingBean接口源碼解析
這篇文章主要介紹了Spring中的InitializingBean接口源碼解析,InitializingBean接口為Bean初始化提供了一種方式,實現(xiàn)InitializingBean接口的Bean,在BeanFactory設(shè)置其所有屬性后會調(diào)用其afterPropertiesSet()方法,需要的朋友可以參考下2024-02-02Mybatis-Plus之ID自動增長的設(shè)置實現(xiàn)
本文主要介紹了Mybatis-Plus之ID自動增長的設(shè)置實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07mybatis之嵌套查詢和嵌套結(jié)果有哪些區(qū)別
這篇文章主要介紹了mybatis之嵌套查詢和嵌套結(jié)果有哪些區(qū)別,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-03-03