Akka 框架支持查找重复消息

Posted

技术标签:

【中文标题】Akka 框架支持查找重复消息【英文标题】:Akka framework support for finding duplicate messages 【发布时间】:2012-01-10 04:29:11 【问题描述】:

我正在尝试使用 Akka 和 Scala 构建一个高性能的分布式系统。

如果请求昂贵(且无副作用)计算的消息到达,并且之前已请求完全相同的计算,我想避免再次计算结果。如果之前请求的计算已经完成并且结果可用,我可以缓存它并重新使用它。

但是,可以请求重复计算的时间窗口可以任意小。例如出于所有实际目的,我可以在同一时刻收到一千或一百万条请求相同昂贵计算的消息。

有一个名为 Gigaspaces 的商业产品据说可以处理这种情况。

但是,目前在 Akka 中似乎没有框架支持处理重复的工作请求。鉴于 Akka 框架已经可以访问通过该框架路由的所有消息,看来框架解决方案在这里很有意义。

这是我建议 Akka 框架做的事情: 1. 创建一个特征来指示一种消息类型(例如,“ExpensiveComputation”或类似的东西),这些消息将受到以下缓存方法的影响。 2. 智能地(散列等)识别由(相同或不同)参与者在用户可配置的时间窗口内接收到的相同消息。其他选项:选择用于此目的的最大缓冲区大小的内存,受(例如 LRU)替换等的影响。Akka 也可以选择仅缓存处理成本高的消息的结果;如果需要,可以重新处理那些花费很少时间处理的消息;无需浪费宝贵的缓冲区空间来缓存它们及其结果。 3. 当识别出相同的消息(在那个时间窗口内接收,可能是“同时”)时,避免不必要的重复计算。框架会自动执行此操作,并且本质上,重复的消息永远不会被新的参与者接收到进行处理;它们会默默地消失,并且处理一次的结果(无论该计算在过去已经完成,还是当时正在进行)将被发送给所有适当的接收者(如果已经可用,则立即发送,如果没有,则在完成计算后发送)。请注意,即使“回复”字段不同,消息也应被视为相同,只要它们表示的语义/计算在其他所有方面都相同。另请注意,计算应该是纯函数式的,即没有副作用,因为建议的缓存优化工作并且根本不改变程序语义。

如果我的建议与 Akka 的做事方式不兼容,和/或如果您发现这是一个非常糟糕的主意的一些强有力的理由,请告诉我。

谢谢, 真棒,斯卡拉

【问题讨论】:

【参考方案1】:

我不知道是否所有这些责任都应该由 Akka 来处理。像往常一样,这完全取决于规模,尤其是定义消息唯一性的属性数量。

在缓存机制的情况下,已经提到的将请求唯一映射到参与者的方法是可行的方法,尤其是它可以得到持久性的支持。

identity的情况下,我宁愿使用基于图的算法,如 signal-collect,而不是检查简单的相等性(这可能是瓶颈)。

【讨论】:

【参考方案2】:

正如 Neil 所说,这并不是真正的框架功能,实现它甚至将其抽象为它自己的 trait 是相当简单的。

trait CachingExpensiveThings  self: Actor =>
  val cache = ...
  def receive: Actor.Receive = 
    case s: ExpensiveThing => cachedOrCache(s)
  

  def cacheOrCached(s: ExpensiveThing) = cache.get(s) match 
    case null => val result = compute(s)
                 cache.put(result)
                 self.reply_?)(result)
    case cached => self.reply_?)(cached)
  
  def compute(s: ExpensiveThing): Any 



class MyExpensiveThingCalculator extends Actor with CachingExpensiveThings 
  def compute(s: ExpensiveThing) = 
    case l: LastDigitOfPi => ...
    case ts: TravellingSalesman => ...
  

【讨论】:

我还计算了 Pi 的最后一位数字,你把它变成了什么? ;p【参考方案3】:

您要问的是不依赖于 Akka 框架,而是 它是您构建参与者和消息的方式。首先确保您的消息是不可变的,并且通过 equals/hashCode 方法具有适当定义的身份。案例类免费为您提供两者,但是如果您在消息中嵌入了 actorRefs 用于回复目的,您将不得不覆盖标识方法。案例类参数也应该具有相同的递归属性(不可变和正确的标识)。

其次,您需要弄清楚参与者将如何处理存储和识别当前/过去的计算。最简单的方法是将请求唯一地映射到参与者。这样,该参与者并且只有该参与者将处理该特定请求。给定一组固定的参与者和请求的 hashCode,这可以很容易地完成。 如果 Actor 集受到监督,则奖励积分,监督者负责管理负载平衡/映射和替换失败的 Actor(Akka 使这部分变得简单)。

最后参与者本身可以根据您描述的标准维护响应缓存行为。在 actor 的上下文中,一切都是线程安全的,因此由请求本身键入的 LRU 缓存(记住良好的身份属性)对于您想要的任何类型的行为都很容易。

【讨论】:

这有一种变体,它取决于确保最后一条消息是处理的消息,而不是队列中较早的消息。即,在确保收到所有消息之前,我不想开始昂贵的计算。我认为可以使用 FSM 修改上述方法以达到相同的结果。

以上是关于Akka 框架支持查找重复消息的主要内容,如果未能解决你的问题,请参考以下文章

akka消息传递

分析线程转储 - sun.misc.Unsafe.park 上的许多阻塞线程 [重复]

Akka(15): 持久化模式:AtLeastOnceDelivery-消息保证送达模式

Akka演员(Scala)如何在内存不足时获得堆转储[重复]

Akka框架使用注意点

我是不是需要重复使用相同的 Akka ActorSystem 或者我可以在每次需要时创建一个?