Spring Batch - 分块和多线程步骤 - RowMapper 中的 Nullpointer 异常
Posted
技术标签:
【中文标题】Spring Batch - 分块和多线程步骤 - RowMapper 中的 Nullpointer 异常【英文标题】:Spring Batch - Chunking & Multithreaded steps - Nullpointer exception in RowMapper 【发布时间】:2020-01-25 07:36:45 【问题描述】:当我在多个线程中运行我的步骤时,我在处理结果集时在我的行映射器中得到一个空指针异常,即使对于具有显式空检查的条目也是如此。当我在没有taskExecutor()
/ 单线程的情况下执行它时工作正常。我对几件事感到困惑。我的理解是,如果我将提交间隔指定为 100,核心线程数指定为 10,则每个线程都会拉出 100 个块并独立处理。
代码如下:
@Bean
public Step myStep()
return stepBuilderFactory.get(STEP_NAME).<MyModel, MyModel> chunk(1000)
.reader(myModelReader())
.writer(myModelWriter())
.taskExecutor(taskExecutor())
.listener(stepExecutionNotificationListener)
.listener(chunkExecutionListener)
.build();
@Bean
public Job myJob()
return jobBuilderFactory.get(JOB_NAME)
.incrementer(new RunIdIncrementer())
.listener(jobCompletionNotificationListener)
.flow(myStep()).end().build();
@Bean
@StepScope
public JdbcCursorItemReader<MyModel> myModelReader()
JdbcCursorItemReader<MyModel> reader = new JdbcCursorItemReader<>();
reader.setDataSource(dataSource);
reader.setVerifyCursorPosition(false);
reader.setSql("my query fetching millions of records joining multiple tables from the db");
reader.setRowMapper(new MyModelRowMapper());
return reader;
public class MyModelRowMapperimplements RowMapper<MyModel>
@Override
public MyModel mapRow(ResultSet rs, int rowNum) throws SQLException
MyModel myModel = new MyModel();
myModel.setEmailAddress(checkIsEmpty(rs.getString("EMAIL_ADDRESS")) ? "" : rs.getString("EMAIL_ADDRESS").replace("|", "")); // ----- The line which is failing!!! -----
return person;
public boolean checkIsEmpty(String stringToCheck)
if(stringToCheck==null || stringToCheck.isEmpty() || stringToCheck.equals("null"))
return true;
return false;
public TaskExecutor taskExecutor()
ThreadPoolTaskExecutor threadPoolTaskExecutor=new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(10);
threadPoolTaskExecutor.setMaxPoolSize(25);
threadPoolTaskExecutor.setQueueCapacity(5);
threadPoolTaskExecutor.setThreadNamePrefix("MyModelBatch-");
threadPoolTaskExecutor.afterPropertiesSet();
return threadPoolTaskExecutor;
编辑 1
除了在非线程上下文中工作之外,如果我使用一次结果集,它也可以工作。 我将代码更改为
String email = rs.getString("EMAIL_ADDRESS");
myModel.setEmailAddress(checkIsEmpty(email) ? "" : email.replace("|", ""));
【问题讨论】:
您是在问 Spring Batching 的工作原理还是如何修复 NPE? 我希望通过了解 NPE 的工作原理来修复它。公平吗? 不,因为您假设 NPE 在某种程度上与线程有关。运行调试器并查看什么是空的,或者将错误行分解为多行,以便您可以看到发生 NPE 的确切行。您的RowMapper
是线程安全的,因此您应该先检查简单的事情,然后再将问题归咎于线程(并且批处理是从头开始为多线程构建的)。
至于你的块和数据库命中,你是对的。 1000 块大小使得 1000 db 命中 100 万行。
1.在发布问题之前,我已经尝试过了。 rs.getString("EMAIL_ADDRESS").replace("|", "") 失败,无论我提供什么空检查。当我尝试在调试中在 null 和有效值之间对其进行评估时,表达式的值会不断变化。 2.关于1000分贝的通话。一般来说,有这么多的数据库点击是一个好习惯吗?
【参考方案1】:
JdbcCursorItemReader
不是线程安全的(请参阅它的javadoc 以及此answer 中的更多详细信息)。原因是它包装了一个不是线程安全的ResultSet
。
所以您的问题是由于在多线程步骤中使用了非线程安全的项目阅读器。根据Javadoc:
对
read()
的每次调用都会调用提供的RowMapper,并传入ResultSet。
由于read
不同步,每个线程都可以调用它来读取项目。
要解决您的问题,您可以将 Jdbc 阅读器包装在 SynchronizedItemStreamReader
中。
【讨论】:
它确实有效。但是,在这种情况下,更改为 JdbcPagingItemReader 应该可以完成这项工作吗?使用 SynchronizedItemStreamReader 进行包装不会减慢我的步伐 是的,这是可能的,但最重要的是它使您的程序正确。更改为 JdbcPagingItemReader 是一个不错的选择。It does work.
:在这种情况下,请采纳答案。
JdbcPagingItemReader 应该是正确的答案吧?而且我看到网上有人甚至在 SynchronizedItemStreamReader 中包装它。我们试图通过将线程安全读取器包装在同步的项目流中来实现这一目标。
无需将 JdbcPagingItemReader 包装在同步装饰器中,因为它已经是线程安全的(参见其Javadoc)【参考方案2】:
由于处理ResultSet
是一个顺序操作,并且它是由RowMapper
在JdbcCursorItemReader
中完成的,因此不应该有任何线程干扰的可能性(如果您查看代码,代码非常简单:read row
-> map row to obj
-> return obj
)。
ResultSet
也不应该为对getXXX
的重复调用返回不同的值,尽管这可能无法保证(尽管这将是一个奇怪的实现,而且我从未听说过会这样做的驱动程序)。
因此,为了让您能够获得所描述的错误,ResultSet
必须在两个JdbcCursorItemReaders
之间共享,但我真的看不出这是怎么发生的,线程或没有线程。然后你可能会遇到结果集已经被推进的情况,但同样......他们不能分享ResultSets
。
这可能是一些配置问题,但我有一段时间没有做过 Spring Batch,所以不能说是我的头脑。
【讨论】:
以上是关于Spring Batch - 分块和多线程步骤 - RowMapper 中的 Nullpointer 异常的主要内容,如果未能解决你的问题,请参考以下文章
为啥我的 Spring Batch 多线程步骤在任何处理之前执行所有读取?