使用同步时在 Spring 分区中出现意外

Posted

技术标签:

【中文标题】使用同步时在 Spring 分区中出现意外【英文标题】:Unexpected in Spring partition when using synchronized 【发布时间】:2019-07-06 17:25:47 【问题描述】:

我正在使用 Spring Batch 和 Partition 进行并行处理。用于 db 的 Hibernate 和 Spring Data Jpa。对于分区步骤,读取器、处理器和写入器具有步进范围,因此我可以向它们注入分区键和范围(从到)。现在在处理器中,我有一个同步方法,并希望该方法一次运行一次,但事实并非如此。

我将它设置为有 10 个分区,所有 10 个项目阅读器都读取正确的分区范围。问题来自项目处理器。 Blow 代码与我使用的逻辑相同。

public class accountProcessor implementes ItemProcessor
    @override
    public Custom process(item) 
        createAccount(item);
        return item;
    

    //account has unique constraints username, gender, and email
    /*
        When 1 thread execute that method, it will create 1 account 
        and save it. If next thread comes in and  try to save the  same  account, 
        it  should find the account created by first thread and do one update. 
        But now it doesn't happen, instead findIfExist return null 
        and it  try to do another insert of duplicate data
    */
    private synchronized void createAccount(item) 
        Account account = accountRepo.findIfExist(item.getUsername(),  item.getGender(),  item.getEmail());
        if(account  == null) 
            //account  doesn't  exist
            account = new Account();
            account.setUsername(item.getUsername());
            account.setGender(item.getGender());
            account.setEmail(item.getEmail());
            account.setMoney(10000);
         else 
            account.setMoney(account.getMoney()-10);
        
        accountRepo.save(account);
    

预期的输出是在任何给定时间只有 1 个线程将运行此方法,因此不会在 db 中重复插入并避免 DataintegrityViolationexception。

实际结果是第二个线程找不到第一个账号,尝试创建一个重复账号并保存到db,会导致DataintegrityViolationexception, unique constraints错误。

由于我同步了方法,线程应该按顺序执行它,第二个线程应该等待第一个线程完成然后运行,这意味着它应该能够找到第一个帐户。

我尝试了许多方法,例如包含所有唯一帐户的 volatile 集,执行 saveAndFlush 以尽快提交,使用 threadlocal 无论如何,这些都不起作用。

需要帮助。

【问题讨论】:

如果你的处理器是步进作用域,那么会有10个处理器实例 @PrabhakarD 你的意思是这 10 个处理器,每个处理器都有自己的同步方法? 【参考方案1】:

由于您将项目处理器设置为步进范围,因此您实际上不需要同步,因为每个步骤都有自己的处理器实例。

但您似乎遇到了设计问题而不是实施问题。您正在尝试同步线程以在并行设置中以特定顺序运行。当您决定并行处理并将数据划分为分区并给每个工作人员(本地或远程)一个分区来处理时,您必须承认这些分区将以未定义的顺序进行处理,并且记录之间不应该有任何关系每个分区或每个工人完成的工作之间。

当 1 个线程执行该方法时,它将创建 1 个帐户 并保存它。如果下一个线程进来并尝试保存同一个帐户, 它应该找到第一个线程创建的帐户并进行一次更新。但是现在它没有发生,而是 findIfExist 返回 null 并尝试再次插入重复数据

那是因为thread1的事务可能还没有提交,所以thread2不会找到你认为已经被thread1插入的记录。

您似乎正在尝试使用分区设置创建或更新某些帐户。我不确定这个设置是否适合手头的问题。

作为旁注,我不会在项目处理器中调用accountRepo.save(account);,而是在项目编写器中这样做。

希望这会有所帮助。

【讨论】:

即使在写入步骤中,任何同步方法的行为都相同! thread1 的事务可能未提交,thread2 进行读取!任何其他建议的设计实现来实现这个实现?

以上是关于使用同步时在 Spring 分区中出现意外的主要内容,如果未能解决你的问题,请参考以下文章

制作 Discord Bot 时在 Javascript 中出现“意外的输入结束”

“致命错误:在 Swift ios 中使用 vImageBuffer_initWithCGImage 时在展开可选值时意外发现 nil”

Spring 问题:出现意外错误(类型=未找到,状态=404)

Spring @RestController,spring-boot 出现意外错误(类型=不可接受,状态=406)

使用 SimpleMappingExceptionResolver 时在 spring:message 标签中访问 exception.class.name

连接到 RDBMS 时在 Spark 中进行分区