1. High Development Efficiency
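Spring Batch improves developer productivity and makes batch applications easy and efficient to build. It provides a set of metadata for defining simple batch jobs, and that metadata can be extended to support more complex flows. Developers only need to define the input (Reader), processing (Processor), and output (Writer) parts of a job, rather than writing the boilerplate and patterns that a hand-written Java batch program requires. Spring Batch also provides utilities such as JobLauncher and JobOperator that make jobs simple to start and manage.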
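The following simple example shows how to use Spring Batch to read person records from XML or a database, process them, and write them to XML and text files. The code defines a single job with two steps: the first step reads from XML and the second reads from the database, and each writes to its own destination.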
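import java.util.Arrays;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
import org.springframework.batch.item.file.transform.FieldExtractor;
import org.springframework.batch.item.file.transform.LineAggregator;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.xml.StaxEventItemReader;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;

// Uses the Spring Batch 4.x builder factories (deprecated in Spring Batch 5).
// Person is an application class (not shown); for JAXB it must be annotated
// with @XmlRootElement(name = "person").
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    public BatchConfiguration(JobBuilderFactory jobBuilderFactory,
                              StepBuilderFactory stepBuilderFactory) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
    }

    // Jaxb2Marshaller implements both Marshaller and Unmarshaller,
    // so one bean serves the XML reader and the XML writer.
    @Bean
    public Jaxb2Marshaller personMarshaller() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setClassesToBeBound(Person.class);
        return marshaller;
    }

    // Reads <person> fragments from persons.xml on the classpath.
    @Bean
    public ItemReader<Person> xmlPersonReader() {
        StaxEventItemReader<Person> reader = new StaxEventItemReader<>();
        reader.setResource(new ClassPathResource("persons.xml"));
        reader.setFragmentRootElementName("person");
        reader.setUnmarshaller(personMarshaller());
        return reader;
    }

    // Streams rows from the person table and maps each row to a Person.
    @Bean
    public ItemReader<Person> dbPersonReader(JdbcTemplate jdbcTemplate) {
        JdbcCursorItemReader<Person> reader = new JdbcCursorItemReader<>();
        reader.setDataSource(jdbcTemplate.getDataSource());
        reader.setSql("select id, first_name, last_name, email, age from person");
        reader.setRowMapper((resultSet, rowNum) -> {
            Person person = new Person();
            person.setId(resultSet.getLong("id"));
            person.setFirstName(resultSet.getString("first_name"));
            person.setLastName(resultSet.getString("last_name"));
            person.setEmail(resultSet.getString("email"));
            person.setAge(resultSet.getInt("age"));
            return person;
        });
        return reader;
    }

    // Writes persons.xml to the working directory (not the classpath input file).
    @Bean
    public ItemWriter<Person> xmlPersonWriter() {
        StaxEventItemWriter<Person> writer = new StaxEventItemWriter<>();
        writer.setResource(new FileSystemResource("persons.xml"));
        writer.setRootTagName("persons");
        writer.setMarshaller(personMarshaller());
        writer.setOverwriteOutput(true);
        return writer;
    }

    // Writes a tab-delimited text file; both steps append to the same file.
    @Bean
    public ItemWriter<Person> txtPersonWriter() {
        FlatFileItemWriter<Person> writer = new FlatFileItemWriter<>();
        writer.setResource(new FileSystemResource("persons.txt"));
        writer.setLineAggregator(lineAggregator());
        writer.setAppendAllowed(true);
        // The writer appends the line separator after the header itself.
        writer.setHeaderCallback(out -> out.write("Id\tFirst Name\tLast Name\tEmail\tAge"));
        return writer;
    }

    @Bean
    public LineAggregator<Person> lineAggregator() {
        DelimitedLineAggregator<Person> aggregator = new DelimitedLineAggregator<>();
        aggregator.setDelimiter("\t");
        aggregator.setFieldExtractor(fieldExtractor());
        return aggregator;
    }

    @Bean
    public FieldExtractor<Person> fieldExtractor() {
        BeanWrapperFieldExtractor<Person> extractor = new BeanWrapperFieldExtractor<>();
        extractor.setNames(new String[] {"id", "firstName", "lastName", "email", "age"});
        return extractor;
    }

    // A step accepts only one writer (a second writer(...) call replaces the first),
    // so a CompositeItemWriter fans each chunk out to both the text and the XML writer.
    @Bean
    public ItemWriter<Person> txtAndXmlPersonWriter(ItemWriter<Person> txtPersonWriter,
                                                    ItemWriter<Person> xmlPersonWriter) {
        CompositeItemWriter<Person> writer = new CompositeItemWriter<>();
        writer.setDelegates(Arrays.asList(txtPersonWriter, xmlPersonWriter));
        return writer;
    }

    @Bean
    public Step xmlPersonStep(ItemReader<Person> xmlPersonReader,
                              ItemWriter<Person> txtAndXmlPersonWriter) {
        return stepBuilderFactory.get("xmlPersonReadingStep")
                .<Person, Person>chunk(10)
                .reader(xmlPersonReader)
                .processor(personUpperProcessor())
                .writer(txtAndXmlPersonWriter)
                .build();
    }

    @Bean
    public Step dbPersonStep(ItemReader<Person> dbPersonReader,
                             ItemWriter<Person> txtPersonWriter) {
        return stepBuilderFactory.get("dbPersonReadingStep")
                .<Person, Person>chunk(10)
                .reader(dbPersonReader)
                .processor(personUpperProcessor())
                .writer(txtPersonWriter)
                .build();
    }

    // One job, two steps: the XML step runs first, then the database step.
    @Bean
    public Job job(Step dbPersonStep, Step xmlPersonStep) {
        return jobBuilderFactory.get("personJob")
                .incrementer(new RunIdIncrementer())
                .start(xmlPersonStep)
                .next(dbPersonStep)
                .build();
    }

    // Upper-cases the name and email fields of each person.
    @Bean
    public ItemProcessor<Person, Person> personUpperProcessor() {
        return person -> {
            person.setFirstName(person.getFirstName().toUpperCase());
            person.setLastName(person.getLastName().toUpperCase());
            person.setEmail(person.getEmail().toUpperCase());
            return person;
        };
    }
}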
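To make the reader, processor, and writer roles concrete, here is a minimal plain-Java sketch of the read-process-write loop that a chunk-oriented step performs on the developer's behalf. It is a simplified illustration, not Spring Batch's actual implementation: the class name `ChunkLoopSketch` and the `run` method are hypothetical, and transactions, restart state, and error handling are left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Simplified illustration of chunk-oriented processing; NOT Spring Batch's code.
final class ChunkLoopSketch {

    /**
     * Reads up to chunkSize items, processes each one, and hands the results
     * to the writer as a single list. Returns the number of chunks written.
     */
    static <I, O> int run(Iterator<I> reader,
                          Function<I, O> processor,
                          Consumer<List<O>> writer,
                          int chunkSize) {
        int chunks = 0;
        while (reader.hasNext()) {
            List<O> chunk = new ArrayList<>(chunkSize);
            for (int i = 0; i < chunkSize && reader.hasNext(); i++) {
                O result = processor.apply(reader.next());
                if (result != null) {      // a null result drops (filters) the item
                    chunk.add(result);
                }
            }
            if (!chunk.isEmpty()) {
                writer.accept(chunk);      // one write call per chunk
                chunks++;
            }
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<String> written = new ArrayList<>();
        Function<String, String> upper = s -> s.toUpperCase();
        Consumer<List<String>> sink = chunk -> written.addAll(chunk);
        int chunks = run(List.of("alice", "bob", "carol").iterator(), upper, sink, 2);
        System.out.println(chunks + " chunks: " + written); // 2 chunks: [ALICE, BOB, CAROL]
    }
}
```

With Spring Batch, this loop, together with transaction boundaries and restart bookkeeping, is provided by the framework, which is why a job definition can consist of little more than the three components.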
2. High-Performance Processing of Large Data Volumes
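Spring Batch is designed to process large data volumes efficiently. A step works through its input in chunks: it reads and processes a fixed number of items, then writes them in a single transaction, which keeps memory use bounded and reduces commit overhead. When one thread is not enough, a step can be run with multiple threads or partitioned so that chunks are processed in parallel, making better use of multi-core processors.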
The following code shows how to process a large file in parallel with multiple threads in Spring Batch:
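import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.batch.item.support.SynchronizedItemStreamWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

// Uses the Spring Batch 4.x builder factories (deprecated in Spring Batch 5).
// FileContent and FileContentFieldSetMapper are application classes (not shown).
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    public BatchConfiguration(JobBuilderFactory jobBuilderFactory,
                              StepBuilderFactory stepBuilderFactory) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
    }

    // FlatFileItemReader is not thread-safe, so a multi-threaded step must
    // read through a synchronized wrapper.
    @Bean
    public SynchronizedItemStreamReader<FileContent> reader() {
        FlatFileItemReader<FileContent> delegate = new FlatFileItemReader<>();
        delegate.setResource(new ClassPathResource("data.txt"));
        delegate.setLineMapper(lineMapper());
        // Restart state is unreliable when several threads share one reader.
        delegate.setSaveState(false);

        SynchronizedItemStreamReader<FileContent> reader = new SynchronizedItemStreamReader<>();
        reader.setDelegate(delegate);
        return reader;
    }

    @Bean
    public LineMapper<FileContent> lineMapper() {
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setDelimiter(",");
        tokenizer.setNames(new String[] {"id", "filename", "content"});

        DefaultLineMapper<FileContent> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new FileContentFieldSetMapper());
        return lineMapper;
    }

    // Stateless, so it is safe to share between threads.
    @Bean
    public ItemProcessor<FileContent, FileContent> processor() {
        return fileContent -> {
            fileContent.setContent(fileContent.getContent().toUpperCase());
            return fileContent;
        };
    }

    // FlatFileItemWriter is not thread-safe either; the synchronized wrapper
    // requires Spring Batch 4.3 or later. Output order is not guaranteed.
    @Bean
    public SynchronizedItemStreamWriter<FileContent> writer() {
        BeanWrapperFieldExtractor<FileContent> extractor = new BeanWrapperFieldExtractor<>();
        extractor.setNames(new String[] {"id", "filename", "content"});

        DelimitedLineAggregator<FileContent> aggregator = new DelimitedLineAggregator<>();
        aggregator.setDelimiter(",");
        aggregator.setFieldExtractor(extractor);

        FlatFileItemWriter<FileContent> delegate = new FlatFileItemWriter<>();
        delegate.setResource(new FileSystemResource("processed-data.txt"));
        delegate.setLineAggregator(aggregator);

        SynchronizedItemStreamWriter<FileContent> writer = new SynchronizedItemStreamWriter<>();
        writer.setDelegate(delegate);
        return writer;
    }

    // Each chunk of 1000 items is processed on a thread from the task executor.
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<FileContent, FileContent>chunk(1000)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .taskExecutor(taskExecutor())
                // The 4.x default allows only 4 concurrent chunks; match the core pool size.
                .throttleLimit(10)
                .build();
    }

    // Spring initializes the executor itself, so afterPropertiesSet() is not called here.
    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(20);
        return taskExecutor;
    }

    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }
}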
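The reason a shared reader needs synchronization in a multi-threaded step can be shown without the framework. The following plain-Java sketch is an illustration under stated assumptions, not Spring Batch's implementation: `MultiThreadedStepSketch`, `SynchronizedReader`, and `process` are hypothetical names. Several worker threads pull items from one source, and the `synchronized` read method ensures that each item is handed to exactly one thread.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Simplified illustration of a multi-threaded chunk step; NOT Spring Batch's code.
final class MultiThreadedStepSketch {

    /** Wraps a non-thread-safe iterator so each item goes to exactly one thread. */
    static final class SynchronizedReader<T> {
        private final Iterator<T> delegate;

        SynchronizedReader(Iterator<T> delegate) {
            this.delegate = delegate;
        }

        /** Returns the next item, or null once the input is exhausted. */
        synchronized T read() {
            return delegate.hasNext() ? delegate.next() : null;
        }
    }

    /** Upper-cases every input line using the given number of worker threads. */
    static List<String> process(List<String> input, int threads, int chunkSize) {
        SynchronizedReader<String> reader = new SynchronizedReader<>(input.iterator());
        ConcurrentLinkedQueue<String> output = new ConcurrentLinkedQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                while (true) {
                    List<String> chunk = new ArrayList<>(chunkSize);
                    String item;
                    while (chunk.size() < chunkSize && (item = reader.read()) != null) {
                        chunk.add(item.toUpperCase());   // the "processor"
                    }
                    if (chunk.isEmpty()) {
                        return;                          // no input left for this worker
                    }
                    output.addAll(chunk);                // the "writer", one call per chunk
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return new ArrayList<>(output);
    }
}
```

Calling `MultiThreadedStepSketch.process(lines, 4, 100)` returns every line exactly once, but in no particular order. The same trade-off applies to a multi-threaded step: throughput goes up, while ordering and simple restart semantics are given up.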
3. Easy to Maintain and Test
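Spring Batch applications are easy to maintain and test. Jobs and steps are defined as separate components, so each one can be defined, changed, and tested on its own. The core abstractions, such as ItemReader, ItemWriter, and ItemProcessor, are plain interfaces, which makes both unit tests and integration tests straightforward to write.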
The following code shows how to unit-test a Spring Batch item processor with JUnit:
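import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import org.junit.jupiter.api.Test;
import org.springframework.batch.item.ItemProcessor;

// Person, PersonDto, and PersonItemProcessor are application classes (not shown).
public class PersonItemProcessorTest {

    // The processor is a plain object, so no Spring context is needed.
    private final ItemProcessor<Person, PersonDto> processor = new PersonItemProcessor();

    @Test
    public void testProcessor() throws Exception {
        Person person = new Person();
        person.setFirstName("Alice");
        person.setLastName("Smith");
        person.setEmail("alice.smith@example.com");
        person.setAge(30);

        PersonDto personDto = processor.process(person);

        assertNotNull(personDto);
        // The name is upper-cased; the email passes through unchanged.
        assertEquals("ALICE SMITH", personDto.getName());
        assertEquals("alice.smith@example.com", personDto.getEmail());
    }
}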
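The test above relies on a `PersonItemProcessor` that the article does not show. One processor that would satisfy those assertions is sketched below. Everything here is an assumption made for illustration: the fields of `Person` and `PersonDto` are inferred from the test, and the `ItemProcessor` interface is a local stand-in with the same shape as Spring Batch's `ItemProcessor<I, O>`, so the sketch compiles without the framework. In a real project, import `org.springframework.batch.item.ItemProcessor` instead.

```java
import java.util.Locale;

// Local stand-in mirroring Spring Batch's ItemProcessor<I, O> (assumption for this sketch).
interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

// Hypothetical input type; fields inferred from the test.
final class Person {
    private String firstName;
    private String lastName;
    private String email;
    private int age;

    String getFirstName() { return firstName; }
    void setFirstName(String firstName) { this.firstName = firstName; }
    String getLastName() { return lastName; }
    void setLastName(String lastName) { this.lastName = lastName; }
    String getEmail() { return email; }
    void setEmail(String email) { this.email = email; }
    int getAge() { return age; }
    void setAge(int age) { this.age = age; }
}

// Hypothetical output type; fields inferred from the test.
final class PersonDto {
    private final String name;
    private final String email;

    PersonDto(String name, String email) {
        this.name = name;
        this.email = email;
    }

    String getName() { return name; }
    String getEmail() { return email; }
}

// Builds an upper-cased full name and passes the email through unchanged.
final class PersonItemProcessor implements ItemProcessor<Person, PersonDto> {
    @Override
    public PersonDto process(Person person) {
        String fullName = person.getFirstName() + " " + person.getLastName();
        return new PersonDto(fullName.toUpperCase(Locale.ROOT), person.getEmail());
    }
}
```

Because the processor has no dependency on a job, a step, or a Spring context, the test can construct it with `new` and call `process` directly.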
4. Drawbacks
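The main drawback of Spring Batch is its steep learning curve: it takes time to learn its core concepts and how to use them. In addition, some advanced capabilities, such as scheduling and monitoring, require extra configuration and setup, and developers need some experience before they can understand and use them comfortably.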