
A Brief Look at spring-kafka Consumer Exception Handling

Updated: September 17, 2025 08:25:39   Author: 若魚(yú)1919
This article introduces consumer exception handling in spring-kafka. The example code is detailed and should serve as a useful reference for study or work.

Default Exception Handling

By default, if the application does no explicit exception handling, spring-kafka installs a DefaultErrorHandler. It retries with a FixedBackOff: up to 9 immediate, back-to-back retries, so a message is consumed at most 10 times in total. Once the retries are exhausted, the handler logs the exception to the console and commits the offset, which means the message is effectively discarded.
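This default behavior is equivalent to registering the handler yourself; a minimal sketch of an explicit bean that mirrors it (the class name is hypothetical):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class DefaultRetryConfig {

    // Mirrors the built-in default: 9 immediate retries (interval 0 ms),
    // i.e. up to 10 deliveries, then log the error and commit the offset.
    @Bean
    public DefaultErrorHandler errorHandler() {
        return new DefaultErrorHandler(new FixedBackOff(0L, 9L));
    }
}
```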
An example:
Sending a message

@GetMapping("send/{msg}")
public String send(@PathVariable("msg") String msg) {
    CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("test-topic", msg);
    try {
        future.get();
        log.info("Message sent successfully");
    } catch (Exception e) {
        log.error("Failed to send message", e);
    }
    return "OK";
}

Receiving the message

@Component
public class DemoListener {

    private static final Logger log = LoggerFactory.getLogger(DemoListener.class);

    @KafkaListener(topics = {"test-topic"})
    public void onMessage(ConsumerRecord<String, String> record) {
        Object value = record.value();
        log.info("Received message: {}", value);
        throw new RuntimeException("manually throw");
    }
}

The Kafka configuration

spring:
  kafka:
    bootstrap-servers: localhost:9092  # Kafka broker address
    consumer:
      group-id: my-group  # default consumer group id
      auto-offset-reset: earliest  # with no initial offset (or an invalid one), start from the earliest message
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

Now send a message to test; the console output is:

2025-09-14T10:26:27.508+08:00  INFO 5912 --- [nio-8080-exec-1] c.g.xjs.kafka.controller.DemoController  : Message sent successfully
2025-09-14T10:26:27.509+08:00  INFO 5912 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: hello
......
2025-09-14T10:26:31.666+08:00  INFO 5912 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: hello
2025-09-14T10:26:31.680+08:00  INFO 5912 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 6 for partition test-topic-0
2025-09-14T10:26:31.680+08:00  INFO 5912 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T10:26:32.174+08:00  INFO 5912 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: hello
2025-09-14T10:26:32.182+08:00 ERROR 5912 --- [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler   : Backoff FixedBackOff{interval=0, currentAttempts=10, maxAttempts=9} exhausted for test-topic-0@6

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.github.xjs.kafka.listener.DemoListener.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord)' threw exception
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2996) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2903) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2867) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2779) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2616) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2505) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2151) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1527) ~[spring-kafka-3.3.3.jar:3.3.3]

Custom Retry Logic

We can define our own DefaultErrorHandler bean to customize the retry behavior, for example:

@Bean
public DefaultErrorHandler errorHandler() {
    ExponentialBackOff backOff = new ExponentialBackOff();
    // maximum retry interval, 30 seconds by default
    backOff.setMaxInterval(30000);
    // initial retry interval, 2 seconds by default
    backOff.setInitialInterval(3000);
    // interval multiplier: next interval = current interval * multiplier, 1.5 by default
    backOff.setMultiplier(3);
    // maximum number of attempts, unlimited by default; with the defaults the first
    // retry comes after 2 seconds, the next after 2 * 1.5 = 3 seconds, and so on
    backOff.setMaxAttempts(2);
    return new DefaultErrorHandler(null, backOff);
}
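With Spring Boot, a DefaultErrorHandler bean like the one above is picked up automatically (it is a CommonErrorHandler). When building the listener container factory by hand, it can also be attached explicitly; a sketch, assuming an auto-configured ConsumerFactory is available:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.DefaultErrorHandler;

@Configuration
public class ListenerFactoryConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory, DefaultErrorHandler errorHandler) {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory);
        // Replaces the container's built-in error handler with ours.
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }
}
```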

Now send another message; the console output is:

2025-09-14T10:42:32.069+08:00  INFO 1288 --- [nio-8080-exec-1] c.g.xjs.kafka.controller.DemoController  : Message sent successfully
2025-09-14T10:42:32.070+08:00  INFO 1288 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: hello
2025-09-14T10:42:35.128+08:00  INFO 1288 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 8 for partition test-topic-0
2025-09-14T10:42:35.129+08:00  INFO 1288 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T10:42:35.131+08:00  INFO 1288 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: hello
2025-09-14T10:42:44.193+08:00  INFO 1288 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 8 for partition test-topic-0
2025-09-14T10:42:44.193+08:00  INFO 1288 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T10:42:44.195+08:00  INFO 1288 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: hello
2025-09-14T10:42:44.199+08:00 ERROR 1288 --- [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler   : Backoff ExponentialBackOffExecution{currentInterval=9000ms, multiplier=3.0, attempts=2} exhausted for test-topic-0@8

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.github.xjs.kafka.listener.DemoListener.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord)' threw exception
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2996) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2903) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2867) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2779) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2616) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2505) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2151) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1527) ~[spring-kafka-3.3.3.jar:3.3.3]

As you can see, the message was received 3 times in total, including 2 retries: the first after 3 seconds, the second after 9 seconds.
Besides ExponentialBackOff, the common implementations are ExponentialBackOffWithMaxRetries and FixedBackOff, and you can also write your own.
ExponentialBackOff retries indefinitely by default, with a default maximum interval of 30 seconds; longer computed intervals are capped at 30 seconds.
ExponentialBackOffWithMaxRetries lets you cap the number of retries.
FixedBackOff uses a fixed interval, 5 seconds by default, with no limit on the number of retries by default.
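For instance, the same policy as above can be expressed with ExponentialBackOffWithMaxRetries, which counts retries rather than attempts; a sketch (the class and method names are made up):

```java
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.ExponentialBackOffWithMaxRetries;

public class MaxRetriesExample {

    // 2 retries (3 deliveries in total), after 3s and then 9s, capped at 30s.
    static DefaultErrorHandler handler() {
        ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(2);
        backOff.setInitialInterval(3000);
        backOff.setMultiplier(3.0);
        backOff.setMaxInterval(30000);
        return new DefaultErrorHandler(backOff);
    }
}
```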

Head-of-Line Blocking and Message Loss

The exception handling shown so far has two serious problems: head-of-line blocking and message loss. Head-of-line blocking means that while one message is being retried, the consumer cannot pick up newer messages in the topic: the consumer thread retries in a blocking fashion and only moves on once the retries finish, so a long retry schedule leaves the messages behind it unconsumed for a long time. Message loss is easier to see: once the retries are exhausted, all we get is an error log line. A better approach is to publish the failed message to a dead-letter topic (DLT) and leave the follow-up to manual handling. Let's deal with the message loss problem first.

Dead-Letter Topic

The DefaultErrorHandler constructor also accepts a ConsumerRecordRecoverer. If we supply one, the failed message is handed to the recoverer once the retries are exhausted, and the recoverer can republish it to a DLT.
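A ConsumerRecordRecoverer is just a BiConsumer over the failed record and its exception, so the simplest possible recoverer could merely log the record instead of republishing it; a hypothetical sketch:

```java
import org.springframework.kafka.listener.ConsumerRecordRecoverer;

public class LoggingRecoverer {

    // ConsumerRecordRecoverer is a BiConsumer<ConsumerRecord<?, ?>, Exception>:
    // it is invoked once per record, after the retries are exhausted.
    static final ConsumerRecordRecoverer RECOVERER = (record, ex) ->
            System.err.printf("giving up on %s@%d: %s%n",
                    record.topic(), record.offset(), ex.getMessage());
}
```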
Conveniently, spring-kafka already ships a DeadLetterPublishingRecoverer that does exactly this.
Let's rewrite the DefaultErrorHandler:

@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<String, String> kafkaTemplate) {
    var recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
            (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition()));
    ExponentialBackOff backOff = new ExponentialBackOff();
    backOff.setMaxInterval(30000);
    backOff.setInitialInterval(3000);
    backOff.setMultiplier(3);
    backOff.setMaxAttempts(2);
    return new DefaultErrorHandler(recoverer, backOff);
}

Constructing a DeadLetterPublishingRecoverer requires a KafkaTemplate, plus a resolver that picks the DLT topic and partition for each failed record.
Now send a new message; the console log is:

2025-09-14T11:17:48.532+08:00  INFO 9804 --- [nio-8080-exec-4] c.g.xjs.kafka.controller.DemoController  : Message sent successfully
2025-09-14T11:17:48.533+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: hello
2025-09-14T11:17:51.609+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 10 for partition test-topic-0
2025-09-14T11:17:51.609+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:17:51.611+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: hello
2025-09-14T11:18:00.708+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 10 for partition test-topic-0
2025-09-14T11:18:00.708+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:18:00.710+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: hello

This time no exception is thrown, and the message shows up in the DLT:

D:\kafka_2.12-3.9.1> .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092  --topic test-topic.DLT 
hello
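Instead of (or alongside) the console consumer, the application itself can listen on the DLT, e.g. to alert or persist failed messages; a sketch (the listener class and group id are made up):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class DltListener {

    private static final Logger log = LoggerFactory.getLogger(DltListener.class);

    @KafkaListener(topics = "test-topic.DLT", groupId = "dlt-group")
    public void onDeadLetter(ConsumerRecord<String, String> record) {
        // DeadLetterPublishingRecoverer carries the original topic, partition,
        // offset and exception along in kafka_dlt-* record headers.
        log.error("dead letter from {}: {}", record.topic(), record.value());
    }
}
```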

Non-Blocking Retries

Still using the code above, send 2 messages in quick succession; the console output is:

2025-09-14T11:24:02.837+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: 1111
2025-09-14T11:24:03.869+08:00  INFO 9804 --- [nio-8080-exec-8] c.g.xjs.kafka.controller.DemoController  : Message sent successfully
2025-09-14T11:24:05.914+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 11 for partition test-topic-0
2025-09-14T11:24:05.914+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:24:05.915+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: 1111
2025-09-14T11:24:14.963+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 11 for partition test-topic-0
2025-09-14T11:24:14.963+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:24:14.965+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: 1111
2025-09-14T11:24:15.470+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 12 for partition test-topic-0
2025-09-14T11:24:15.473+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: 2222
2025-09-14T11:24:18.553+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 12 for partition test-topic-0
2025-09-14T11:24:18.553+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:24:18.554+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: 2222
2025-09-14T11:24:27.609+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 12 for partition test-topic-0
2025-09-14T11:24:27.609+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:24:27.611+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: 2222
2025-09-14T11:24:28.635+08:00  INFO 9804 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-my-group-1, groupId=my-group] Node -1 disconnected.
2025-09-14T11:24:58.128+08:00  INFO 9804 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Node -1 disconnected.

As you can see, even though the messages were sent at almost the same time, the second message cannot be consumed while the first is being retried; it only gets its turn after the first message's retries are exhausted. With long retry intervals or many attempts, this blocking style of retry is no longer acceptable.

Let's see how to use non-blocking retries instead:

@Configuration
@EnableKafkaRetryTopic // non-blocking step 1
public class KafkaConfiguration {

    // non-blocking step 2
    @Bean
    TaskScheduler scheduler() {
        return new ThreadPoolTaskScheduler();
    }

    // non-blocking step 3
    @Bean
    public RetryTopicConfiguration myRetryConfiguration(KafkaTemplate<String, String> template) {
        return RetryTopicConfigurationBuilder
                .newInstance()
                .exponentialBackoff(3000, 10, Long.MAX_VALUE)
                .maxAttempts(3)
                .dltSuffix(".DLT")
                .create(template);
    }
}
  • First, add the @EnableKafkaRetryTopic annotation
  • Then, provide a TaskScheduler bean
  • Finally, provide a RetryTopicConfiguration bean

Now restart the application, send 2 messages in quick succession, and watch the console output again:

2025-09-14T11:44:40.303+08:00  INFO 5380 --- [nio-8080-exec-8] c.g.xjs.kafka.controller.DemoController  : Message sent successfully
2025-09-14T11:44:40.304+08:00  INFO 5380 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: 3333
2025-09-14T11:44:40.817+08:00  INFO 5380 --- [etry-3000-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-3, groupId=my-group] Seeking to offset 3 for partition test-topic-retry-3000-0
2025-09-14T11:44:40.817+08:00  INFO 5380 --- [etry-3000-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:44:41.284+08:00  INFO 5380 --- [nio-8080-exec-5] c.g.xjs.kafka.controller.DemoController  : Message sent successfully
2025-09-14T11:44:41.284+08:00  INFO 5380 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: 4444
2025-09-14T11:44:43.316+08:00  INFO 5380 --- [etry-3000-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: 3333
2025-09-14T11:44:43.826+08:00  INFO 5380 --- [etry-3000-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-3, groupId=my-group] Seeking to offset 4 for partition test-topic-retry-3000-0
2025-09-14T11:44:43.826+08:00  INFO 5380 --- [try-30000-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 3 for partition test-topic-retry-30000-0
2025-09-14T11:44:43.826+08:00  INFO 5380 --- [try-30000-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:44:43.828+08:00  INFO 5380 --- [etry-3000-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-3, groupId=my-group] Seeking to offset 4 for partition test-topic-retry-3000-0
2025-09-14T11:44:43.828+08:00  INFO 5380 --- [etry-3000-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:44:44.332+08:00  INFO 5380 --- [etry-3000-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: 4444
2025-09-14T11:45:13.334+08:00  INFO 5380 --- [try-30000-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: 3333
2025-09-14T11:45:13.334+08:00 ERROR 5380 --- [try-30000-0-C-1] k.r.DeadLetterPublishingRecovererFactory : Record: topic = test-topic-retry-30000, partition = 0, offset = 3, main topic = test-topic threw an error at topic test-topic-retry-30000 and won't be retried. Sending to DLT with name test-topic.DLT.

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:3000) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2903) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2867) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2779) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2616) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2505) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2151) ~[spring-kafka-3.3.3.jar:3.3.3]

As you can see, there is no head-of-line blocking any more, and both messages were successfully delivered to the DLT:

D:\kafka_2.12-3.9.1> .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092  --topic test-topic.DLT 
3333
4444

How Non-Blocking Retry Works

Let's list the topics in Kafka:

D:\kafka_2.12-3.9.1> .\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
__consumer_offsets
test-topic
test-topic-retry-3000
test-topic-retry-30000
test-topic.DLT

Two extra retry topics have appeared: test-topic-retry-3000 and test-topic-retry-30000.

When a message fails, it is forwarded to a retry topic. The consumer checks the record's timestamp and, if the retry time has not yet arrived, pauses consumption of that topic partition. Once the retry time is due, the partition is resumed and the message is consumed again; this is also why we had to configure a TaskScheduler. If processing fails again, the message is forwarded to the next retry topic, and the pattern repeats until processing succeeds or the retries are exhausted, at which point the message is sent to the DLT.

In our case, with an exponential backoff starting at 3 seconds, a multiplier of 10, and maxAttempts(3) (i.e. at most 3 - 1 = 2 retries), the framework automatically creates test-topic-retry-3000, test-topic-retry-30000, and test-topic.DLT.
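The retry-topic names follow directly from the backoff settings: retry n gets its own topic suffixed with its delay in milliseconds, delay(n) = initialInterval * multiplier^n. A small sketch of that arithmetic (the helper is hypothetical, not a spring-kafka API):

```java
import java.util.ArrayList;
import java.util.List;

public class RetryTopicNames {

    // One retry topic per retry, suffixed with its delay in ms:
    // delay(n) = initialInterval * multiplier^n, for n = 0 .. maxAttempts - 2.
    static List<String> retryTopics(String topic, long initialInterval,
                                    double multiplier, int maxAttempts) {
        List<String> topics = new ArrayList<>();
        long delay = initialInterval;
        for (int retry = 0; retry < maxAttempts - 1; retry++) {
            topics.add(topic + "-retry-" + delay);
            delay = (long) (delay * multiplier);
        }
        return topics;
    }
}
```

With initialInterval 3000, multiplier 10, and maxAttempts 3 this reproduces the two topic names listed above.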

Reference: https://docs.spring.io/spring-kafka/reference/index.html

That concludes this look at consumer exception handling in spring-kafka.
