A Brief Look at Consumer Exception Handling in spring-kafka
Default exception handling
By default, if the application does no explicit exception handling, spring-kafka falls back to a DefaultErrorHandler, which uses a FixedBackOff to retry: it retries back-to-back with no delay, up to 9 times, so a message is delivered at most 10 times in total. Once the retries are exhausted, the exception is logged to the console and the offset is committed, which means the message is effectively discarded.
For example:
Sending a message:
@GetMapping("send/{msg}")
public String send(@PathVariable("msg") String msg) {
    CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("test-topic", msg);
    try {
        future.get();
        log.info("message sent successfully");
    } catch (Exception e) {
        e.printStackTrace();
    }
    return "OK";
}
Receiving the message:
@Component
public class DemoListener {

    private static final Logger log = LoggerFactory.getLogger(DemoListener.class);

    @KafkaListener(topics = {"test-topic"})
    public void onMessage(ConsumerRecord<String, String> record) {
        Object value = record.value();
        log.info("received message: {}", value);
        throw new RuntimeException("manually throw");
    }
}
The Kafka configuration:
spring:
  kafka:
    bootstrap-servers: localhost:9092  # Kafka broker address
    consumer:
      group-id: my-group               # default consumer group ID
      auto-offset-reset: earliest      # if there is no initial offset, or the offset is invalid, start from the earliest message
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
Now send a test message; the console output looks like this:
2025-09-14T10:26:27.508+08:00  INFO 5912 --- [nio-8080-exec-1] c.g.xjs.kafka.controller.DemoController  : message sent successfully
2025-09-14T10:26:27.509+08:00  INFO 5912 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : received message: hello
......
2025-09-14T10:26:31.666+08:00  INFO 5912 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : received message: hello
2025-09-14T10:26:31.680+08:00  INFO 5912 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 6 for partition test-topic-0
2025-09-14T10:26:31.680+08:00  INFO 5912 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T10:26:32.174+08:00  INFO 5912 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : received message: hello
2025-09-14T10:26:32.182+08:00 ERROR 5912 --- [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler   : Backoff FixedBackOff{interval=0, currentAttempts=10, maxAttempts=9} exhausted for test-topic-0@6

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.github.xjs.kafka.listener.DemoListener.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord)' threw exception
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2996) ~[spring-kafka-3.3.3.jar:3.3.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2903) ~[spring-kafka-3.3.3.jar:3.3.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2867) ~[spring-kafka-3.3.3.jar:3.3.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2779) ~[spring-kafka-3.3.3.jar:3.3.3]
    ...
Customizing the retry logic
We can define our own DefaultErrorHandler bean to customize the retry behavior, for example:
@Bean
public DefaultErrorHandler errorHandler() {
    ExponentialBackOff backOff = new ExponentialBackOff();
    // maximum retry interval; the default is 30 seconds
    backOff.setMaxInterval(30000);
    // initial retry interval; the default is 2 seconds
    backOff.setInitialInterval(3000);
    // interval multiplier: next interval = current interval * multiplier; the default is 1.5
    backOff.setMultiplier(3);
    // maximum number of retries; by default there is no limit.
    // With the defaults, the first retry comes after 2 seconds, the next after (2 * 1.5) 3 seconds, and so on.
    backOff.setMaxAttempts(2);
    return new DefaultErrorHandler(null, backOff);
}
Send another message; the console now prints:
2025-09-14T10:42:32.069+08:00  INFO 1288 --- [nio-8080-exec-1] c.g.xjs.kafka.controller.DemoController  : message sent successfully
2025-09-14T10:42:32.070+08:00  INFO 1288 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : received message: hello
2025-09-14T10:42:35.128+08:00  INFO 1288 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 8 for partition test-topic-0
2025-09-14T10:42:35.129+08:00  INFO 1288 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T10:42:35.131+08:00  INFO 1288 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : received message: hello
2025-09-14T10:42:44.193+08:00  INFO 1288 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 8 for partition test-topic-0
2025-09-14T10:42:44.193+08:00  INFO 1288 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T10:42:44.195+08:00  INFO 1288 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : received message: hello
2025-09-14T10:42:44.199+08:00 ERROR 1288 --- [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler   : Backoff ExponentialBackOffExecution{currentInterval=9000ms, multiplier=3.0, attempts=2} exhausted for test-topic-0@8

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.github.xjs.kafka.listener.DemoListener.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord)' threw exception
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2996) ~[spring-kafka-3.3.3.jar:3.3.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2903) ~[spring-kafka-3.3.3.jar:3.3.3]
    ...
As you can see, the message was delivered 3 times in total, including 2 retries: the first after 3 seconds and the second after 9 seconds.
Besides ExponentialBackOff, the common implementations are ExponentialBackOffWithMaxRetries and FixedBackOff, and you can of course write your own. ExponentialBackOff retries indefinitely by default, with a default maximum interval of 30 seconds; any computed interval above 30 seconds is capped at 30 seconds. ExponentialBackOffWithMaxRetries lets you cap the number of retries. FixedBackOff uses a fixed interval, 5 seconds by default, with no retry limit by default.
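To make the exponential schedule concrete, here is a small plain-Java sketch (no Spring dependency; the class and method names are ours, not a spring-kafka API) that reproduces the interval progression for the configuration above, with an initial interval of 3000 ms, a multiplier of 3, and a 30000 ms cap:

```java
import java.util.Arrays;

public class BackOffSchedule {

    // Reproduces ExponentialBackOff's interval progression:
    // each interval = previous * multiplier, capped at maxInterval.
    static long[] intervals(long initial, double multiplier, long maxInterval, int attempts) {
        long[] result = new long[attempts];
        long current = initial;
        for (int i = 0; i < attempts; i++) {
            result[i] = Math.min(current, maxInterval);
            current = (long) (current * multiplier);
        }
        return result;
    }

    public static void main(String[] args) {
        // initialInterval=3000, multiplier=3, maxInterval=30000, 4 attempts
        System.out.println(Arrays.toString(intervals(3000, 3, 30000, 4)));
        // prints [3000, 9000, 27000, 30000]
    }
}
```

The fourth attempt would be 81000 ms without the cap, which is why it is clamped to 30000 ms.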
Head-of-line blocking and message loss
The exception-handling approach above has two serious problems: head-of-line blocking and message loss. Head-of-line blocking means that while a message is being retried, the consumer cannot process any newer messages in the topic, because the consumer thread retries in a blocking fashion and only moves on once the retries are finished; a long retry schedule therefore holds up every message behind it. Message loss is easy to understand: once the retries are exhausted, all we get is an error log entry. A better approach is to publish the failed message to a dead-letter topic (DLT) and leave follow-up handling to a human. Let's deal with the message-loss problem first.
Dead-letter topic
The DefaultErrorHandler constructor also accepts a ConsumerRecordRecoverer. If we supply one, the failed message is handed to that recoverer once the retries are exhausted, and the recoverer can republish it to a DLT.
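As an aside, ConsumerRecordRecoverer is just a BiConsumer of the record and the exception, so writing one yourself is trivial. A minimal sketch (ours, purely for illustration) that only logs the record it is giving up on might look like this; for the DLT case, spring-kafka's ready-made implementation covered in the next paragraph is the better choice:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;

@Configuration
public class RecovererConfig {

    private static final Logger log = LoggerFactory.getLogger(RecovererConfig.class);

    // ConsumerRecordRecoverer is a BiConsumer<ConsumerRecord<?, ?>, Exception>;
    // this one merely logs the record whose retries were exhausted.
    @Bean
    public ConsumerRecordRecoverer loggingRecoverer() {
        return (record, ex) -> log.error("giving up on {}-{}@{}",
                record.topic(), record.partition(), record.offset(), ex);
    }
}
```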
Fortunately, spring-kafka already provides a DeadLetterPublishingRecoverer that does exactly this.
Let's rewrite the DefaultErrorHandler:
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate kafkaTemplate) {
    var recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
            (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition()));
    ExponentialBackOff backOff = new ExponentialBackOff();
    backOff.setMaxInterval(30000);
    backOff.setInitialInterval(3000);
    backOff.setMultiplier(3);
    backOff.setMaxAttempts(2);
    return new DefaultErrorHandler(recoverer, backOff);
}
Constructing a DeadLetterPublishingRecoverer requires a KafkaTemplate, plus a resolver that decides which DLT topic and partition each failed record goes to.
Now send another message; the console logs:
2025-09-14T11:17:48.532+08:00  INFO 9804 --- [nio-8080-exec-4] c.g.xjs.kafka.controller.DemoController  : message sent successfully
2025-09-14T11:17:48.533+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : received message: hello
2025-09-14T11:17:51.609+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 10 for partition test-topic-0
2025-09-14T11:17:51.609+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:17:51.611+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : received message: hello
2025-09-14T11:18:00.708+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 10 for partition test-topic-0
2025-09-14T11:18:00.708+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:18:00.710+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : received message: hello
This time no exception is thrown, and the message shows up in the DLT:
D:\kafka_2.12-3.9.1> .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-topic.DLT
hello
Non-blocking retries
Still using the code above, send 2 messages in quick succession; the console prints:
2025-09-14T11:24:02.837+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : received message: 1111
2025-09-14T11:24:03.869+08:00  INFO 9804 --- [nio-8080-exec-8] c.g.xjs.kafka.controller.DemoController  : message sent successfully
2025-09-14T11:24:05.914+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 11 for partition test-topic-0
2025-09-14T11:24:05.914+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:24:05.915+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : received message: 1111
2025-09-14T11:24:14.963+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 11 for partition test-topic-0
2025-09-14T11:24:14.963+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:24:14.965+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : received message: 1111
2025-09-14T11:24:15.470+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 12 for partition test-topic-0
2025-09-14T11:24:15.473+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : received message: 2222
2025-09-14T11:24:18.553+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 12 for partition test-topic-0
2025-09-14T11:24:18.553+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:24:18.554+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : received message: 2222
2025-09-14T11:24:27.609+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 12 for partition test-topic-0
2025-09-14T11:24:27.609+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:24:27.611+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : received message: 2222
2025-09-14T11:24:28.635+08:00  INFO 9804 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-my-group-1, groupId=my-group] Node -1 disconnected.
2025-09-14T11:24:58.128+08:00  INFO 9804 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Node -1 disconnected.
As you can see, even though the two messages were sent at almost the same time, the second message cannot be consumed while the first one is being retried; it only gets its turn once the first message's retries are exhausted. With long retry intervals or many attempts, this blocking style of retry is no longer acceptable.
Here is how to switch to non-blocking retries:
@Configuration
@EnableKafkaRetryTopic  // non-blocking: 1
public class KafkaConfiguration {

    // non-blocking: 2
    @Bean
    TaskScheduler scheduler() {
        return new ThreadPoolTaskScheduler();
    }

    // non-blocking: 3
    @Bean
    public RetryTopicConfiguration myRetryConfiguration(KafkaTemplate<String, String> template) {
        return RetryTopicConfigurationBuilder
                .newInstance()
                .exponentialBackoff(3000, 10, Long.MAX_VALUE)
                .maxAttempts(3)
                .dltSuffix(".DLT")
                .create(template);
    }
}
- First, add the @EnableKafkaRetryTopic annotation
- Then, provide a TaskScheduler instance
- Finally, provide a RetryTopicConfiguration instance
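As an alternative to the RetryTopicConfiguration bean, spring-kafka also supports configuring non-blocking retries per listener via the @RetryableTopic annotation. The sketch below (our illustration, not part of the original setup) mirrors roughly the same policy, 3 total attempts with exponential backoff starting at 3 seconds and a multiplier of 10, and adds a @DltHandler so that exhausted messages are handled in code:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;

@Component
public class RetryableDemoListener {

    private static final Logger log = LoggerFactory.getLogger(RetryableDemoListener.class);

    // 3 attempts total (1 original + 2 retries), exponential backoff 3s then 30s
    @RetryableTopic(
            attempts = "3",
            backoff = @Backoff(delay = 3000, multiplier = 10),
            dltTopicSuffix = ".DLT")
    @KafkaListener(topics = "test-topic")
    public void onMessage(ConsumerRecord<String, String> record) {
        log.info("received message: {}", record.value());
        throw new RuntimeException("manually throw");
    }

    // Invoked for messages that exhausted their retries, instead of losing them.
    @DltHandler
    public void onDlt(ConsumerRecord<String, String> record) {
        log.error("dead-lettered: {}", record.value());
    }
}
```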
Now restart the application, send 2 messages in quick succession, and watch the console again:
2025-09-14T11:44:40.303+08:00  INFO 5380 --- [nio-8080-exec-8] c.g.xjs.kafka.controller.DemoController  : message sent successfully
2025-09-14T11:44:40.304+08:00  INFO 5380 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : received message: 3333
2025-09-14T11:44:40.817+08:00  INFO 5380 --- [etry-3000-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-3, groupId=my-group] Seeking to offset 3 for partition test-topic-retry-3000-0
2025-09-14T11:44:40.817+08:00  INFO 5380 --- [etry-3000-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:44:41.284+08:00  INFO 5380 --- [nio-8080-exec-5] c.g.xjs.kafka.controller.DemoController  : message sent successfully
2025-09-14T11:44:41.284+08:00  INFO 5380 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : received message: 4444
2025-09-14T11:44:43.316+08:00  INFO 5380 --- [etry-3000-0-C-1] c.g.xjs.kafka.listener.DemoListener      : received message: 3333
2025-09-14T11:44:43.826+08:00  INFO 5380 --- [etry-3000-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-3, groupId=my-group] Seeking to offset 4 for partition test-topic-retry-3000-0
2025-09-14T11:44:43.826+08:00  INFO 5380 --- [try-30000-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 3 for partition test-topic-retry-30000-0
2025-09-14T11:44:43.826+08:00  INFO 5380 --- [try-30000-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:44:43.828+08:00  INFO 5380 --- [etry-3000-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-3, groupId=my-group] Seeking to offset 4 for partition test-topic-retry-3000-0
2025-09-14T11:44:43.828+08:00  INFO 5380 --- [etry-3000-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:44:44.332+08:00  INFO 5380 --- [etry-3000-0-C-1] c.g.xjs.kafka.listener.DemoListener      : received message: 4444
2025-09-14T11:45:13.334+08:00  INFO 5380 --- [try-30000-0-C-1] c.g.xjs.kafka.listener.DemoListener      : received message: 3333
2025-09-14T11:45:13.334+08:00 ERROR 5380 --- [try-30000-0-C-1] k.r.DeadLetterPublishingRecovererFactory : Record: topic = test-topic-retry-30000, partition = 0, offset = 3, main topic = test-topic threw an error at topic test-topic-retry-30000 and won't be retried. Sending to DLT with name test-topic.DLT.

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:3000) ~[spring-kafka-3.3.3.jar:3.3.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2903) ~[spring-kafka-3.3.3.jar:3.3.3]
    ...
As you can see, the head-of-line blocking is gone, and the failed messages were still delivered to the DLT:
D:\kafka_2.12-3.9.1> .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-topic.DLT
3333
4444
How non-blocking retry works
List the topics in Kafka:
D:\kafka_2.12-3.9.1> .\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
__consumer_offsets
test-topic
test-topic-retry-3000
test-topic-retry-30000
test-topic.DLT
Two extra retry topics have appeared: test-topic-retry-3000 and test-topic-retry-30000.
When a message fails, it is forwarded to a retry topic. The consumer of that topic checks the record's timestamp; if the retry time has not yet arrived, it pauses consumption of that topic partition. When the retry time arrives, consumption of the partition resumes and the message is consumed again. This is also why we had to configure a TaskScheduler. If processing fails again, the message is forwarded to the next retry topic, and the pattern repeats until processing succeeds or the attempts are exhausted, at which point the message is sent to the DLT.
In our example, with an exponential backoff starting at 3 seconds, a multiplier of 10, and at most 3 - 1 = 2 retries, the framework automatically creates test-topic-retry-3000, test-topic-retry-30000, and test-topic.DLT.
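To act on dead-lettered messages programmatically instead of only inspecting them from the console consumer, note that the DLT is an ordinary topic, so an ordinary listener can subscribe to it. A sketch (the listener class and group id are our own choices):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class DltListener {

    private static final Logger log = LoggerFactory.getLogger(DltListener.class);

    // Drain the DLT like any other topic, e.g. to alert an operator
    // or persist the failed record for manual replay.
    @KafkaListener(topics = "test-topic.DLT", groupId = "dlt-group")
    public void onDeadLetter(ConsumerRecord<String, String> record) {
        log.error("dead letter from {}-{}@{}: {}",
                record.topic(), record.partition(), record.offset(), record.value());
    }
}
```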
Reference: https://docs.spring.io/spring-kafka/reference/index.html