Rabbit消息重試機(jī)制問(wèn)題記錄
消息重試機(jī)制
概述
消息重試機(jī)制就是在消息處理失敗之后重新發(fā)送,主要時(shí)為了解決消息發(fā)送過(guò)程可能會(huì)出現(xiàn)的問(wèn)題,例如 網(wǎng)絡(luò)故障、服務(wù)臨時(shí)不可用 等.
Ps:如果時(shí)程序邏輯引起的錯(cuò)誤,那么即使重試多少次都是沒(méi)有用的,但是可以通過(guò)配置重試次數(shù)來(lái)解決.
實(shí)現(xiàn)方式一:基于消息手動(dòng)確認(rèn)機(jī)制,返回 nack 實(shí)現(xiàn)
配置文件
spring: application: name: rabbitmq rabbitmq: host: env-base port: 5672 username: root password: 1111 listener: simple: acknowledge-mode: manual # 手動(dòng)確認(rèn)
交換機(jī)、隊(duì)列、綁定
@Bean("ackExchange") fun ackExchange() = DirectExchange(MQConst.ACK_EXCHANGE) @Bean("ackQueue") fun ackQueue() = Queue(MQConst.ACK_QUEUE) @Bean fun ackBinding( @Qualifier("ackExchange") exchange: DirectExchange, @Qualifier("ackQueue") queue: Queue, ): Binding { return BindingBuilder .bind(queue) .to(exchange) .with(MQConst.ACK_BINDING) }
生產(chǎn)者接口
@RestController @RequestMapping("/mq3") class MQ3Api( val rabbitTemplate: RabbitTemplate ) { @RequestMapping("/ack") fun ack(): String { rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1") return "ok" } }
消費(fèi)者
import com.cyk.rabbitmq.constants.MQConst import com.rabbitmq.client.Channel //注意這里的依賴(lài) import org.springframework.amqp.core.Message //注意這里的依賴(lài) import org.springframework.amqp.rabbit.annotation.RabbitListener import org.springframework.stereotype.Component import java.nio.charset.Charset @Component class AckListener { @RabbitListener(queues = [MQConst.ACK_QUEUE]) fun handMessage( message: Message, channel: Channel, ) { val deliveryTag = message.messageProperties.deliveryTag try { println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, $deliveryTag") val a = 1 / 0 channel.basicAck(deliveryTag, false) } catch (e: Exception) { //通過(guò)返回 nack,并設(shè)置 requeue 為 ture 實(shí)現(xiàn)消息重新入隊(duì),并進(jìn)行重試 channel.basicNack(deliveryTag, false, true) } } }
演示和結(jié)論
deliverTag 自增的原因: 引發(fā)異常后,會(huì)返回 nack,并且參數(shù) requeue = true,表示重新入隊(duì),然后進(jìn)行重試,將隊(duì)列中的消息再次發(fā)送給生產(chǎn)者,因此 deliverTag 會(huì)自增.
缺點(diǎn): 如果是由于程序邏輯異常引起的重試
,那么無(wú)論重試多少次都沒(méi)用,并且不斷重試會(huì)導(dǎo)致負(fù)載飆升,性能下降
.
實(shí)現(xiàn)方式二:基于重試配置實(shí)現(xiàn) 配置文件
spring: application: name: rabbitmq rabbitmq: host: env-base port: 5672 username: root password: 1111 listener: simple: acknowledge-mode: auto # 開(kāi)啟重試機(jī)制,這里必須是 auto,否則不生效! retry: enabled: true # 開(kāi)啟消費(fèi)者失敗重試 initial-interval: 5000ms # 失敗等待時(shí)常 max-attempts: 5 # 最大重試次數(shù)(包括第一次消費(fèi))
Ps:開(kāi)啟重試機(jī)制,acknowledge-mode 必須指定為 auto,否則不生效!
交換機(jī)、隊(duì)列、綁定
@Bean("ackExchange") fun ackExchange() = DirectExchange(MQConst.ACK_EXCHANGE) @Bean("ackQueue") fun ackQueue() = Queue(MQConst.ACK_QUEUE) @Bean fun ackBinding( @Qualifier("ackExchange") exchange: DirectExchange, @Qualifier("ackQueue") queue: Queue, ): Binding { return BindingBuilder .bind(queue) .to(exchange) .with(MQConst.ACK_BINDING) }
生產(chǎn)者接口
@RequestMapping("/ack") fun ack(): String { rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1") return "ok" }
消費(fèi)者
import com.cyk.rabbitmq.constants.MQConst import com.rabbitmq.client.Channel //注意這里的依賴(lài) import org.springframework.amqp.core.Message //注意這里的依賴(lài) import org.springframework.amqp.rabbit.annotation.RabbitListener import org.springframework.stereotype.Component import java.nio.charset.Charset @Component class AckListener { @RabbitListener(queues = [MQConst.ACK_QUEUE]) fun handMessage( message: Message, channel: Channel, ) { println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, ${message.messageProperties.deliveryTag}") val a = 1 / 0 } }
演示和結(jié)論
deliverTag 不自增的原因: 因?yàn)槭窍⒁呀?jīng)發(fā)出去了,即使失敗了也不會(huì)重回隊(duì)列,而是直接重新發(fā)一遍消息.
好處: 不僅可以控制重試次數(shù)(防止類(lèi)似于上面講到的確認(rèn)應(yīng)答引起的無(wú)限重試),還可以控制每次重試的間隔時(shí)間(防止負(fù)載飆升).
到此這篇關(guān)于Rabbit高級(jí)特性 - 消息重試機(jī)制的文章就介紹到這了,更多相關(guān)Rabbit消息重試機(jī)制內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Javaweb實(shí)現(xiàn)上傳下載文件的多種方法
本篇文章主要介紹了Javaweb實(shí)現(xiàn)上傳下載文件,有多種實(shí)現(xiàn)方式,需要的朋友可以參考下。2016-10-10如何基于java實(shí)現(xiàn)解壓ZIP TAR等文件
這篇文章主要介紹了如何基于java實(shí)現(xiàn)解壓ZIP TAR等文件,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-07-07解決Spring boot整合mybatis,xml資源文件放置及路徑配置問(wèn)題
這篇文章主要介紹了解決Spring boot整合mybatis,xml資源文件放置及路徑配置問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-12-12MyBatis實(shí)現(xiàn)表連接查詢(xún)寫(xiě)法(三種對(duì)應(yīng)關(guān)系)的方法總結(jié)
這篇文章主要介紹了MyBatis實(shí)現(xiàn)表連接查詢(xún)寫(xiě)法(一對(duì)一關(guān)系、一對(duì)多關(guān)系、多對(duì)多關(guān)系)的方法,文中的示例代碼講解詳細(xì),感興趣的可以了解一下2023-01-01SpringBoot?將配置文件掛到?jar?包外面的操作方法
在 SpringBoot 中,可以將配置文件放在 jar 包外面,這樣可以方便地修改配置而不需要重新打包和部署,這篇文章主要介紹了SpringBoot?如何將配置文件掛到?jar?包外面,需要的朋友可以參考下2023-03-03Spring中的InitializingBean接口源碼解析
這篇文章主要介紹了Spring中的InitializingBean接口源碼解析,InitializingBean接口為Bean初始化提供了一種方式,實(shí)現(xiàn)InitializingBean接口的Bean,在BeanFactory設(shè)置其所有屬性后會(huì)調(diào)用其afterPropertiesSet()方法,需要的朋友可以參考下2024-02-02Mybatis-Plus之ID自動(dòng)增長(zhǎng)的設(shè)置實(shí)現(xiàn)
本文主要介紹了Mybatis-Plus之ID自動(dòng)增長(zhǎng)的設(shè)置實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07Java pom.xml parent引用報(bào)錯(cuò)問(wèn)題解決方案
這篇文章主要介紹了Java pom.xml parent引用報(bào)錯(cuò)問(wèn)題解決方案,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-08-08mybatis之嵌套查詢(xún)和嵌套結(jié)果有哪些區(qū)別
這篇文章主要介紹了mybatis之嵌套查詢(xún)和嵌套結(jié)果有哪些區(qū)別,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-03-03