Why does a Spring Batch multithreaded step break additional repository Hibernate queries in an ItemProcessor?
Posted: 2020-09-22 09:28:54

Question:

I'm using Spring Batch to convert a huge DB2 schema of about 30-40 tables into a simplified JSON format. My process works fine on a single thread, but as soon as I increase the thread pool size to make the step multithreaded, my ItemProcessor crashes with outrageously cryptic, incomprehensible errors.
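I just don't understand how my processor could fail to be thread-safe. I don't maintain state anywhere; I only make a few additional repository calls to enrich the data, because I can't get the reader to pull in everything I need. One of those repository calls is throwing an ArrayIndexOutOfBoundsException! I even added the @Transactional annotation to my processor and invoked it with Java's multithreaded java.util.concurrent.ExecutorService, and it works fine there too.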
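I can't figure out why my Spring Batch multithreaded step breaks the simple repository queries in my item processor. I even get intermittent lazy-loading exceptions! Shouldn't the item processor be wrapped in a transaction? And intermittently I see absolutely absurd ClassCastExceptions, where a one-to-many mapping returns the wrong entity type! Again, all of this works with the same data set on a single thread. Is my configuration wrong?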
```
java.lang.ArrayIndexOutOfBoundsException: 1765
at org.hibernate.engine.internal.EntityEntryContext.reentrantSafeEntityEntries(EntityEntryContext.java:319) ~[hibernate-core-5.3.14.Final.jar!/:5.3.14.Final]
at org.hibernate.engine.internal.StatefulPersistenceContext.reentrantSafeEntityEntries(StatefulPersistenceContext.java:1156) ~[hibernate-core-5.3.14.Final.jar!/:5.3.14.Final]
at org.hibernate.event.internal.AbstractFlushingEventListener.prepareEntityFlushes(AbstractFlushingEventListener.java:145) ~[hibernate-core-5.3.14.Final.jar!/:5.3.14.Final]
at org.hibernate.event.internal.AbstractFlushingEventListener.flushEverythingToExecutions(AbstractFlushingEventListener.java:83) ~[hibernate-core-5.3.14.Final.jar!/:5.3.14.Final]
at org.hibernate.event.internal.DefaultAutoFlushEventListener.onAutoFlush(DefaultAutoFlushEventListener.java:46) ~[hibernate-core-5.3.14.Final.jar!/:5.3.14.Final]
at org.hibernate.internal.SessionImpl.autoFlushIfRequired(SessionImpl.java:1433) ~[hibernate-core-5.3.14.Final.jar!/:5.3.14.Final]
at org.hibernate.internal.SessionImpl.list(SessionImpl.java:1519) ~[hibernate-core-5.3.14.Final.jar!/:5.3.14.Final]
at org.hibernate.query.internal.AbstractProducedQuery.doList(AbstractProducedQuery.java:1538) ~[hibernate-core-5.3.14.Final.jar!/:5.3.14.Final]
at org.hibernate.query.internal.AbstractProducedQuery.list(AbstractProducedQuery.java:1506) ~[hibernate-core-5.3.14.Final.jar!/:5.3.14.Final]
at org.hibernate.query.Query.getResultList(Query.java:132) ~[hibernate-core-5.3.14.Final.jar!/:5.3.14.Final]
at org.springframework.data.jpa.repository.query.JpaQueryExecution$CollectionExecution.doExecute(JpaQueryExecution.java:129) ~[spring-data-jpa-2.1.14.RELEASE.jar!/:2.1.14.RELEASE]
at org.springframework.data.jpa.repository.query.JpaQueryExecution.execute(JpaQueryExecution.java:91) ~[spring-data-jpa-2.1.14.RELEASE.jar!/:2.1.14.RELEASE]
at org.springframework.data.jpa.repository.query.AbstractJpaQuery.doExecute(AbstractJpaQuery.java:136) ~[spring-data-jpa-2.1.14.RELEASE.jar!/:2.1.14.RELEASE]
at org.springframework.data.jpa.repository.query.AbstractJpaQuery.execute(AbstractJpaQuery.java:125) ~[spring-data-jpa-2.1.14.RELEASE.jar!/:2.1.14.RELEASE]
at org.springframework.data.repository.core.support.RepositoryFactorySupport$QueryExecutorMethodInterceptor.doInvoke(RepositoryFactorySupport.java:605) ~[spring-data-commons-2.1.14.RELEASE.jar!/:2.1.14.RELEASE]
at org.springframework.data.repository.core.support.RepositoryFactorySupport$QueryExecutorMethodInterceptor.lambda$invoke$3(RepositoryFactorySupport.java:595) ~[spring-data-commons-2.1.14.RELEASE.jar!/:2.1.14.RELEASE]
at org.springframework.data.repository.core.support.RepositoryFactorySupport$QueryExecutorMethodInterceptor$$Lambda$541/246846952.get(Unknown Source) ~[na:na]
at org.springframework.data.repository.util.QueryExecutionConverters$$Lambda$540/1619129136.apply(Unknown Source) ~[na:na]
at org.springframework.data.repository.core.support.RepositoryFactorySupport$QueryExecutorMethodInterceptor.invoke(RepositoryFactorySupport.java:595) ~[spring-data-commons-2.1.14.RELEASE.jar!/:2.1.14.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.12.RELEASE.jar!/:5.1.12.RELEASE]
at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:59) ~[spring-data-commons-2.1.14.RELEASE.jar!/:2.1.14.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.12.RELEASE.jar!/:5.1.12.RELEASE]
at org.springframework.transaction.interceptor.TransactionInterceptor$$Lambda$539/1493625851.proceedWithInvocation(Unknown Source) ~[na:na]
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:295) ~[spring-tx-5.1.12.RELEASE.jar!/:5.1.12.RELEASE]
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98) ~[spring-tx-5.1.12.RELEASE.jar!/:5.1.12.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.12.RELEASE.jar!/:5.1.12.RELEASE]
at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:139) ~[spring-tx-5.1.12.RELEASE.jar!/:5.1.12.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.12.RELEASE.jar!/:5.1.12.RELEASE]
at org.springframework.data.jpa.repository.support.CrudMethodMetadataPostProcessor$CrudMethodMetadataPopulatingMethodInterceptor.invoke(CrudMethodMetadataPostProcessor.java:144) ~[spring-data-jpa-2.1.14.RELEASE.jar!/:2.1.14.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.12.RELEASE.jar!/:5.1.12.RELEASE]
at org.springframework.data.jpa.repository.support.CrudMethodMetadataPostProcessor$ExposeRepositoryInvocationInterceptor.invoke(CrudMethodMetadataPostProcessor.java:364) ~[spring-data-jpa-2.1.14.RELEASE.jar!/:2.1.14.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.12.RELEASE.jar!/:5.1.12.RELEASE]
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:93) ~[spring-aop-5.1.12.RELEASE.jar!/:5.1.12.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.12.RELEASE.jar!/:5.1.12.RELEASE]
at org.springframework.data.repository.core.support.SurroundingTransactionDetectorMethodInterceptor.invoke(SurroundingTransactionDetectorMethodInterceptor.java:61) ~[spring-data-commons-2.1.14.RELEASE.jar!/:2.1.14.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.12.RELEASE.jar!/:5.1.12.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) [spring-aop-5.1.12.RELEASE.jar!/:5.1.12.RELEASE]
...
```
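On the question of whether the processor runs in a transaction: it does, but in the transaction of the chunk currently executing on that thread. `RepositoryItemReader` loads a whole page of entities inside one chunk's transaction and may hand the rest of that page to later chunks or to other threads, so the processor can receive a `LegacyProduct` whose loading transaction has already committed. That transaction's persistence context is then closed, the entity is detached, and initializing a lazy collection such as `getSuppliers()` fails. The stdlib-only model below sketches that closed-session case as a hedged illustration; `ModelSession` and `LazyList` are hypothetical names, not Hibernate's API.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for the persistence context that loaded an entity.
final class ModelSession {
    private boolean open = true;

    boolean isOpen() {
        return open;
    }

    // Models the end of the chunk transaction that loaded the entity.
    void close() {
        open = false;
    }
}

// Hypothetical stand-in for a lazy one-to-many collection such as getSuppliers():
// it can only be initialized through the session that loaded its owner.
final class LazyList<T> {
    private final ModelSession owner;
    private final Supplier<List<T>> loader;
    private List<T> loaded;

    LazyList(ModelSession owner, Supplier<List<T>> loader) {
        this.owner = owner;
        this.loader = loader;
    }

    List<T> get() {
        if (loaded == null) {
            if (!owner.isOpen()) {
                // Hibernate reports the equivalent situation as a LazyInitializationException.
                throw new IllegalStateException("cannot initialize lazy collection: session closed");
            }
            loaded = loader.get();
        }
        return loaded;
    }
}
```

Initializing the collection while its owning session is still open succeeds; doing so after that session has closed throws, which mirrors an entity buffered by the reader in one chunk and processed in a later one.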
Here is my properties file:
```properties
#I don't want all my jobs run, I want to specify which one via an environment variable
spring.batch.job.enabled=false
spring.main.allow-bean-definition-overriding=true
spring.main.web-application-type=none
#If I set this count to 1, everything works just fine.
process.thread.count=4
process.page.size=25
hibernate.jdbc.fetch_size=100
process.chunk.size=25
process.publish.limit=200
```
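The `#If I set this count to 1, everything works just fine.` comment is consistent with one hedged explanation (not confirmed in the thread). In a chunk-oriented step, each chunk reads its items inside its own transaction, while `RepositoryItemReader` fetches a full page (25 here) at a time into a buffer that every thread reads from. With one thread and page size equal to chunk size, each chunk starts exactly when the buffer runs dry, so every page is fetched by the chunk that consumes it. With four threads, workers drain each other's pages, so most entities are processed by a chunk other than the one whose Hibernate `Session` loaded them. The single-threaded Java model below counts those mismatches; `ChunkPageModel` and its round-robin scheduling are hypothetical simplifications, not Spring Batch code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a multithreaded chunk step: `threads` workers take turns
// (round-robin) reading one item at a time from a shared paging reader. Each chunk
// runs in its own "transaction"; an item is a mismatch when the transaction that
// fetched its page differs from the one that reads (and processes) it.
// pageSize, chunkSize and threads must all be > 0.
final class ChunkPageModel {

    static int mismatches(int pageSize, int chunkSize, int threads, int totalItems) {
        List<Integer> page = new ArrayList<>(); // loading tx of each buffered item
        int cursor = 0;    // position inside the shared buffer
        int nextItem = 0;  // next item to fetch from the "database"
        int nextTx = 0;    // transaction id generator
        int[] workerTx = new int[threads];    // current tx per worker, 0 = none yet
        int[] readInChunk = new int[threads]; // items read in the worker's current chunk
        boolean[] done = new boolean[threads];
        int finished = 0;
        int mismatched = 0;

        while (finished < threads) {
            for (int w = 0; w < threads; w++) {
                if (done[w]) {
                    continue;
                }
                if (workerTx[w] == 0 || readInChunk[w] == chunkSize) {
                    workerTx[w] = ++nextTx; // commit the previous chunk, begin a new one
                    readInChunk[w] = 0;
                }
                if (cursor == page.size()) {      // shared buffer exhausted
                    if (nextItem >= totalItems) { // reader returns null: worker stops
                        done[w] = true;
                        finished++;
                        continue;
                    }
                    page.clear();
                    cursor = 0;
                    // The next page is fetched inside the calling worker's transaction.
                    for (int i = 0; i < pageSize && nextItem < totalItems; i++, nextItem++) {
                        page.add(workerTx[w]);
                    }
                }
                // The item is read (and later processed) in the worker's current transaction.
                if (page.get(cursor++) != workerTx[w]) {
                    mismatched++;
                }
                readInChunk[w]++;
            }
        }
        return mismatched;
    }
}
```

With the values from the properties file, `mismatches(25, 25, 1, 100)` is 0 while `mismatches(25, 25, 4, 100)` is greater than 0. A single thread with unequal sizes, `mismatches(25, 10, 1, 100)`, is also nonzero, so matching page size to chunk size only protects the single-threaded case.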
My configuration class:
```java
@Configuration
public class FullProductSyncBatchJobConfig {

    @Bean
    @StepScope
    public ItemStreamReader<LegacyProduct> productReader(
            LegacyProductRepository legacyProductRepository,
            @Value("#{jobParameters[pageSize]}") Integer pageSize,
            @Value("#{jobParameters[limit]}") Integer limit) {
        RepositoryItemReader<LegacyProduct> legacyProductRepositoryReader = new RepositoryItemReader<>();
        legacyProductRepositoryReader.setRepository(legacyProductRepository);
        legacyProductRepositoryReader.setMethodName("findAllRelevantProducts");
        // LinkedHashMap keeps the sort keys in order: guid first, then modelNumber
        legacyProductRepositoryReader.setSort(new LinkedHashMap<String, Sort.Direction>() {{
            put("id.guid", Sort.Direction.ASC);
            put("id.modelNumber", Sort.Direction.ASC);
        }});
        legacyProductRepositoryReader.setPageSize(pageSize);
        if (limit > 0) legacyProductRepositoryReader.setMaxItemCount(limit);
        legacyProductRepositoryReader.setSaveState(false);
        return legacyProductRepositoryReader;
    }

    @Bean
    @StepScope
    public ItemProcessor<LegacyProduct, StreamlinedProduct> productDocumentBuilder(
            DimensionRepository dimensionRepository) {
        return new StreamlinedProductBuilder(dimensionRepository);
    }

    @Bean
    @StepScope
    public ItemWriter<StreamlinedProduct> productDocumentPublisher(
            GcpPubsubPublisherService publisherService) {
        return new StreamlinedProductPublisher(publisherService);
    }

    @Bean
    public Step fullProductSync(ItemStreamReader<LegacyProduct> productReader,
                                ItemProcessor<LegacyProduct, StreamlinedProduct> productDocumentBuilder,
                                ItemWriter<StreamlinedProduct> productDocumentPublisher,
                                StepBuilderFactory stepBuilderFactory,
                                TaskExecutor syncProcessThreadPool,
                                PlatformTransactionManager jpaTransactionManager,
                                @Value("${process.chunk.size:100}") Integer chunkSize,
                                @Value("${process.publish.timeout.retry.limit:2}") int timeoutRetryLimit,
                                @Value("${process.failure.limit:20}") int maximumProcessingFailures) {
        return stepBuilderFactory.get("fullProductSync")
                .transactionManager(jpaTransactionManager)
                .<LegacyProduct, StreamlinedProduct>chunk(chunkSize)
                .reader(productReader)
                .processor(productDocumentBuilder)
                .writer(productDocumentPublisher)
                .faultTolerant()
                .retryLimit(timeoutRetryLimit)
                .retry(TimeoutException.class)
                .skipPolicy(new SyncProcessSkipPolicy(maximumProcessingFailures))
                .listener(new SyncProcessSkipListener()) // <== just logs them right now
                .taskExecutor(syncProcessThreadPool)
                .build();
    }

    @Bean
    public Job fullProductSyncJob(Step fullProductSync,
                                  JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory.get("fullProductSync")
                .start(fullProductSync)
                .build();
    }
}
```
And my processor class:
```java
@Slf4j
public class StreamlinedProductBuilder implements ItemProcessor<LegacyProduct, StreamlinedProduct> {

    private final DimensionRepository dimensionRepository;

    public StreamlinedProductBuilder(DimensionRepository dimensionRepository) {
        this.dimensionRepository = dimensionRepository;
    }

    @Override
    public StreamlinedProduct process(LegacyProduct legacyProduct) {
        StreamlinedProduct streamlinedProduct = new StreamlinedProduct();
        streamlinedProduct.setPrimarySupplierNumber(parsePrimarySupplierNumber(legacyProduct.getSuppliers()));
        attachProductDimensions(legacyProduct, streamlinedProduct);
        return streamlinedProduct;
    }

    private int parsePrimarySupplierNumber(List<Supplier> suppliers) {
        /* This intermittently throws a ClassCastException when using multiple threads,
         * saying that a Description can't be cast to a Supplier... WHAT??! HOW???! How does
         * getSuppliers() ever return a list of a completely different one-to-many entity????
         */
        for (Supplier supplier : suppliers) {
            if (supplier.isPrimary()) return supplier.getId();
        }
        return -1;
    }

    private void attachProductDimensions(LegacyProduct legacyProduct,
                                         StreamlinedProduct streamlinedProduct) {
        // The following line occasionally throws the ArrayIndexOutOfBoundsException I mentioned above. WHY?
        // Works just fine in one thread...
        List<Dimension> dimensions = dimensionRepository.findByProductIdAndModel(
                legacyProduct.getId().getGuid(), legacyProduct.getId().getModelNumber());
        Map<String, Double> dimensionsAsMap = new HashMap<>();
        for (Dimension dimension : dimensions) {
            dimensionsAsMap.put(dimension.getName(), dimension.getValue());
        }
        streamlinedProduct.setDimensions(dimensionsAsMap);
    }
}
```
My repository:
```java
@Repository
public interface DimensionRepository extends PagingAndSortingRepository<ProductDimension, DimensionCompositePK> {

    @Transactional(isolation = Isolation.READ_UNCOMMITTED) // <== fails with or without this
    @Query(value = "select d.name as name, d.value as value " +
            "from {h-schema}product p left join {h-schema}dimension d " +
            "on p.guid = d.product_guid and p.model_number = d.product_model_number " +
            "where p.guid = :guid and p.model_number = :model", nativeQuery = true)
    List<Dimension> findByProductIdAndModel(@Param("guid") String guid,
                                            @Param("model") Integer model);
}
```
Comments:
Is the `dimensionRepository` used in the processor threads thread-safe? Have you tried synchronizing access to the `StreamlinedProductBuilder#process` method? (Why the `Builder` suffix, btw? It's an `ItemProcessor`...)
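Yes, I believe most Spring repositories are thread-safe, especially PagingAndSortingRepository. A repository usually has to handle multiple requests coming from a Controller, and I believe Spring controllers use multiple threads to handle incoming requests, right? Oh, and I used the `Builder` suffix here because it describes what the class does better than the more generic `Processor` suffix.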
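The repository itself is indeed reasonable to share: in a Spring-managed JPA setup, repositories talk to a shared `EntityManager` proxy that delegates to the persistence context bound to the calling thread's current transaction. The entities a query returns are a different matter; they stay attached to the persistence context (the Hibernate `Session`) of the thread that loaded them, and a `Session` is not thread-safe. The hypothetical, stdlib-only sketch below models the thread-bound lookup with a `ThreadLocal`; `ThreadBoundContextProxy` and `PersistenceContext` are invented names, not Spring or Hibernate types.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a non-thread-safe persistence context (Hibernate Session).
final class PersistenceContext {
    private static final AtomicInteger COUNTER = new AtomicInteger();
    final int id = COUNTER.incrementAndGet();
}

// Hypothetical model of a shared proxy: a single instance is shared by every thread,
// but each thread is routed to its own context, like a thread-bound transaction.
final class ThreadBoundContextProxy {
    private final ThreadLocal<PersistenceContext> current =
            ThreadLocal.withInitial(PersistenceContext::new);

    PersistenceContext currentContext() {
        return current.get();
    }
}
```

One proxy instance serves every thread, yet each thread gets its own context. That is why sharing the repository is safe, while handing an entity (and its lazy collections) from one thread to another is not.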
I put the entire processor method in a synchronized block, but I still see the problem :(
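With that synchronized block, I get a ClassCastException where the getter on one of my oneToMany lists complains that it can't cast an element to a completely different entity from a different oneToMany, which makes no sense to me. I don't see how Spring could mix up where to put the entities of two oneToMany relationships, even if I do have a lot of entities.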
I suppose the two oneToMany list properties do start with the same 4 characters, but that would be one hell of a bug :(
Answer 1:
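I don't know what was going on, or why I was seeing exceptions suggesting that Spring was trying to populate some oneToMany relationship with the wrong entity class from a different oneToMany relationship.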
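In any case, adopting the Spring Batch driving query pattern seems to have solved the problem for me. I was under the impression that a given chunk shares the same transaction across the reader, processor and writer, but I think that assumption was wrong: each stage of each chunk gets its own transaction.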
So now my reader only pulls in the relevant IDs of my entity class, and the processor goes out and fetches that entity's oneToMany relationships.
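A minimal sketch of the driving-query idea, assuming each worker loads its own entities: the shared reader hands out only identifiers, and the worker that processes an ID also loads the entity, so the entity and its lazy collections never leave the thread (and transaction) that loaded them. In the real job the reader would select only the `guid`/`modelNumber` pairs and the processor would call a repository lookup; here `DrivingQueryStep` and `LoadedProduct` are hypothetical stdlib stand-ins, and constructing a `LoadedProduct` takes the place of that lookup.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical entity: remembers the worker thread whose "session" loaded it.
final class LoadedProduct {
    final int id;
    final String loadedBy;

    LoadedProduct(int id) {
        this.id = id;
        this.loadedBy = Thread.currentThread().getName();
    }
}

// Hypothetical driving-query step: the shared reader hands out ids only,
// and each worker loads the full entity itself before processing it.
final class DrivingQueryStep {
    private final List<Integer> ids;
    private int next = 0;

    DrivingQueryStep(List<Integer> ids) {
        this.ids = ids;
    }

    // Shared by all workers, so access is synchronized.
    private synchronized Integer readId() {
        return next < ids.size() ? ids.get(next++) : null;
    }

    // Returns id -> processing thread; fails if an entity was ever processed on another thread.
    Map<Integer, String> run(int threads) throws InterruptedException {
        Map<Integer, String> processedBy = new ConcurrentHashMap<>();
        AtomicInteger crossThread = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                Integer id;
                while ((id = readId()) != null) {
                    LoadedProduct product = new LoadedProduct(id); // stands in for a findById(id)
                    String me = Thread.currentThread().getName();
                    if (!product.loadedBy.equals(me)) {
                        crossThread.incrementAndGet(); // never happens: load and process share a thread
                    }
                    processedBy.put(product.id, me);
                }
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
            throw new IllegalStateException("timed out");
        }
        if (crossThread.get() != 0) {
            throw new IllegalStateException("an entity crossed threads");
        }
        return processedBy;
    }
}
```

The design choice is that only immutable identifiers cross threads; anything tied to a persistence context is created and consumed by the same worker.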