How to Write a Custom ItemReader in Spring Batch
Custom ItemReader in Spring Batch
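Spring Batch supports a variety of input sources, such as files and databases. Sometimes, however, the data lives in a source that is not supported out of the box, and you have to implement your own reader: a custom ItemReader. This article shows how to write one by way of an example.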
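Creating a custom ItemReader
Creating a custom ItemReader takes two steps:
- Create a class that implements the ItemReader interface, with the type of the returned object, T, as the type parameter.
- Implement the interface's T read() method according to the rule below.
The read() method returns the next object if there is one; otherwise it returns null.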
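The "return the next object or null" rule is easiest to see from the caller's side. The sketch below is framework-free: SimpleItemReader is a hypothetical stand-in for Spring Batch's ItemReader (the real read() also declares throws Exception), and drain() only imitates the way a step keeps calling read() until it gets null.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class ReadContractDemo {

    // Hypothetical stand-in for ItemReader<T>, declared locally so the
    // sketch compiles without Spring Batch on the classpath.
    interface SimpleItemReader<T> {
        T read();
    }

    // A reader over an in-memory list: returns the next element,
    // or null once the list is exhausted.
    static <T> SimpleItemReader<T> over(List<T> items) {
        final Iterator<T> it = items.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    // Imitates the consumer of a reader: call read() until it returns null.
    static <T> List<T> drain(SimpleItemReader<T> reader) {
        List<T> result = new ArrayList<>();
        T item;
        while ((item = reader.read()) != null) {
            result.add(item);
        }
        return result;
    }

    public static void main(String[] args) {
        SimpleItemReader<String> reader = over(Arrays.asList("tony", "nick", "ian"));
        System.out.println(drain(reader)); // [tony, nick, ian]
        System.out.println(reader.read()); // null, the reader stays exhausted
    }
}
```

Because null is the end-of-input signal, a reader should never return null for a record that merely has missing data.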
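Below we write a custom ItemReader that returns the students of an online testing course as StuDto objects. To keep things simple, the data is held in memory. StuDto is a plain data transfer object:

    import lombok.Data;

    // Simple DTO; Lombok's @Data generates the getters and setters.
    @Data
    public class StuDto {
        private String emailAddress;
        private String name;
        private String purchasedPackage;
    }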
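Follow these steps to create the ItemReader:
- Create the InMemoryStudentReader class.
- Implement the ItemReader interface with StuDto as the type parameter.
- Add a List<StuDto> studentData field that holds the students enrolled in the course.
- Add a nextStudentIndex field that holds the index of the next StuDto to return.
- Add a private initialize() method that creates the student data and sets the index to 0.
- Add a constructor that calls initialize().
- Implement read() with this rule: if there is a next student, return that StuDto and increment the index; otherwise return null.
The InMemoryStudentReader code is as follows: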
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    import org.springframework.batch.item.ItemReader;

    public class InMemoryStudentReader implements ItemReader<StuDto> {

        private int nextStudentIndex;
        private List<StuDto> studentData;

        InMemoryStudentReader() {
            initialize();
        }

        // Builds the in-memory student list and resets the read position.
        private void initialize() {
            StuDto tony = new StuDto();
            tony.setEmailAddress("tony.tester@gmail.com");
            tony.setName("Tony Tester");
            tony.setPurchasedPackage("master");

            StuDto nick = new StuDto();
            nick.setEmailAddress("nick.newbie@gmail.com");
            nick.setName("Nick Newbie");
            nick.setPurchasedPackage("starter");

            StuDto ian = new StuDto();
            ian.setEmailAddress("ian.intermediate@gmail.com");
            ian.setName("Ian Intermediate");
            ian.setPurchasedPackage("intermediate");

            studentData = Collections.unmodifiableList(Arrays.asList(tony, nick, ian));
            nextStudentIndex = 0;
        }

        // Returns the next student, or null when all students have been read.
        @Override
        public StuDto read() throws Exception {
            StuDto nextStudent = null;
            if (nextStudentIndex < studentData.size()) {
                nextStudent = studentData.get(nextStudentIndex);
                nextStudentIndex++;
            }
            return nextStudent;
        }
    }
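Once the custom ItemReader exists, it has to be registered as a bean so that a Spring Batch job can use it. The next section shows the configuration.
Configuring the ItemReader bean
The configuration class looks like this: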
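    import org.springframework.batch.item.ItemReader;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class InMemoryStudentJobConfig {

        // Exposes the custom reader as a Spring bean.
        @Bean
        ItemReader<StuDto> inMemoryStudentReader() {
            return new InMemoryStudentReader();
        }
    }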
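The class is annotated with @Configuration to mark it as a configuration class. It declares a method annotated with @Bean whose return type is ItemReader<StuDto> and whose body returns a new InMemoryStudentReader.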
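Summary
This article used an example to show how to write a custom ItemReader. The three main points are:
- A custom ItemReader must implement the ItemReader interface.
- When implementing the interface, specify the type of the returned object as the type parameter (T).
- Implement the read() method so that it returns the next object if there is one and null otherwise.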
ItemReader in Spring Batch
This part focuses on ItemReader: how to read data from different sources, and how exception handling and restart work.
JdbcPagingItemReader
Reading data from a database
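    import java.util.HashMap;
    import java.util.Map;

    import javax.sql.DataSource;

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepScope;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.database.JdbcPagingItemReader;
    import org.springframework.batch.item.database.Order;
    import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class DBJdbcDemoJobConfiguration {

        @Autowired
        private JobBuilderFactory jobBuilderFactory;

        @Autowired
        private StepBuilderFactory stepBuilderFactory;

        @Autowired
        @Qualifier("dbJdbcDemoWriter")
        private ItemWriter<? super Customer> dbJdbcDemoWriter;

        @Autowired
        private DataSource dataSource;

        @Bean
        public Job DBJdbcDemoJob() {
            return jobBuilderFactory.get("DBJdbcDemoJob")
                    .start(dbJdbcDemoStep())
                    .build();
        }

        @Bean
        public Step dbJdbcDemoStep() {
            return stepBuilderFactory.get("dbJdbcDemoStep")
                    .<Customer, Customer>chunk(100)
                    .reader(dbJdbcDemoReader())
                    .writer(dbJdbcDemoWriter)
                    .build();
        }

        @Bean
        @StepScope
        public JdbcPagingItemReader<Customer> dbJdbcDemoReader() {
            JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
            reader.setDataSource(this.dataSource);
            reader.setFetchSize(100); // read rows in batches

            // Map each row to a Customer (Customer is assumed to have a Lombok-style builder).
            reader.setRowMapper((rs, rowNum) -> Customer.builder()
                    .id(rs.getLong("id"))
                    .firstName(rs.getString("firstName"))
                    .lastName(rs.getString("lastName"))
                    .birthdate(rs.getString("birthdate"))
                    .build());

            // Paging needs a sort key so that every page is read in a stable order.
            MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
            queryProvider.setSelectClause("id, firstName, lastName, birthdate");
            queryProvider.setFromClause("from Customer");
            Map<String, Order> sortKeys = new HashMap<>(1);
            sortKeys.put("id", Order.ASCENDING);
            queryProvider.setSortKeys(sortKeys);
            reader.setQueryProvider(queryProvider);
            return reader;
        }
    }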
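The sort key in the configuration above matters because a paging reader fetches one page at a time and needs a stable order to know where the next page starts. The following sketch illustrates that general idea in memory, assuming a unique ascending id as the sort key. It is not the SQL that MySqlPagingQueryProvider generates, and fetchPage and readAllPages are hypothetical helper names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class KeysetPagingDemo {

    // Returns up to pageSize ids greater than lastId, in ascending order.
    // Plays the role of a query such as
    // "WHERE id > ? ORDER BY id ASC LIMIT ?" against a sorted table.
    static List<Long> fetchPage(List<Long> sortedIds, Long lastId, int pageSize) {
        List<Long> page = new ArrayList<>();
        for (Long id : sortedIds) {
            if (lastId == null || id > lastId) {
                page.add(id);
                if (page.size() == pageSize) {
                    break;
                }
            }
        }
        return page;
    }

    // Reads everything page by page, remembering the last sort key seen.
    static List<List<Long>> readAllPages(List<Long> sortedIds, int pageSize) {
        List<List<Long>> pages = new ArrayList<>();
        Long lastId = null;
        while (true) {
            List<Long> page = fetchPage(sortedIds, lastId, pageSize);
            if (page.isEmpty()) {
                break;
            }
            pages.add(page);
            lastId = page.get(page.size() - 1);
        }
        return pages;
    }

    public static void main(String[] args) {
        List<Long> ids = Arrays.asList(1L, 2L, 3L, 4L, 5L);
        System.out.println(readAllPages(ids, 2)); // [[1, 2], [3, 4], [5]]
    }
}
```

If the sort key were not unique, rows sharing a key value could be skipped or repeated at a page boundary, which is why a primary key is the usual choice.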
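The Job and the ItemWriter are not the focus of this article; they are included only to make the example complete. The same applies to the examples that follow.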
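    import java.util.List;

    import org.springframework.batch.item.ItemWriter;
    import org.springframework.stereotype.Component;

    // Demo writer that just prints each item.
    // (This is the Spring Batch 4 signature; Spring Batch 5 passes a Chunk instead of a List.)
    @Component("dbJdbcDemoWriter")
    public class DbJdbcDemoWriter implements ItemWriter<Customer> {

        @Override
        public void write(List<? extends Customer> items) throws Exception {
            for (Customer customer : items) {
                System.out.println(customer);
            }
        }
    }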
FlatFileItemReader
Reading data from a CSV file
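    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepScope;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.file.FlatFileItemReader;
    import org.springframework.batch.item.file.mapping.DefaultLineMapper;
    import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.ClassPathResource;

    @Configuration
    public class FlatFileDemoJobConfiguration {

        @Autowired
        private JobBuilderFactory jobBuilderFactory;

        @Autowired
        private StepBuilderFactory stepBuilderFactory;

        @Autowired
        @Qualifier("flatFileDemoWriter")
        private ItemWriter<? super Customer> flatFileDemoWriter;

        @Bean
        public Job flatFileDemoJob() {
            return jobBuilderFactory.get("flatFileDemoJob")
                    .start(flatFileDemoStep())
                    .build();
        }

        @Bean
        public Step flatFileDemoStep() {
            return stepBuilderFactory.get("flatFileDemoStep")
                    .<Customer, Customer>chunk(100)
                    .reader(flatFileDemoReader())
                    .writer(flatFileDemoWriter)
                    .build();
        }

        @Bean
        @StepScope
        public FlatFileItemReader<Customer> flatFileDemoReader() {
            FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
            reader.setResource(new ClassPathResource("customer.csv"));
            reader.setLinesToSkip(1); // skip the header line

            // Split each line on commas and name the resulting fields.
            DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
            tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"});

            // Turn the named fields into a Customer.
            DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
            lineMapper.setLineTokenizer(tokenizer);
            lineMapper.setFieldSetMapper(fieldSet -> Customer.builder()
                    .id(fieldSet.readLong("id"))
                    .firstName(fieldSet.readString("firstName"))
                    .lastName(fieldSet.readString("lastName"))
                    .birthdate(fieldSet.readString("birthdate"))
                    .build());
            lineMapper.afterPropertiesSet();

            reader.setLineMapper(lineMapper);
            return reader;
        }
    }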
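The reader above works in two stages: a tokenizer splits each line into named fields, and a mapper turns those fields into an object. The sketch below mimics the first stage in plain Java to show what "named fields" means. It is a deliberate simplification, not how DelimitedLineTokenizer is implemented (quoted values, for instance, are not handled), and the tokenize helper is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DelimitedLineDemo {

    // Splits a comma-separated line and pairs each token with its column name.
    // Simplification: no support for quoted values or escaped commas.
    static Map<String, String> tokenize(String line, String[] names) {
        String[] tokens = line.split(",", -1); // -1 keeps trailing empty fields
        if (tokens.length != names.length) {
            throw new IllegalArgumentException(
                    "Expected " + names.length + " fields but found " + tokens.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            fields.put(names[i], tokens[i].trim());
        }
        return fields;
    }

    public static void main(String[] args) {
        String[] names = {"id", "firstName", "lastName", "birthdate"};
        Map<String, String> fields = tokenize("1,Tony,Tester,1990-01-01", names);
        // The second stage would read typed values by name, for example:
        long id = Long.parseLong(fields.get("id"));
        System.out.println(id + " " + fields.get("firstName"));
    }
}
```

Addressing fields by name rather than by position keeps the mapping code readable and lets a wrong column count fail early.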
StaxEventItemReader
Reading data from an XML file
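    import java.util.HashMap;
    import java.util.Map;

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepScope;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.xml.StaxEventItemReader;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.ClassPathResource;
    import org.springframework.oxm.xstream.XStreamMarshaller;

    @Configuration
    public class XmlFileDemoJobConfiguration {

        @Autowired
        private JobBuilderFactory jobBuilderFactory;

        @Autowired
        private StepBuilderFactory stepBuilderFactory;

        @Autowired
        @Qualifier("xmlFileDemoWriter")
        private ItemWriter<? super Customer> xmlFileDemoWriter;

        @Bean
        public Job xmlFileDemoJob() {
            return jobBuilderFactory.get("xmlFileDemoJob")
                    .start(xmlFileDemoStep())
                    .build();
        }

        @Bean
        public Step xmlFileDemoStep() {
            return stepBuilderFactory.get("xmlFileDemoStep")
                    .<Customer, Customer>chunk(10)
                    .reader(xmlFileDemoReader())
                    .writer(xmlFileDemoWriter)
                    .build();
        }

        @Bean
        @StepScope
        public StaxEventItemReader<Customer> xmlFileDemoReader() {
            StaxEventItemReader<Customer> reader = new StaxEventItemReader<>();
            reader.setResource(new ClassPathResource("customer.xml"));
            // Each <customer> element is one item.
            reader.setFragmentRootElementName("customer");

            // Map the "customer" element to the Customer class.
            XStreamMarshaller unmarshaller = new XStreamMarshaller();
            Map<String, Class<?>> aliases = new HashMap<>();
            aliases.put("customer", Customer.class);
            unmarshaller.setAliases(aliases);

            reader.setUnmarshaller(unmarshaller);
            return reader;
        }
    }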
MultiResourceItemReader
Reading data from multiple files
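    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.file.FlatFileItemReader;
    import org.springframework.batch.item.file.MultiResourceItemReader;
    import org.springframework.batch.item.file.mapping.DefaultLineMapper;
    import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.ClassPathResource;
    import org.springframework.core.io.Resource;

    @Configuration
    public class MultipleFileDemoJobConfiguration {

        @Autowired
        private JobBuilderFactory jobBuilderFactory;

        @Autowired
        private StepBuilderFactory stepBuilderFactory;

        @Autowired
        @Qualifier("flatFileDemoWriter")
        private ItemWriter<? super Customer> flatFileDemoWriter;

        // All classpath files matching file*.csv
        @Value("classpath*:/file*.csv")
        private Resource[] inputFiles;

        @Bean
        public Job multipleFileDemoJob() {
            return jobBuilderFactory.get("multipleFileDemoJob")
                    .start(multipleFileDemoStep())
                    .build();
        }

        @Bean
        public Step multipleFileDemoStep() {
            return stepBuilderFactory.get("multipleFileDemoStep")
                    .<Customer, Customer>chunk(50)
                    .reader(multipleResourceItemReader())
                    .writer(flatFileDemoWriter)
                    .build();
        }

        // Reads the input files one after another through the delegate reader.
        private MultiResourceItemReader<Customer> multipleResourceItemReader() {
            MultiResourceItemReader<Customer> reader = new MultiResourceItemReader<>();
            reader.setDelegate(flatFileReader());
            reader.setResources(inputFiles);
            return reader;
        }

        @Bean
        public FlatFileItemReader<Customer> flatFileReader() {
            FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
            // MultiResourceItemReader replaces this resource with each input file in turn.
            reader.setResource(new ClassPathResource("customer.csv"));
            // reader.setLinesToSkip(1);

            DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
            tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"});

            DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
            lineMapper.setLineTokenizer(tokenizer);
            lineMapper.setFieldSetMapper(fieldSet -> Customer.builder()
                    .id(fieldSet.readLong("id"))
                    .firstName(fieldSet.readString("firstName"))
                    .lastName(fieldSet.readString("lastName"))
                    .birthdate(fieldSet.readString("birthdate"))
                    .build());
            lineMapper.afterPropertiesSet();

            reader.setLineMapper(lineMapper);
            return reader;
        }
    }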
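Exception handling and restart
For chunk-oriented steps, Spring Batch provides tools for managing state. The ItemStream interface gives developers access to the component that holds this state, the ExecutionContext, which is essentially a map of key-value pairs storing the state of a particular step. Because the ExecutionContext is persisted in the JobRepository, a step can be restarted.
When an error occurs during execution, the last committed state is what remains in the JobRepository. The next time the job runs, that state is used to populate the ExecutionContext, so processing can resume from where it left off.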
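A look at the ItemStream interface:
- open() is called at the start of the step and receives the ExecutionContext, populated with the values stored in the database.
- update() is called at the end of each chunk (transaction) to update the ExecutionContext.
- close() is called once all chunks have been processed.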
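The call order of these three methods can be sketched without the framework. In the sketch below, SimpleStream is a hypothetical stand-in for ItemStream and a plain Map plays the role of the ExecutionContext. The loop is a simplified picture of a step, not Spring Batch's actual step implementation, which may call update() at other points as well.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class StreamLifecycleDemo {

    // Hypothetical stand-in for ItemStream.
    interface SimpleStream {
        void open(Map<String, Object> context);
        void update(Map<String, Object> context);
        void close();
    }

    // Drives a stream through a step with the given number of chunks and
    // returns the order in which the callbacks were invoked.
    static List<String> runStep(int chunkCount) {
        final List<String> calls = new ArrayList<>();
        SimpleStream stream = new SimpleStream() {
            @Override
            public void open(Map<String, Object> context) {
                calls.add("open");
            }

            @Override
            public void update(Map<String, Object> context) {
                calls.add("update");
            }

            @Override
            public void close() {
                calls.add("close");
            }
        };

        Map<String, Object> context = new HashMap<>();
        stream.open(context);
        try {
            for (int i = 0; i < chunkCount; i++) {
                // ... read and write one chunk here ...
                stream.update(context); // checkpoint after the chunk
            }
        } finally {
            stream.close(); // always release resources
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(runStep(3)); // [open, update, update, update, close]
    }
}
```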
Let's build an example.
Prepare a CSV file and put an invalid first name in record 33. When the reader reaches that record, it throws an exception and the job terminates.
ItemReader test code
    import org.springframework.batch.item.ExecutionContext;
    import org.springframework.batch.item.ItemStreamException;
    import org.springframework.batch.item.ItemStreamReader;
    import org.springframework.batch.item.file.FlatFileItemReader;
    import org.springframework.batch.item.file.mapping.DefaultLineMapper;
    import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
    import org.springframework.core.io.ClassPathResource;
    import org.springframework.stereotype.Component;

    @Component("restartDemoReader")
    public class RestartDemoReader implements ItemStreamReader<Customer> {

        private static final String CUR_LINE_KEY = "curLine";

        // Number of lines read so far
        private long curLine = 0L;
        private final FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();

        public RestartDemoReader() {
            reader.setResource(new ClassPathResource("restartDemo.csv"));
            // This class tracks the position itself, so the delegate keeps no state.
            reader.setSaveState(false);

            DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
            tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"});

            DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
            lineMapper.setLineTokenizer(tokenizer);
            lineMapper.setFieldSetMapper(fieldSet -> Customer.builder()
                    .id(fieldSet.readLong("id"))
                    .firstName(fieldSet.readString("firstName"))
                    .lastName(fieldSet.readString("lastName"))
                    .birthdate(fieldSet.readString("birthdate"))
                    .build());
            lineMapper.afterPropertiesSet();
            reader.setLineMapper(lineMapper);
        }

        @Override
        public Customer read() throws Exception {
            Customer customer = reader.read();
            if (customer == null) {
                return null; // end of file
            }
            curLine++;
            // Throw explicitly on the bad record so that the job terminates.
            if ("wrongName".equals(customer.getFirstName())) {
                throw new RuntimeException("Something wrong. Customer id: " + customer.getId());
            }
            return customer;
        }

        /**
         * Detects a restart: if curLine was saved by an earlier run,
         * skip the lines that were already committed.
         */
        @Override
        public void open(ExecutionContext executionContext) throws ItemStreamException {
            if (executionContext.containsKey(CUR_LINE_KEY)) {
                curLine = executionContext.getLong(CUR_LINE_KEY);
                reader.setLinesToSkip((int) curLine);
                System.out.println("Start reading from line: " + (curLine + 1));
            } else {
                curLine = 0L;
                reader.setLinesToSkip(0);
            }
            // Open the file once here rather than on every read() call.
            reader.open(executionContext);
        }

        @Override
        public void update(ExecutionContext executionContext) throws ItemStreamException {
            System.out.println("update curLine: " + curLine);
            executionContext.putLong(CUR_LINE_KEY, curLine);
        }

        @Override
        public void close() throws ItemStreamException {
            reader.close();
        }
    }
Job configuration
Records are read in chunks of 10.
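    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class RestartDemoJobConfiguration {

        @Autowired
        private JobBuilderFactory jobBuilderFactory;

        @Autowired
        private StepBuilderFactory stepBuilderFactory;

        @Autowired
        @Qualifier("flatFileDemoWriter")
        private ItemWriter<? super Customer> flatFileDemoWriter;

        @Autowired
        @Qualifier("restartDemoReader")
        private ItemReader<Customer> restartDemoReader;

        @Bean
        public Job restartDemoJob() {
            return jobBuilderFactory.get("restartDemoJob")
                    .start(restartDemoStep())
                    .build();
        }

        @Bean
        public Step restartDemoStep() {
            return stepBuilderFactory.get("restartDemoStep")
                    .<Customer, Customer>chunk(10) // commit every 10 records
                    .reader(restartDemoReader)
                    .writer(flatFileDemoWriter)
                    .build();
        }
    }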
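On the first run, the job throws an exception at record 33, and the saved curLine value is 30.
If we now query the BATCH_STEP_EXECUTION_CONTEXT table, we can see that curLine has been persisted there as a key-value pair. (Records are processed in chunks of 10, so when record 33 fails, the last committed value of curLine is 30.)
Next, fix the wrongName record and run the job again.
This time open() checks whether the step's persisted context contains curLine. If it does, the job is being restarted: the reader loads curLine and continues from the chunk that failed.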
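The arithmetic behind "fails at 33, resumes after 30" can be checked with a small framework-free simulation. Everything in it is an assumption made for illustration: runStep is a hypothetical helper, a Map stands in for the persisted ExecutionContext, and a list stands in for the writer's output. It models the checkpointing idea only, not Spring Batch's transaction handling.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RestartSimulation {

    static final String KEY = "curLine";

    // Processes lines 1..totalLines in chunks. Throws when it reaches
    // failAtLine (0 means never fail). 'context' stands in for the persisted
    // ExecutionContext; 'written' collects the lines of committed chunks.
    static void runStep(int totalLines, int chunkSize, int failAtLine,
                        Map<String, Integer> context, List<Integer> written) {
        // open(): resume from the saved position if there is one
        int curLine = context.containsKey(KEY) ? context.get(KEY) : 0;
        while (curLine < totalLines) {
            List<Integer> chunk = new ArrayList<>();
            int line = curLine;
            while (chunk.size() < chunkSize && line < totalLines) {
                line++;
                if (line == failAtLine) {
                    // The current chunk is lost; the context keeps its last value.
                    throw new IllegalStateException("Bad record at line " + line);
                }
                chunk.add(line);
            }
            written.addAll(chunk);     // write the chunk
            curLine = line;
            context.put(KEY, curLine); // update(): checkpoint after the commit
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> context = new HashMap<>();
        List<Integer> written = new ArrayList<>();
        try {
            runStep(50, 10, 33, context, written); // first run fails at line 33
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + ", saved curLine = " + context.get(KEY));
        }
        runStep(50, 10, 0, context, written);      // restart after fixing the data
        System.out.println("lines written in total: " + written.size());
    }
}
```

Lines 31 and 32 were read during the first run but belonged to the chunk that never committed, so the restart reads them again. That is why the saved position is 30 rather than 32.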