让多个线程对一个数据集进行操作,而一个线程对其进行总结

Posted

技术标签:

【中文标题】让多个线程对一个数据集进行操作,而一个线程对其进行总结【英文标题】:Letting multiple Threads operate on a data set while one Thread sums it up 【发布时间】:2015-04-15 09:44:51 【问题描述】:

我正在尝试实施一个银行系统,其中我有一组帐户。有多个线程试图在账户之间转移资金,而一个线程连续(或者更确切地说,随机时间)试图总结银行中的总资金(所有账户余额的总和)。

解决这个问题的方法起初听起来很明显;使用ReentrantReadWriteLocksreadLock 用于执行事务的线程,writeLock 用于执行求和的线程。然而,在以这种方式实现之后(参见下面的代码),我看到性能/“事务吞吐量”大幅下降,即使与仅使用一个线程进行事务相比也是如此。

上述实现代码:

public class Account implements Compareable<Account>
   private int id;
   private int balance;

   public Account(int id)
      this.id = id;
      this.balance = 0;
   

   public synchronized int getBalance() return balance; 

   public synchronized setBalance(int balance)
      if(balance < 0) throw new IllegalArgumentException("Negative balance"); 
      this.balance = balance;
   

   public int getId() return this.id; 

   // To sort a collection of Accounts.
   public int compareTo(Account other)
      return (id < other.getId() ? -1 : (id == other.getId() ? 0 : 1));
   


public class BankingSystem 
   protected List<Account> accounts;
   protected ReadWriteLock lock = new ReentrantReadWriteLock(); // !!

   public boolean transfer(Account from, Account to, int amount)
      if(from.getId() != to.getId())
         synchronized(from)
            if(from.getBalance() < amount) return false;
            lock.readLock().lock(); // !!
            from.setBalance(from.getBalance() - amount);
         
         synchronized(to) 
            to.setBalance(to.getBalance() + amount);
            lock.readLock().unlock(); // !! 
         
      
      return true;
   

   // Rest of class..

请注意,这甚至还没有使用求和方法,因此从未获取过writeLock。如果我只删除标有// !! 的行并且也不调用求和方法,那么使用多线程的“传输吞吐量”突然比使用单线程时高很多,这就是目标。

我现在的问题是,如果我从不尝试获取 writeLock,那么为什么简单引入 readWriteLock 会减慢整个事情的速度,而我在这里做错了什么,因为我无法管理找到问题。

旁注: 我已经问过一个关于这个问题的问题here,但设法问了一个错误的问题。然而,我确实为我提出的那个问题得到了一个惊人的答案。我决定不大幅降低问题质量,并为需要帮助的人保留这个很好的答案,我不会编辑问题(再次)。相反,我打开了这个问题,坚信这不是重复的,而是完全不同的问题。

【问题讨论】:

【参考方案1】:

您通常会使用任一锁或synchronized,同时使用两者并不常见。

为了管理您的场景,您通常会在每个帐户上使用细粒度锁,而不是像您那样使用粗粒度锁。您还可以使用侦听器来实现总计机制。

public interface Listener 

    public void changed(int oldValue, int newValue);


public class Account 

    private int id;
    private int balance;
    protected ReadWriteLock lock = new ReentrantReadWriteLock();
    List<Listener> accountListeners = new ArrayList<>();

    public Account(int id) 
        this.id = id;
        this.balance = 0;
    

    public int getBalance() 
        int localBalance;
        lock.readLock().lock();
        try 
            localBalance = this.balance;
         finally 
            lock.readLock().unlock();
        
        return localBalance;
    

    public void setBalance(int balance) 
        if (balance < 0) 
            throw new IllegalArgumentException("Negative balance");
        
        // Keep track of the old balance for the listener.
        int oldValue = this.balance;
        lock.writeLock().lock();
        try 
            this.balance = balance;
         finally 
            lock.writeLock().unlock();
        
        if (this.balance != oldValue) 
            // Inform all listeners of any change.
            accountListeners.stream().forEach((l) -> 
                l.changed(oldValue, this.balance);
            );
        
    

    public boolean lock() throws InterruptedException 
        return lock.writeLock().tryLock(1, TimeUnit.SECONDS);
    

    public void unlock() 
        lock.writeLock().unlock();
    

    public void addListener(Listener l) 
        accountListeners.add(l);
    

    public int getId() 
        return this.id;
    



