水平扩展写入时如何避免并发问题?

Posted

技术标签:

【中文标题】水平扩展写入时如何避免并发问题?【英文标题】:How to avoid concurrency issues when scaling writes horizontally? 【发布时间】:2015-05-09 22:37:09 【问题描述】:

假设有一个工作服务从队列接收消息,从文档数据库中读取具有指定 Id 的产品,根据消息应用一些操作逻辑,最后将更新后的产品写回数据库 (a) .

当处理不同的产品时,这项工作可以安全地并行完成,因此我们可以水平扩展 (b)。但是,如果多个服务实例在同一个产品上工作,我们最终可能会遇到并发问题,或者数据库中的并发异常,在这种情况下,我们应该应用一些重试逻辑(但重试仍然可能再次失败等等) .

问题:我们如何避免这种情况?有没有办法确保两个实例不在同一个产品上运行?

示例/用例:一家在线商店在产品 A、产品 B 和产品 C 上进行了一次大促销,在一个小时内结束,数百名客户正在购买。对于每次购买,都会将一条消息排入队列(productId、numberOfItems、price)。 目标:我们如何运行我们的工作服务的三个实例,并确保 productA 的所有消息都将在 instanceA、productB 到 instanceB 和 productC 到 instanceC 中结束(不会导致并发问题)?

注意事项:我的服务是用 C# 编写的,作为辅助角色托管在 Azure 上,我使用 Azure Queues 进行消息传递,并且我正在考虑使用 Mongo 进行存储。此外,实体 ID 为 GUID

更多的是关于技术/设计,所以如果您使用不同的工具来解决问题,我仍然感兴趣。

【问题讨论】:

+1 @GregD 的回答“了解您的数据模型和使用模式”,尽管我会更进一步。要消除并发性,您需要重新设计数据模型和业务逻辑,这样您就不会真正更新数据,而只是追加数据。仅追加数据模型是并发友好的 - 这意味着它们不会阻塞,但您可能需要重新设计从数据模型读取的逻辑。 @UdiDahan Append-only 模型听起来确实适合这里的问题,因此非常感谢您提供一些细节的答案 【参考方案1】:

我假设您有办法在所有工作人员服务中安全地访问产品队列。鉴于此,避免冲突的一种简单方法是在主队列旁边为每个产品使用全局队列

// Queue[X] is the queue for product X
// QueueMain is the main queue 
DoWork(ProductType X)

  if (Queue[X].empty())
  
    product = QueueMain().pop()
    if (product.type != X)
    
      Queue[product.type].push(product) 
      return;
    
  else
  
     product = Queue[X].pop()
  

  //process product...

对队列的访问需要是原子的

【讨论】:

【参考方案2】:

1) 我能想到的每一个大规模数据解决方案都内置了一些东西来精确处理这种冲突。详细信息将取决于您对数据存储的最终选择。对于传统的关系数据库,无需您进行任何附加工作即可完成此操作。有关详细信息,请参阅您选择的技术文档。

2) 了解您的数据模型和使用模式。适当地设计您的数据存储。不要为你没有的规模设计。针对最常见的使用模式进行优化。

3) 挑战你的假设。你真的不得不频繁地从多个角色中改变同一个实体吗?有时答案是肯定的,但通常您可以简单地创建一个类似的新实体来反映更新。 IE,采用 journaling/logging 方法而不是单一实体方法。最终,单个实体的大量更新永远不会扩展。

【讨论】:

冲突确实由数据库处理,例如通过抛出并发错误。问题是,如果可以的话,我怎样才能避免出现失败并重试的情况。 底层技术为数据一致性提供了必要的保护。我相信在复杂场景(大量客户端访问相同数据)中最小化或消除并发问题的应用程序级解决方案对于效率和性能至关重要。【参考方案3】:

如果您希望数据库始终保持最新并且始终与已处理的单元保持一致,那么您可以对同一个可变实体进行多次更新。

为了遵守这一点,您需要序列化同一实体的更新。要么通过在生产者处对数据进行分区来做到这一点,要么将实体的事件累积在同一队列中,要么使用分布式锁或数据库级别的锁将实体锁定在工作线程中。

您可以使用 Actor 模型(在使用 akka 的 java/scala 世界中)为每个实体或串行处理它们的实体组创建一个消息队列。

更新 你可以试试akka port to .net 和here。 在这里,您可以找到一个很好的教程,其中包含有关使用 akka in scala 的示例。 但是对于一般原则,您应该更多地搜索 [actor model]。但它也有缺点。

最终涉及对您的数据进行分区以及为特定实体创建独特的专业工作者(可以重复使用和/或在发生故障时重新启动)的能力。

【讨论】:

为每个实体创建消息队列的actor模型,这听起来很有趣,能否提供一些参考资料以便我查找?【参考方案4】:

