欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Rabbit消息重試機(jī)制問(wèn)題記錄

 更新時(shí)間:2024年08月09日 10:21:25   作者:陳亦康  
消息重試機(jī)制就是在消息處理失敗之后重新發(fā)送,主要時(shí)為了解決消息發(fā)送過(guò)程可能會(huì)出現(xiàn)的問(wèn)題,例如 網(wǎng)絡(luò)故障、服務(wù)臨時(shí)不可用 等,這篇文章主要介紹了Rabbit消息重試機(jī)制問(wèn)題記錄,需要的朋友可以參考下

消息重試機(jī)制

概述

消息重試機(jī)制就是在消息處理失敗之后重新發(fā)送,主要時(shí)為了解決消息發(fā)送過(guò)程可能會(huì)出現(xiàn)的問(wèn)題,例如 網(wǎng)絡(luò)故障、服務(wù)臨時(shí)不可用 等.

Ps:如果時(shí)程序邏輯引起的錯(cuò)誤,那么即使重試多少次都是沒(méi)有用的,但是可以通過(guò)配置重試次數(shù)來(lái)解決.

實(shí)現(xiàn)方式一:基于消息手動(dòng)確認(rèn)機(jī)制,返回 nack 實(shí)現(xiàn)

配置文件

spring:
  application:
    name: rabbitmq
  rabbitmq:
    host: env-base
    port: 5672
    username: root
    password: 1111
    listener:
      simple:
        acknowledge-mode: manual # 手動(dòng)確認(rèn)

交換機(jī)、隊(duì)列、綁定

    @Bean("ackExchange")
    fun ackExchange() = DirectExchange(MQConst.ACK_EXCHANGE)
    @Bean("ackQueue")
    fun ackQueue() = Queue(MQConst.ACK_QUEUE)
    @Bean
    fun ackBinding(
        @Qualifier("ackExchange") exchange: DirectExchange,
        @Qualifier("ackQueue") queue: Queue,
    ): Binding {
        return BindingBuilder
            .bind(queue)
            .to(exchange)
            .with(MQConst.ACK_BINDING)
    }

生產(chǎn)者接口

@RestController
@RequestMapping("/mq3")
class MQ3Api(
   val rabbitTemplate: RabbitTemplate
) {
    @RequestMapping("/ack")
    fun ack(): String {
        rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1")
        return "ok"
    }
}

消費(fèi)者

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意這里的依賴(lài)
import org.springframework.amqp.core.Message //注意這里的依賴(lài)
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset
@Component
class AckListener {
    @RabbitListener(queues = [MQConst.ACK_QUEUE])
    fun handMessage(
        message: Message,
        channel: Channel,
    ) {
        val deliveryTag = message.messageProperties.deliveryTag
        try {
            println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, $deliveryTag")
            val a = 1 / 0
            channel.basicAck(deliveryTag, false)
        } catch (e: Exception) {
            //通過(guò)返回 nack,并設(shè)置 requeue 為 ture 實(shí)現(xiàn)消息重新入隊(duì),并進(jìn)行重試
            channel.basicNack(deliveryTag, false, true) 
        }
    }
}

演示和結(jié)論

deliverTag 自增的原因: 引發(fā)異常后,會(huì)返回 nack,并且參數(shù) requeue = true,表示重新入隊(duì),然后進(jìn)行重試,將隊(duì)列中的消息再次發(fā)送給生產(chǎn)者,因此 deliverTag 會(huì)自增.

缺點(diǎn): 如果是由于程序邏輯異常引起的重試,那么無(wú)論重試多少次都沒(méi)用,并且不斷重試會(huì)導(dǎo)致負(fù)載飆升,性能下降.

實(shí)現(xiàn)方式二:基于重試配置實(shí)現(xiàn) 配置文件

spring:
  application:
    name: rabbitmq
  rabbitmq:
    host: env-base
    port: 5672
    username: root
    password: 1111
    listener:
      simple:
        acknowledge-mode: auto # 開(kāi)啟重試機(jī)制,這里必須是 auto,否則不生效!
        retry:
          enabled: true # 開(kāi)啟消費(fèi)者失敗重試
          initial-interval: 5000ms # 失敗等待時(shí)常
          max-attempts: 5 # 最大重試次數(shù)(包括第一次消費(fèi))

Ps:開(kāi)啟重試機(jī)制,acknowledge-mode 必須指定為 auto,否則不生效!

交換機(jī)、隊(duì)列、綁定

    @Bean("ackExchange")
    fun ackExchange() = DirectExchange(MQConst.ACK_EXCHANGE)
    @Bean("ackQueue")
    fun ackQueue() = Queue(MQConst.ACK_QUEUE)
    @Bean
    fun ackBinding(
        @Qualifier("ackExchange") exchange: DirectExchange,
        @Qualifier("ackQueue") queue: Queue,
    ): Binding {
        return BindingBuilder
            .bind(queue)
            .to(exchange)
            .with(MQConst.ACK_BINDING)
    }

生產(chǎn)者接口

    @RequestMapping("/ack")
    fun ack(): String {
        rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1")
        return "ok"
    }

消費(fèi)者

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意這里的依賴(lài)
import org.springframework.amqp.core.Message //注意這里的依賴(lài)
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset
@Component
class AckListener {
    @RabbitListener(queues = [MQConst.ACK_QUEUE])
    fun handMessage(
        message: Message,
        channel: Channel,
    ) {
        println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, ${message.messageProperties.deliveryTag}")
        val a = 1 / 0
    }
}

演示和結(jié)論

deliverTag 不自增的原因: 因?yàn)槭窍⒁呀?jīng)發(fā)出去了,即使失敗了也不會(huì)重回隊(duì)列,而是直接重新發(fā)一遍消息.

好處: 不僅可以控制重試次數(shù)(防止類(lèi)似于上面講到的確認(rèn)應(yīng)答引起的無(wú)限重試),還可以控制每次重試的間隔時(shí)間(防止負(fù)載飆升).

到此這篇關(guān)于Rabbit高級(jí)特性 - 消息重試機(jī)制的文章就介紹到這了,更多相關(guān)Rabbit消息重試機(jī)制內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論