public class BankingSystem 

    protected List<Account> accounts;

    public boolean transfer(Account from, Account to, int amount) throws InterruptedException 
        if (from.getId() != to.getId()) 
            if (from.lock()) 
                try 
                    if (from.getBalance() < amount) 
                        return false;
                    
                    if (to.lock()) 
                        try 
                            // We have write locks on both accounts.
                            from.setBalance(from.getBalance() - amount);
                            to.setBalance(to.getBalance() + amount);
                         finally 
                            to.unlock();
                        

                     else 
                        // Not sure what to do - failed to lock the account.
                    
                 finally 
                    from.unlock();
                

             else 
                // Not sure what to do - failed to lock the account.
            
        
        return true;
    

    // Rest of class..

请注意,您可以在同一个线程中获得两次写锁 - 第二次也是允许的。锁只排除其他线程的访问。

【讨论】:

对于细粒度与粗粒度问题;在接受 Holger 的建议后,我的账户本身不再有任何锁定,所以唯一的锁定将是求和的锁定。但是,在阅读了您的答案之后,这当然也可能是摆脱该锁定的解决方案,从而再次提高吞吐量。我会考虑实现这种求和方式,非常感谢 @Phil - 我也为您添加了一个示例BankingSystem,以便您了解如何锁定两个帐户以进行转移。 不要锁定两个账户;这是死锁的秘诀。如果一个线程试图从 a 转移到 b 而另一个试图从 b 转移到 a 会发生什么?顺便说一句,不需要持有两把锁。全局锁定的全部目的是在汇总所有帐户时避免挂起的转账,而不是避免并发转账。 @Holger - 好电话 - 我已将其更改为 tryLock,这应该可以避免死锁。根据您所说的意图,其他技术可能是合适的。 那么当tryLock返回false时你会怎么做?简单地忽略您不拥有锁并不是一个正确的解决方案。【参考方案2】:

首先,将更新放入自己的synchronized 块中是正确的,即使getter 和setter 本身是synchronized,所以你避免check-then-act 反模式。

但是,从性能的角度来看,它并不是最佳的,因为您获得了 3 次相同的锁(from 帐户为 4 次)。 JVM 或 HotSpot 优化器知道同步原语并且能够优化这种嵌套同步模式,但是(现在我们不得不猜测一下)如果你在中间获得另一个锁,它可能会阻止这些优化。

正如在另一个问题中已经建议的那样,您可以求助于无锁更新,但您当然必须完全理解它。无锁更新以一个特殊操作为中心,compareAndSet 仅当变量具有预期的旧值时才执行更新,换句话说,中间没有执行并发更新,而检查和更新是作为一个执行的原子操作。并且该操作不是使用synchronized 实现的,而是直接使用专用的 CPU 指令。

使用模式总是喜欢

    读取当前值 计算新值(或拒绝更新) 尝试执行更新,如果当前值仍然相同,则会成功

缺点是更新可能会失败,这需要重复这三个步骤,但如果计算量不太大,这是可以接受的,因为更新失败表明另一个线程必须在其间更新成功,所以总会有一个进步。

这导致了一个帐户的示例代码:

static void safeWithdraw(AtomicInteger account, int amount) 
    for(;;)  // a loop as we might have to repeat the steps
        int current=account.get(); // 1. read the current value
        if(amount>current) throw new IllegalStateException();// 2. possibly reject
        int newValue=current-amount; // 2. calculate new value
        // 3. update if current value didn’t change
        if(account.compareAndSet(current, newValue))
            return; // exit on success
    

