詳解如何配置Spring Batch批處理失敗重試機(jī)制
1. 引言
默認(rèn)情況下,Spring批處理作業(yè)在執(zhí)行過程中出現(xiàn)任何錯誤都會失敗。然而有些時候,為了提高應(yīng)用程序的彈性,我們就需要處理這類間歇性的故障。 在這篇短文中,我們就來一起探討 如何在Spring批處理框架中配置重試邏輯。
2. 簡單舉例
假設(shè)有一個批處理作業(yè),它讀取一個CSV文件作為輸入:
username, userid, transaction_date, transaction_amount
sammy, 1234, 31/10/2015, 10000
john, 9999, 3/12/2015, 12321
然后,它通過訪問REST端點來處理每條記錄,獲取用戶的 age 和 postCode 屬性:
public class RetryItemProcessor implements ItemProcessor<Transaction, Transaction> { @Override public Transaction process(Transaction transaction) throws IOException { log.info("RetryItemProcessor, attempting to process: {}", transaction); HttpResponse response = fetchMoreUserDetails(transaction.getUserId()); //parse user's age and postCode from response and update transaction ... return transaction; } ... }
最后,它生成并輸出一個合并的XML:
<transactionRecord> <transactionRecord> <amount>10000.0</amount> <transactionDate>2015-10-31 00:00:00</transactionDate> <userId>1234</userId> <username>sammy</username> <age>10</age> <postCode>430222</postCode> </transactionRecord> ... </transactionRecord>
3. ItemProcessor 中添加重試
現(xiàn)在假設(shè),如果到REST端點的連接由于某些網(wǎng)絡(luò)速度慢而超時,該怎么辦?如果發(fā)生這種情況,則我們的批處理工作將失敗。
在這種情況下,我們希望失敗的 item 處理重試幾次。因此,接下來我將批處理作業(yè)配置為:在出現(xiàn)故障時執(zhí)行最多三次重試:
@Bean public Step retryStep( ItemProcessor<Transaction, Transaction> processor, ItemWriter<Transaction> writer) throws ParseException { return stepBuilderFactory .get("retryStep") .<Transaction, Transaction>chunk(10) .reader(itemReader(inputCsv)) .processor(processor) .writer(writer) .faultTolerant() .retryLimit(3) .retry(ConnectTimeoutException.class) .retry(DeadlockLoserDataAccessException.class) .build(); }
這里調(diào)用了 faultTolerant() 來啟用重試功能。另外,我們使用 retry 和 retryLimit 分別定義符合重試條件的異常和 item 的最大重試次數(shù)。
4. 測試重試次數(shù)
假設(shè)我們有一個測試場景,其中返回 age 和 postCode 的REST端點關(guān)閉了一段時間。在這個測試場景中,我們只對前兩個 API 調(diào)用獲取一個 ConnectTimeoutException ,而第三個調(diào)用將成功:
@Test public void whenEndpointFailsTwicePasses3rdTime_thenSuccess() throws Exception { FileSystemResource expectedResult = new FileSystemResource(EXPECTED_OUTPUT); FileSystemResource actualResult = new FileSystemResource(TEST_OUTPUT); when(httpResponse.getEntity()) .thenReturn(new StringEntity("{ \"age\":10, \"postCode\":\"430222\" }")); //fails for first two calls and passes third time onwards when(httpClient.execute(any())) .thenThrow(new ConnectTimeoutException("Timeout count 1")) .thenThrow(new ConnectTimeoutException("Timeout count 2")) .thenReturn(httpResponse); JobExecution jobExecution = jobLauncherTestUtils .launchJob(defaultJobParameters()); JobInstance actualJobInstance = jobExecution.getJobInstance(); ExitStatus actualJobExitStatus = jobExecution.getExitStatus(); assertThat(actualJobInstance.getJobName(), is("retryBatchJob")); assertThat(actualJobExitStatus.getExitCode(), is("COMPLETED")); AssertFile.assertFileEquals(expectedResult, actualResult); }
在這里,我們的工作成功地完成了。另外,從日志中可以明顯看出 第一條記錄 id=1234 失敗了兩次,最后在第三次重試時成功了:
19:06:57.742 [main] INFO o.s.batch.core.job.SimpleStepHandler - Executing step: [retryStep]
19:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
19:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
19:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
19:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=9999
19:06:57.773 [main] INFO o.s.batch.core.step.AbstractStep - Step: [retryStep] executed in 31ms
同樣,看下另一個測試用例,當(dāng)所有重試次數(shù)都用完時會發(fā)生什么:
@Test public void whenEndpointAlwaysFail_thenJobFails() throws Exception { when(httpClient.execute(any())) .thenThrow(new ConnectTimeoutException("Endpoint is down")); JobExecution jobExecution = jobLauncherTestUtils .launchJob(defaultJobParameters()); JobInstance actualJobInstance = jobExecution.getJobInstance(); ExitStatus actualJobExitStatus = jobExecution.getExitStatus(); assertThat(actualJobInstance.getJobName(), is("retryBatchJob")); assertThat(actualJobExitStatus.getExitCode(), is("FAILED")); assertThat(actualJobExitStatus.getExitDescription(), containsString("org.apache.http.conn.ConnectTimeoutException")); }
在這個測試用例中,在作業(yè)因 ConnectTimeoutException
而失敗之前,會嘗試對第一條記錄重試三次。
5. 使用XML配置重試
最后,讓我們看一下與上述配置等價的XML:
<batch:job id="retryBatchJob"> <batch:step id="retryStep"> <batch:tasklet> <batch:chunk reader="itemReader" writer="itemWriter" processor="retryItemProcessor" commit-interval="10" retry-limit="3"> <batch:retryable-exception-classes> <batch:include class="org.apache.http.conn.ConnectTimeoutException"/> <batch:include class="org.springframework.dao.DeadlockLoserDataAccessException"/> </batch:retryable-exception-classes> </batch:chunk> </batch:tasklet> </batch:step> </batch:job>
6. 簡單總結(jié)
在本文中,我們學(xué)習(xí)了如何在Spring批處理中配置重試邏輯,其中包括使用Java和XML配置。以及使用單元測試來觀察重試在實踐中是如何工作的。
到此這篇關(guān)于詳解如何配置Spring Batch批處理失敗重試機(jī)制的文章就介紹到這了,更多相關(guān)Spring Batch批處理內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot中@ConditionalOnProperty的使用及作用詳解
這篇文章主要介紹了SpringBoot中@ConditionalOnProperty的使用及作用詳解,@ConditionalOnProperty通過讀取本地配置文件中的值來判斷 某些 Bean 或者 配置類 是否加入spring 中,需要的朋友可以參考下2024-01-01spring boot使用sonarqube來檢查技術(shù)債務(wù)
今天小編就為大家分享一篇關(guān)于spring boot使用sonarqube來檢查技術(shù)債務(wù),小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2018-12-12java:無法訪問org.springframework.boot.SpringApplication
本文主要介紹了java:無法訪問org.springframework.boot.SpringApplication,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2025-03-03Spring的Bean注入解析結(jié)果BeanDefinition詳解
這篇文章主要介紹了Spring的Bean注入解析結(jié)果BeanDefinition詳解,BeanDefinition描述了一個bean實例,擁有屬性值、構(gòu)造參數(shù)值和具體實現(xiàn)的其他信息,其是一個bean的元數(shù)據(jù),xml中配置的bean元素會被解析成BeanDefinition對象,需要的朋友可以參考下2023-12-12SpringBoot3整合郵件服務(wù)實現(xiàn)郵件發(fā)送功能
本文介紹了spring boot整合email服務(wù),實現(xiàn)發(fā)送驗證碼,郵件(普通文本郵件、靜態(tài)資源郵件、附件郵件),文中通過代碼示例介紹的非常詳細(xì),堅持看完相信對你有幫助,需要的朋友可以參考下2024-05-05