本篇文章主要内容:通过Spring Batch从一个库中读取数据进过处理后写入到另外一个库中。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId></dependency>
配置Job启动器
@BeanJobLauncher userJobLauncher(JobRepository userJobRepository) { SimpleJobLauncher jobLauncher = new SimpleJobLauncher() ; jobLauncher.setJobRepository(userJobRepository) ; return jobLauncher ;}
配置任务Repository存储元信息
@BeanJobRepository userJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) { JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean() ; factory.setDatabaseType("mysql") ; factory.setTransactionManager(transactionManager) ; factory.setDataSource(dataSource) ; try { factory.afterPropertiesSet() ; return factory.getObject() ; } catch (Exception e) { throw new RuntimeException(e) ; }}
配置ItemReader读取器
@BeanItemReader<User> userReader(JobOperator jobOperator) throws Exception { JpaPagingItemReaderBuilder<User> builder = new JpaPagingItemReaderBuilder<>() ; builder.entityManagerFactory(entityManagerFactory) ; // 每次分页查询多少条数据 builder.pageSize(10) ; builder.queryString("select u from User u where u.uid <= 50") ; builder.saveState(true) ; builder.name("userReader") ; return builder.build() ;}
配置数据源,该数据源是用来写入操作的
public DataSource dataSource() { HikariDataSource dataSource = new HikariDataSource() ; dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/testjpa?serverTimezone=GMT%2B8&useSSL=false") ; dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver") ; dataSource.setUsername("root") ; dataSource.setPassword("xxxooo") ; return dataSource ;}
配置ItemWriter用来写入操作(当前库的数据写入到另外一个库,上面的数据源)
@BeanItemWriter<User> userWriter() { // 通过JDBC批量处理 JdbcBatchItemWriterBuilder<User> builder = new JdbcBatchItemWriterBuilder<>() ; DataSource dataSource = dataSource() ; builder.dataSource(dataSource) ; builder.sql("insert into st (id, name, sex, mobile, age, birthday) values (?, ?, ?, ?, ?, ?)") ; builder.itemPreparedStatementSetter(new ItemPreparedStatementSetter<User>() { @Override public void setValues(User item, PreparedStatement ps) throws SQLException { ps.setInt(1, item.getUid()) ; ps.setString(2, item.getName()) ; ps.setString(3, item.getSex()) ; ps.setString(4, item.getMobile()) ; ps.setInt(5, item.getAge()) ; ps.setObject(6, item.getBirthday()) ; } }) ; return builder.build() ;}
配置ItemProcessor处理器,数据从当前库读取处理后经过处理后再写入另外的库中
@BeanItemProcessor<User, User> userProcessor() { return new ItemProcessor<User, User>() { @Override public User process(User item) throws Exception { System.out.printf("%s - 开始处理数据:%s%n", Thread.currentThread().getName(), item.toString()) ; // 模拟耗时操作 TimeUnit.SECONDS.sleep(1) ; // 在这里你可以对数据进行相应的处理。 return item ; } } ;}
配置Step将ItemReader、ItemProcessor、ItemWriter串联在一起。
@BeanStep userStep1(ItemReader<User> userReader, ItemProcessor<User, User> userProcessor, ItemWriter<User> userWriter) { return steps.get("userStep1") .<User, User>chunk(5) .reader(userReader) .processor(userProcessor) .writer(userWriter) .build() ;}
配置Job,Job是封装整个批处理流程的实体。在 Spring Batch 中,Job只是Step实例的容器。它将逻辑上属于一个流程的多个步骤组合在一起,并允许对所有步骤的全局属性(如可重启性)进行配置。作业配置包含:
@BeanJob userJob(Step userStep1, Step userStep2) { return jobs.get("userJob").start(userStep1).build();}
以上是Spring Batch定义配置一个Job所需的核心组件。接下来会以上面的基础配置进行高阶知识点进行介绍。
@RequestMapping("/userJob")public class UserJobController { @Resource private JobLauncher userJobLauncher ; @GetMapping("/start") public Object start() throws Exception { JobParameters jobParameters = new JobParameters() ; this.userJobLauncher.run(userJob, jobParameters) ; return "started" ; }}
通过JobLauncher#run方法启动Job。当你调用该接口时,你会发现接口一直不会返回,一直阻塞,下图是Job的启动序列
图片
根据上图能知道,当你调用run方法后,会等待整个Job退出状态为FINISHED或者FAILED后才能结束。所以,你需要异步完成,以便 SimpleJobLauncher 立即返回给调用者。而正确的序列应该是如下:
图片
上图通过异步方式启动Job序列。
@BeanTaskExecutor taskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor() ; taskExecutor.setThreadNamePrefix("spring_batch_launcher") ; taskExecutor.setCorePoolSize(10) ; taskExecutor.setMaxPoolSize(10) ; taskExecutor.initialize() ; return taskExecutor ;}@BeanJobLauncher userJobLauncher(JobRepository userJobRepository) { SimpleJobLauncher jobLauncher = new SimpleJobLauncher() ; jobLauncher.setJobRepository(userJobRepository) ; jobLauncher.setTaskExecutor(taskExecutor()) ; return jobLauncher ;}
通过上面配置后,Job启动将是异步的会直接返回JobExecution。
当一个Job正在执行,由于断电或者强制终止了程序。当程序恢复后你希望能够接着程序终止前的进度继续执行,这时候你需要进行如下的操作(本人没有发现有什么API能够操作的,可能文档没看仔细)。
当程序非正常终止是,下面两张表的状态都是STARTED,END_TIME为null
batch_job_execution表
图片
batch_step_execution表
图片
想要重新启动必须将上面的状态修改为STOPPED,END_TIME字段设置上值(是什么值无所谓)。
然后我们就可以继续使用上面的Controller接口启动任务继续执行了。
为了加快程序的执行,我们可以为Step配置线程池
@BeanStep userStep1(ItemReader<User> userReader, ItemProcessor<User, User> userProcessor, ItemWriter<User> userWriter) { return steps.get("userStep1") .<User, User>chunk(5) .reader(userReader) .processor(userProcessor) .writer(userWriter) // 配置线程池 .taskExecutor(taskExecutor()) .build() ;}
注意:Step中使用的任何池化资源(如数据源)都可能对并发性设置限制。请确保这些资源池至少与步骤中所需的并发线程数一样大。
通过上面配置线程池后,你将在控制台看到如下输出。
图片
默认将有4个线程同时进行处理。可以通过如下配置进行调整
@BeanStep userStep1(ItemReader<User> userReader, ItemProcessor<User, User> userProcessor, ItemWriter<User> userWriter) { return steps.get("userStep1") // ... // 节流限制10,这里配置的大小应该与你的数据库连接池大小及使用的线程池核心线程数一致。 .throttleLimit(10) .build() ;}
要想重复启动Job,我们可以在启动Job时设置不同的JobParameters参数,只要参数不同那么就可以重复的启动Job。如下示例:
@GetMapping("/start/{page}")public Object start(@PathVariable("page") Long page) throws Exception { Map<String, JobParameter> parameters = new HashMap<>() ; // 每次设置的参数值不同即可。 parameters.put("page", new JobParameter(page)) ; JobParameters jobParameters = new JobParameters(parameters) ; this.userJobLauncher.run(userJob, jobParameters) ; return "started" ;}
以上是本篇文章的全部内容,希望对你有帮助。
本文链接://www.dmpip.com//www.dmpip.com/showinfo-26-87012-0.htmlSpringBatch高阶应用:大数据批处理框架实战指南
声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com
上一篇: 架构设计中如何应对接口级故障?