因此,为了支持无锁访问,提供getBalancesetBalance 操作是远远不够的,因为每次尝试在没有锁定的情况下组合getset 操作的操作都会失败。 您有三个选择:

    将每个受支持的更新操作作为专用方法提供,例如 safeWithdraw 方法 提供compareAndSet 方法以允许调用者使用该方法编写自己的更新操作 提供一个以更新函数为参数的更新方法,如AtomicInteger does in Java 8; 当然,这在使用 Java 8 时特别方便,您可以使用 lambda 表达式来实现实际的更新功能。

请注意,AtomicInteger 本身使用所有选项。有针对常见操作的专用更新方法,例如increment,还有compareAndSet 方法允许组合任意更新操作。

【讨论】:

非常感谢您的解释,我现在会尝试相应地更改我的实现,看看这对我有什么帮助! 我用safeDepositsafeWithdraw 方法(非同步,但原子)更改了类Account 的实现以适应现在的要求。没有ReadWriteLocks 也能完美运行。 (transfer 方法现在也不再包含同步语句。)。但是,只要我再次尝试锁定/解锁ReadWriteLock,性能下降就会再次出现。似乎这首先与使用ReadWriteLock 做更多的事情,而不是实现问题,也许? 如果您现在配置文件会发生什么?与您的原始代码不同,线程现在不能被synchronized 阻塞...... 没错,他们不能,他们也不能。如果我分析一下,一切看起来都很正常,不会互相阻挡,但引入的 readWriteLock 只会花费更长的时间。这就是为什么我说,我开始觉得问题更多在于锁的选择而不是它的实现,虽然这也很难想象.. 还没有,到目前为止我只尝试过使用 Java 8 运行/编译。我应该尝试 Java 7 吗?【参考方案3】:

锁定很昂贵,但在您的情况下,我假设您在运行测试时可能会出现某种“几乎死锁”:如果某个线程在您的代码的 synchronized(from) 块中并且另一个线程想要解锁 from 实例在它的 synchronized(to) 块中,那么它将无法:第一个 synchronized 将阻止线程 #2 进入 synchronized(to) 块,因此锁不会很快被释放.

这可能会导致很多线程挂在锁的队列中,从而导致获取/释放锁的速度变慢。

更多注意事项:当第二部分 (to.setBalance(to.getBalance() + amount);) 由于某种原因(异常、死锁)未执行时,您的代码将导致问题。您需要找到一种方法来围绕这两个操作创建事务,以确保它们要么同时执行,要么都不执行。

这样做的一个好方法是创建一个Balance 值对象。在您的代码中,您可以创建两个新的,更新两个余额,然后调用这两个设置器 - 因为设置器不会失败,所以两个余额都将被更新,或者代码将在调用任何设置器之前失败。

【讨论】:

锁是共享锁。只要没有持有写锁,就不会有死锁。 @Holger: synchronized 是写锁。 我可以看到你提出的两个同步语句相互阻塞的建议,谢谢你的提示!但是,问题不可能真的存在,因为如果我删除了 ReadWriteLock,我就没有性能问题.. @Phil:如果您删除 synchronized 块,它也应该会消失(但是,您会遇到余额错误)。问题是synchronized 块会干扰锁。试着把锁移到外面。 但是没有嵌套的synchronized块,没有循环,也没有阻塞操作,所以你解释有没有及时释放的锁是完全错误的。

以上是关于让多个线程对一个数据集进行操作,而一个线程对其进行总结的主要内容,如果未能解决你的问题,请参考以下文章

Wpf中“由于其他线程拥有此对象,因此调用线程无法对其进行访问”

Wpf中“由于其他线程拥有此对象,因此调用线程无法对其进行访问”的问题

Java锁相关

『Python』 多线程 共享变量的实现

让线程A阻塞B&C,反之亦然,但不要让线程B阻塞线程C,反之亦然?

Java多线程-2