任何试图将负载分配到同一集合中的不同项目(如订单)的解决方案都注定要失败。原因是,如果您的交易流量很高,则必须开始执行以下操作之一:

    让节点相互交谈 (hey guys, are anyone working with this?) 将ID生成划分为段(节点a创建ID 1-1000,节点B 1001-1999)等,然后让它们处理自己的段 动态地将集合划分为段(并让每个节点处理一个段。

那么这些方法有什么问题?

第一种方法是简单地复制数据库中的事务。除非您可以花费大量时间优化策​​略,否则最好依赖交易。

后两个选项会降低性能,因为您必须根据 id 动态路由消息,并且还要在运行时更改策略以包括新插入的消息。它最终会失败。

解决方案

这里有两种解决方案,您也可以组合使用。

自动重试

相反,您在某处有一个从消息队列读取的入口点。

你有这样的东西:

while (true)

    var message = queue.Read();
    Process(message);

为了获得非常简单的容错能力,您可以做的是在失败时重试:

while (true)

    for (i = 0; i < 3; i++)
    
       try
       
            var message = queue.Read();
            Process(message);
            break; //exit for loop
       
       catch (Exception ex)
       
           //log
           //no throw = for loop runs the next attempt
       
    

您当然可以只捕获数据库异常(或者更确切地说是事务失败)来重播这些消息。

微服务

我知道,微服务是一个流行词。但在这种情况下,这是一个很好的解决方案。与其拥有一个处理所有消息的单一核心,不如将应用程序分成更小的部分。或者在您的情况下,只需停用某些类型的消息的处理。

如果您有五个节点运行您的应用程序,您可以确保节点 A 接收与订单相关的消息,节点 B 接收与运输等相关的消息。

通过这样做,您仍然可以水平扩展您的应用程序,您不会遇到任何冲突并且需要很少的努力(更多的消息队列并重新配置每个节点)。

【讨论】:

关于重试:当然,在db事务/并发失败的情况下有一些重试策略是必要的,因为无论你如何设计你的应用程序,最终都可能出现这样的异常。我只是想知道如何在让它们发生之前将这些失败的可能性降到最低。关于微服务:这已经是一个微服务,它只接受来自专用队列的 productSold 消息并更新产品。然而,水平扩展微服务仍然是一个问题,因此是个问题。 根据给定的信息很难给出更详细的建议。例如,每秒处理多少条消息(所有产品消息)?为什么您只为一种消息类型(productSold)创建服务?你是如何扩展数据库的?数据库每秒为产品处理多少事务?对我来说,没有一个服务处理所有产品消息(在一个消息队列中)有点奇怪,因为数据库应该是瓶颈而不是消息处理。我只是对你的动机感兴趣。 这是一个专门用于更新产品销售信息的微服务。队列非常快且高度可用,数据库可以轻松扩展(使用分片和复制),但如果我们不能解决我们在这里讨论的问题,服务就无法扩展。因此,在需求高峰期,瓶颈将是服务。【参考方案5】:

对于这种事情,我使用 blob 租约。基本上,我使用某个已知存储帐户中的实体 ID 创建了一个 blob。当 worker 1 拿起实体时,它会尝试获取 blob 的租约(如果 blob 不存在,则自行创建 blob)。如果两者都成功,那么我允许对消息进行处理。之后总是释放租约。 如果我不成功,我将消息转储回队列中

我遵循 Steve Marx 最初在此处 http://blog.smarx.com/posts/managing-concurrency-in-windows-azure-with-leases 描述的方法,尽管经过调整以使用新的存储库

在 cmets 之后编辑: 如果您可能有很高的消息都与同一个实体交谈(正如您的推荐所暗示的那样),我会在某处重新设计您的方法......无论是实体结构还是消息结构。

例如:考虑 CQRS 设计模式并独立存储来自处理每条消息的更改。因此,产品实体现在是各种工作人员对实体所做的所有更改的集合,依次重新应用并重新水化为单个对象

【讨论】:

我明白了,就像一个分布式锁。 +1,因为它确实解决了问题,但我想知道我们是否可以做得更好。例如。如果队列有 10 个后续项目,全部用于同一产品,并且我们有 10 个实例,则 1 个将执行实际工作,其他 9 个将无法获取锁定并重新入队项目,在第二次迭代中,1 个将工作,8 个将失败,在第 3 次迭代中 1 会成功 7 会失败,我们最终会遇到 45 次失败 - 浪费资源和时间。 我认为您的意思更像是事件溯源,而不是 cqrs(cqrs 已经是我们这里所拥有的,这是命令部分) 我看到这种技术使用了很多,但我发现了两个问题。一是它弄乱了队列的先进先出字符(通常没有问题),二是如果这种情况过于频繁,则会产生大量开销。我相信消息队列(即 Azure 上的服务总线)比简单队列更适合这种情况 @mxa055 服务总线如何解决问题? @Igorek 我会让你失望的,我只是在主题演讲中使用了形状和文本框并拍了一张快照

以上是关于水平扩展写入时如何避免并发问题?的主要内容,如果未能解决你的问题,请参考以下文章

一直再说高并发,多少QPS才算高并发?

一直再说高并发,多少QPS才算高并发?

在 Android 上使用 SQLite 时如何避免并发问题?

如何在更新和插入并发时避免重复行

如何在 Laravel 中在一秒钟内发出并发请求时避免重复记录

高并发处理思路与手段:扩容