Spring Batch遠(yuǎn)程分區(qū)的本地Jar包模式的代碼詳解
1 前言
Spring Batch
遠(yuǎn)程分區(qū)對(duì)于大量數(shù)據(jù)的處理非常擅長(zhǎng),它的實(shí)現(xiàn)有多種方式,如本地Jar包模式
、MQ模式
、Kubernetes模式
。這三種模式的如下:
(1)本地Jar包模式
:分區(qū)處理的worker
為一個(gè)Java進(jìn)程
,從jar
包啟動(dòng),通過(guò)jvm
參數(shù)和數(shù)據(jù)庫(kù)傳遞參數(shù);官方提供示例代碼。
(2)MQ模式
:worker
是一個(gè)常駐進(jìn)程,Manager
和Worker
通過(guò)消息隊(duì)列來(lái)傳遞參數(shù);網(wǎng)上有不少相關(guān)示例代碼。
(3)Kubernetes模式
:worker
為K8s
中的Pod
,Manager
直接啟動(dòng)Pod
來(lái)處理;網(wǎng)上并沒(méi)有找到任何示例代碼。
本文將通過(guò)代碼來(lái)講解第一種模式(本地Jar包模式
),其它后續(xù)再介紹。
建議先看下面文章了解一下:
Spring Batch入門:Spring Batch入門教程篇
Spring Batch并行處理介紹:詳解SpringBoot和SpringBatch 使用
2 代碼講解
本文代碼中,Manager
和Worker
是放在一起的,在同一個(gè)項(xiàng)目里,也只會(huì)打一個(gè)jar
包而已;我們通過(guò)profile
來(lái)區(qū)別是manager
還是worker
,也就是通過(guò)Spring Profile
實(shí)現(xiàn)一份代碼,兩份邏輯。實(shí)際上也可以拆成兩份代碼,但放一起更方便測(cè)試,而且代碼量不大,就沒(méi)有必要了。
2.1 項(xiàng)目準(zhǔn)備
2.1.1 數(shù)據(jù)庫(kù)
首先我們需要準(zhǔn)備一個(gè)數(shù)據(jù)庫(kù),因?yàn)?code>Manager和Worker
都需要同步狀態(tài)到DB
上,不能直接使用嵌入式的內(nèi)存數(shù)據(jù)庫(kù)了,需要一個(gè)外部可共同訪問(wèn)的數(shù)據(jù)庫(kù)。這里我使用的是H2 Database
,安裝可參考:把H2數(shù)據(jù)庫(kù)從jar包部署到Kubernetes,并解決Ingress不支持TCP的問(wèn)題。
2.1.2 引入依賴
maven
引入依賴如下所示:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-task</artifactId> </dependency> <dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-deployer-local</artifactId> <version>2.4.1</version> </dependency> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-integration</artifactId> </dependency>
spring-cloud-deployer-local
用于部署和啟動(dòng)worker
,非常關(guān)鍵;其它就是Spring Batch
和Task
相關(guān)的依賴;以及數(shù)據(jù)庫(kù)連接。
2.1.3 主類入口
Springboot
的主類入口如下:
@EnableTask @SpringBootApplication @EnableBatchProcessing public class PkslowRemotePartitionJar { public static void main(String[] args) { SpringApplication.run(PkslowRemotePartitionJar.class, args); } }
在Springboot
的基礎(chǔ)上,添加了Spring Batch
和Spring Cloud Task
的支持。
2.2 關(guān)鍵代碼編寫
前面的數(shù)據(jù)庫(kù)搭建和其它代碼沒(méi)有太多可講的,接下來(lái)就開始關(guān)鍵代碼的編寫。
2.2.1 分區(qū)管理Partitioner
Partitioner
是遠(yuǎn)程分區(qū)中的核心bean
,它定義了分成多少個(gè)區(qū)、怎么分區(qū),要把什么變量傳遞給worker
。它會(huì)返回一組<分區(qū)名,執(zhí)行上下文>的鍵值對(duì),即返回Map<String, ExecutionContext>
。把要傳遞給worker
的變量放在ExecutionContext
中去,支持多種類型的變量,如String
、int
、long
等。實(shí)際上,我們不建議通過(guò)ExecutionContext
來(lái)傳遞太多數(shù)據(jù);可以傳遞一些標(biāo)識(shí)或主鍵,然后worker
自己去拿數(shù)據(jù)即可。
具體代碼如下:
private static final int GRID_SIZE = 4; @Bean public Partitioner partitioner() { return new Partitioner() { @Override public Map<String, ExecutionContext> partition(int gridSize) { Map<String, ExecutionContext> partitions = new HashMap<>(gridSize); for (int i = 0; i < GRID_SIZE; i++) { ExecutionContext executionContext = new ExecutionContext(); executionContext.put("partitionNumber", i); partitions.put("partition" + i, executionContext); } return partitions; } }; }
上面分成4個(gè)區(qū),程序會(huì)啟動(dòng)4個(gè)worker
來(lái)處理;給worker
傳遞的參數(shù)是partitionNumber
。
2.2.2 分區(qū)處理器PartitionHandler
PartitionHandler
也是核心的bean
,它決定了怎么去啟動(dòng)worker
,給它們傳遞什么jvm
參數(shù)(跟之前的ExecutionContext
傳遞不一樣)。
@Bean public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer, TaskRepository taskRepository) throws Exception { Resource resource = this.resourceLoader.getResource(workerResource); DeployerPartitionHandler partitionHandler = new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep", taskRepository); List<String> commandLineArgs = new ArrayList<>(3); commandLineArgs.add("--spring.profiles.active=worker"); commandLineArgs.add("--spring.cloud.task.initialize-enabled=false"); commandLineArgs.add("--spring.batch.initializer.enabled=false"); partitionHandler .setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs)); partitionHandler .setEnvironmentVariablesProvider(new SimpleEnvironmentVariablesProvider(this.environment)); partitionHandler.setMaxWorkers(2); partitionHandler.setApplicationName("PkslowWorkerJob"); return partitionHandler; }
上面代碼中:
resource
是worker
的jar
包地址,表示將啟動(dòng)該程序;
workerStep
是worker
將要執(zhí)行的step
;
commandLineArgs
定義了啟動(dòng)worker
的jvm
參數(shù),如--spring.profiles.active=worker
;
environment
是manager
的系統(tǒng)環(huán)境變量,可以傳遞給worker
,當(dāng)然也可以選擇不傳遞;
MaxWorkers
是最多能同時(shí)啟動(dòng)多少個(gè)worker
,類似于線程池大小;設(shè)置為2,表示最多同時(shí)有2個(gè)worker
來(lái)處理4個(gè)分區(qū)。
2.2.3 Manager和Worker的Batch定義
完成了分區(qū)相關(guān)的代碼,剩下的就只是如何定義Manager
和Worker
的業(yè)務(wù)代碼了。
Manager
作為管理者,不用太多業(yè)務(wù)邏輯,代碼如下:
@Bean @Profile("!worker") public Job partitionedJob(PartitionHandler partitionHandler) throws Exception { Random random = new Random(); return this.jobBuilderFactory.get("partitionedJob" + random.nextInt()) .start(step1(partitionHandler)) .build(); } @Bean public Step step1(PartitionHandler partitionHandler) throws Exception { return this.stepBuilderFactory.get("step1") .partitioner(workerStep().getName(), partitioner()) .step(workerStep()) .partitionHandler(partitionHandler) .build(); }
Worker
主要作用是處理數(shù)據(jù),是我們的業(yè)務(wù)代碼,這里就演示一下如何獲取Manager
傳遞過(guò)來(lái)的partitionNumber
:
@Bean public Step workerStep() { return this.stepBuilderFactory.get("workerStep") .tasklet(workerTasklet(null, null)) .build(); } @Bean @StepScope public Tasklet workerTasklet(final @Value("#{stepExecutionContext['partitionNumber']}") Integer partitionNumber) { return new Tasklet() { @Override public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { Thread.sleep(6000); //增加延時(shí),查看效果,通過(guò)jps:在jar情況下會(huì)新起java進(jìn)程 System.out.println("This tasklet ran partition: " + partitionNumber); return RepeatStatus.FINISHED; } }; }
通過(guò)表達(dá)式@Value("#{stepExecutionContext['partitionNumber']}")
獲取Manager
傳遞過(guò)來(lái)的變量;注意要加注解@StepScope
。
3 程序運(yùn)行
因?yàn)槲覀兎譃?code>Manager和Worker
,但都是同一份代碼,所以我們先打包一個(gè)jar
出來(lái),不然manager
無(wú)法啟動(dòng)。配置數(shù)據(jù)庫(kù)和Worker
的jar
包地址如下:
spring.datasource.url=jdbc:h2:tcp://localhost:9092/test spring.datasource.username=pkslow spring.datasource.password=pkslow spring.datasource.driver-class-name=org.h2.Driver pkslow.worker.resource=file://pkslow/target/remote-partitioning-jar-1.0-SNAPSHOT.jar
執(zhí)行程序如下:
可以看到啟動(dòng)了4次Java
程序,還給出日志路徑。
通過(guò)jps
命令查看,能看到一個(gè)Manager
進(jìn)程,還有兩個(gè)worker
進(jìn)程:
4 復(fù)雜變量傳遞
前面講了Manager
可以通過(guò)ExecutionContext
傳遞變量,如簡(jiǎn)單的String
、long
等。但其實(shí)它也是可以傳遞復(fù)雜的Java
對(duì)象的,但對(duì)應(yīng)的類需要可序列化,如:
import java.io.Serializable; public class Person implements Serializable { private Integer age; private String name; private String webSite; //getter and setter }
Manager
傳遞:
executionContext.put("person", new Person(0, "pkslow", "www.pkslow.com"));
Worker
接收:
@Value("#{stepExecutionContext['person']}") Person person
5 總結(jié)
本文介紹了Spring Batch
遠(yuǎn)程分區(qū)的本地Jar包模式
,只能在一臺(tái)機(jī)器上運(yùn)行,所以也是無(wú)法真正發(fā)揮出遠(yuǎn)程分區(qū)的作用。但它對(duì)我們后續(xù)理解更復(fù)雜的模式是有很大幫助的;同時(shí),我們也可以使用本地模式進(jìn)行開發(fā)測(cè)試,畢竟它只需要一個(gè)數(shù)據(jù)庫(kù)就行了,依賴很少。
- Linux 啟動(dòng)停止SpringBoot jar 程序部署Shell 腳本的方法
- 詳解spring boot 以jar的方式啟動(dòng)常用shell腳本
- springboot maven 項(xiàng)目打包jar 最后名稱自定義的教程
- IDEA 將 SpringBoot 項(xiàng)目打包成jar的方法
- 解決Spring Boot 多模塊注入訪問(wèn)不到j(luò)ar包中的Bean問(wèn)題
- SpringBoot打Jar包在命令行運(yùn)行流程詳解
- Java springboot項(xiàng)目jar發(fā)布過(guò)程解析
- Spring Shell打Jar包時(shí)常用小技巧
相關(guān)文章
tk-mybatis整合springBoot使用兩個(gè)數(shù)據(jù)源的方法
單純的使用mybaits進(jìn)行多數(shù)據(jù)配置網(wǎng)上資料很多,但是關(guān)于tk-mybaits多數(shù)據(jù)源配置沒(méi)有相關(guān)材料,本文就詳細(xì)的介紹一下如何使用,感興趣的可以了解一下2021-12-12Eureka源碼閱讀解析Server服務(wù)端啟動(dòng)流程實(shí)例
這篇文章主要為大家介紹了Eureka源碼閱讀解析Server服務(wù)端啟動(dòng)流程實(shí)例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-10-10