Spring Boot + Spring Batch + Spring JPA
Posted
技术标签:
【中文标题】Spring Boot + Spring Batch + Spring JPA【英文标题】: 【发布时间】:2018-07-10 22:33:18 【问题描述】:我正在从事将数据从 Sql Server 移动到 Cassandra 的 Spring Batch 作业。我正在使用 Spring Data JPA 来读取和写入数据。我为这两个数据库创建了实体和 JPA 存储库。
现在我不知道如何将我的 JpaRepostorty 与 Spring Batch ItemReader 一起使用。我在互联网上搜索过,发现他们提到使用 JpaPageItemReader 的参考文献很少。但这需要指定查询和配置其他详细信息。但我无法弄清楚如何使用我拥有的现有 JpaRepository。下面是相关代码的sn-p-
我的 SQL Server 的 JpaRepostory -
public interface ScanJpaRepository extends JpaRepository<Scan, Integer>
@Transactional(readOnly = true)
@Query("select s from Scan s left join fetch s.projectVersion")
Stream<Scan> findAllScan();
我的 Spring Batch 作业 -
@Configuration
@EnableBatchProcessing
public class SSCBatchConfigurationCassandra
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Bean
public PlatformTransactionManager transactionManager()
return new ResourcelessTransactionManager();
@Bean
public JobExplorer jobExplorer() throws Exception
MapJobExplorerFactoryBean jobExplorerFactory = new MapJobExplorerFactoryBean(mapJobRepositoryFactoryBean());
jobExplorerFactory.afterPropertiesSet();
return jobExplorerFactory.getObject();
@Bean
public MapJobRepositoryFactoryBean mapJobRepositoryFactoryBean()
MapJobRepositoryFactoryBean mapJobRepositoryFactoryBean = new MapJobRepositoryFactoryBean();
mapJobRepositoryFactoryBean.setTransactionManager(transactionManager());
return mapJobRepositoryFactoryBean;
@Bean
public JobRepository jobRepository() throws Exception
return mapJobRepositoryFactoryBean().getObject();
@Bean
public JobLauncher jobLauncher() throws Exception
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
simpleJobLauncher.setJobRepository(jobRepository());
return simpleJobLauncher;
@Bean
public ItemReader<Project> reader()
**// how to read from ScanJpaRepository ??**
@Bean
public CassandraItemProcessor processor()
return new CassandraItemProcessor();
@Bean
public ItemWriter<CassandraProject> cqlWriter()
final CassandraBatchItemWriter writer = new CassandraBatchItemWriter();
return writer;
// tag::jobstep[]
@Bean
public Job importUserJob(JobCompletionNotificationListener listener, Step step1)
return jobBuilderFactory.get("importUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step1)
.end()
.build();
@Bean
public Step step1()
return stepBuilderFactory.get("step1")
.<Project, CassandraProject> chunk(100)
.reader(reader())
.processor(processor())
.writer()
.build();
// end::jobstep[]
更新#1: 如前所述,我将项目阅读器添加到我的批处理作业中。
@Configuration
@EnableBatchProcessing
public class FortifySSCBatchConfigurationCassandra
....
@Autowired
public ScanItemReader itemReader;
.....
@Bean
public Step step1()
return stepBuilderFactory.get("step1")
.<Scan, CScan> chunk(100)
.reader(itemReader)
.processor(processor())
.writer(cqlWriter())
.build();
我的 IDE 抱怨这个 -
The method reader(ItemReader<? extends Scan>) in the type SimpleStepBuilder<Scan,CScan> is not applicable for the arguments (ScanItemReader)
更新 #2:
public class CassandraItemProcessor implements ItemProcessor<Scan, CScan>
@Override
public CScan process(Scan s) throws Exception
..
return new CScan();
public class CassandraBatchItemWriter implements ItemWriter<CScan>
@Override
public void write(List<? extends CScan> arg0) throws Exception
// TODO Auto-generated method stub
【问题讨论】:
【参考方案1】:你可以这样声明你的 Reader
@Component
@JobScope
public class ScanItemReader extends RepositoryItemReader<Scan>
private final ScanJpaRepository repository;
@Autowired
public ScanItemReader(final ScanJpaRepository repository)
super();
this.repository = repository;
@PostConstruct
protected void init()
final Map<String, Sort.Direction> sorts = new HashMap<>();
sorts.put("Your sort parameter"), Direction.ASC);// This could be any field name of your Entity class
this.setRepository(this.repository);
this.setSort(sorts);
this.setMethodName(""); // You should sepcify the method which
//spring batch should call in your repository to fetch
// data and the arguments it needs needs to be
//specified with the below method.
// And this method must return Page<T>
this.setArguments();
自动装配此阅读器 bean 并在您的 StepBuilder 中使用它。
【讨论】:
感谢您的回复。我无法使用当前的 pom 导入 QAbstractOrder 类的包。我需要导入任何其他依赖项吗? 抱歉打错了,如果需要排序,需要提供排序参数来对结果进行排序。您可以指定任何实体类的字段名称 我更新了关于下一个错误的问题。 StepBuilder 不接受 ScanItemReader。 您的处理器类和编写器类如何。至少显示类声明。您的处理器应将 Scan 作为输入,将 CScan 作为输出。因为你已经声明为<Scan, CScan>
很抱歉。根据要求添加了两个类。【参考方案2】:
我正在尝试做同样的事情,但被卡住了。如果它对您有用,请分享 git 存储库或任何其他参考。我想看看我们如何从一个数据库读取并写入另一个数据库。我已经有它的 JPA 方法。只想自动装配并使用它们。
【讨论】:
您找到任何解决方案了吗?我有同样的问题。提前致谢以上是关于Spring Boot + Spring Batch + Spring JPA的主要内容,如果未能解决你的问题,请参考以下文章
将已有的spring app迁移到spring-boot,手动配置spring-boot?
为啥 Spring Boot 应用程序 pom 同时需要 spring-boot-starter-parent 和 spring-boot-starter-web?
《02.Spring Boot连载:Spring Boot实战.Spring Boot核心原理剖析》