一、Batch是什么?
Spring Batch是一款成熟稳定的批处理框架。它利用Java实现了大型、复杂批处理作业处理机制。它允许开发人员构建批处理应用程序,从而更加容易地执行、过滤、排序和转换大量数据。Batch不仅适合大数据处理,同时也可以在调度及监控方面提供帮助。Spring Batch支持大多数的主流数据库,同时它还提供了强大的与spring生态系统集成的能力。
以下是简单的代码示例:
public class BatchConfiguration { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Bean public Job simpleJob() { return jobBuilderFactory.get("simpleJob") .start(step1()) .build(); } @Bean public Step step1() { return stepBuilderFactory.get("step1") .tasklet((contribution, chunkContext) -> { System.out.println("Hello, world!"); return RepeatStatus.FINISHED; }).build(); } }
二、Batch核心概念
1. Job
在Spring Batch中,A Job是任务的顶级容器。它由一组Steps组成,定义了一个完整的、可独立执行的工作流程。
2. Step
在Spring Batch中,Step代表Job执行过程中的单个步骤。Step包含了一组处理步骤和决策,每个步骤中可以有一个Tasklet或一个Chunk。
3. Tasklet
Tasklet是一个独立的单元,可以执行一个事务。通常情况下,Tasklet可以完成一些批处理初始化工作、数据清理、文件复制或发送邮件等独立任务。
4. Chunk
Chunk指的是一次批处理中每次读取的一组数据,如从数据库中读取一批数据,然后逐一进行处理操作,处理完了再进行下一次读取操作。处理完成后,事务提交或回滚。Chunk事务通常适用于处理大规模数据时,可以减少内存的使用。
5. ItemReader
ItemReader负责从数据源中读取数据,并且将数据传递给ItemProcessor进行处理。
6. ItemProcessor
ItemProcessor用来处理读取到的数据,可以根据需要进行数据转换或过滤等操作。
7. ItemWriter
ItemWriter用来将处理后的数据写入目标数据源中。
8. JobRepository
JobRepository是一个持久化的、线程安全的Repository,它用来存储Job的状态信息,以及每个Step的执行状态信息。
三、使用Spring Batch实现批处理作业
1. 实现步骤
使用Spring Batch实现批处理作业的步骤如下:
1) 创建Job
在创建Job之前,需要先定义JobRepository和JobBuilderFactory。然后通过JobBuilderFactory创建Job。
@Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private JobRepository jobRepository; @Bean public Job job() { return jobBuilderFactory.get("job") .repository(jobRepository) .start(step()) .build(); }
2) 创建Step
在创建Step之前,需要先定义StepBuilderFactory。然后通过StepBuilderFactory创建Step。
@Autowired private StepBuilderFactory stepBuilderFactory; @Bean public Step step() { return stepBuilderFactory.get("step") .chunk(5) .reader(reader()) .processor(processor()) .writer(writer()) .build(); }
3) 创建ItemReader
在创建ItemReader之前,需要先定义DataSource,并使用JdbcCursorItemReader从数据源中获取数据。
@Autowired private DataSource dataSource; @Bean public ItemReaderreader() { JdbcCursorItemReader reader = new JdbcCursorItemReader<>(); reader.setDataSource(dataSource); reader.setSql("select name from person"); reader.setRowMapper((rs, rowNum) -> rs.getString("name")); return reader; }
4) 创建ItemProcessor
在创建ItemProcessor之前,需要定义ItemProcessor并实现process()方法。
@Bean public ItemProcessorprocessor() { return name -> "Hello, " + name + "!"; }
5) 创建ItemWriter
在创建ItemWriter之前,需要定义写入目标数据源。
@Autowired private SomeRepository someRepository; @Bean public ItemWriterwriter() { return items -> someRepository.saveAll(items); }
6) 运行Job
在创建完Job之后,可以直接运行Job。
@Autowired private JobLauncher jobLauncher; @Autowired private Job job; public void run() throws Exception { JobParameters jobParameters = new JobParametersBuilder() .addString("jobId", String.valueOf(System.currentTimeMillis())) .toJobParameters(); jobLauncher.run(job, jobParameters); }
2. 执行结果
执行上述代码后,将会输出以下结果:
Hello, Tom! Hello, Jerry! Hello, Mary! Hello, John! Hello, Bob!
四、总结
Spring Batch是一个成熟稳定的批处理框架,它由Job、Step、Tasklet、Chunk、ItemReader、ItemProcessor、ItemWriter和JobRepository等核心概念组成。使用Spring Batch可以帮助我们更加容易地执行、过滤、排序和转换大量数据,并且它还提供了强大的与spring生态系统集成的能力。