Akka scala 事件总线具有不同的分类器,具体取决于订阅者

Posted

技术标签:

【中文标题】Akka scala 事件总线具有不同的分类器,具体取决于订阅者【英文标题】:Akka scala Event bus with different classifiers depending on the subscriber 【发布时间】:2014-11-25 14:58:06 【问题描述】:

我正在研究 Akka EventBus 以检查它是否可以解决我的设计问题之一,但我仍然不知道。 问题如下。

为了简化,我有:

case class Request(requesterId: String, operation: String, header:  RequestHeader)
case class Response(requesterId: String, operation: String, header: ResponseHeader)

我有几个具有不同功能的演员,我希望一些演员订阅Response,取决于requesterId,其他一些取决于operation。 有没有办法通过 EventBus 和分类器轻松实现这一目标?

谢谢, 乔尔

【问题讨论】:

【参考方案1】:

当然,它叫LookupEventBus。您可以通过扩展它来实现自己的总线,并在 classify 方法中提取 requesterId,如下所示:

class LookupBusImpl extends EventBus with LookupClassification 
  type Event = HasRequesterId // I made up a super type for you here
  type Classifier = String
  type Subscriber = ActorRef

  override def classify(event: HasRequesterId): String = event.requesterId

然后您将订阅给定的requesterId,如下所示:

  lookupBus.subscribe(actorRef, "requester-100")

然后这个 Actor 将只接收被分类为 requester-100 的消息。

【讨论】:

谢谢,但它只回答了我问题的第一部分:我希望另一个演员能够订阅不同的分类器,即operation 字段。基本上,请求者对结果感兴趣,因为......他是请求者,其他一些人是因为他们想了解具体操作。我应该创建多条总线并两次发布事件吗? 在 Akka 中最多可以使用多少个分类器。例如,如果我有 10,000 个不同的分类器。我可以继续将“受保护的 def mapSize = 128”更改为 10000。还是?【参考方案2】:

我同意 Konrad 的观点,即您应该实施新的 LookupClassification 总线来解决您的问题。我认为拥有这些总线的两个独立实例是最简单的,一个按 requesterId 分类,另一个按操作分类。这种方法的一些基本设置工作是:

//Singleton to hold the instances of each stream type
object ResponseEventStream
  val RequestorIdStream = new RequestorIdResponseEventStream
  val OperationStream = new OperationResponseEventStream


//Common functionality for the two different types of streams
trait ResponseEventStream extends ActorEventBus with LookupClassification
  import ResponseEventStream._
  type Event = Response
  type Classifier = String  
  protected def mapSize = 128
  protected def publish(resp:Response, subscriber: ActorRef) = 
    if (subscriber.isTerminated) unsubscribe(subscriber)
    else subscriber ! resp
    


//Concrete impl that uses requesterId to classify
class RequestorIdResponseEventStream extends ResponseEventStream
  protected def classify(resp:Response) = resp.requesterId 


//Concrete impl that uses operation to classify
class OperationResponseEventStream extends ResponseEventStream
  protected def classify(resp:Response) = resp.operation 


//Trait to mix into classes that need to publish or subscribe to response events
//Has helper methods to simplify interaction with the two distinct streams
trait ResponseEventing
  import ResponseEventStream._

  def publishResponse(resp:Response)
    RequestorIdStream.publish(resp)
    OperationStream.publish(resp)
  

  def subscribeByRequestId(requestId:String, ref:ActorRef)
    RequestorIdStream.subscribe(ref, requestId)
  

  def subscribeByOperartion(op:String, ref:ActorRef)
    OperationStream.subscribe(ref, op)
    

然后您只需要将 ResponseEventing 特征混合到需要发布 Response 事件或需要订阅它们的参与者中。发布的 Actor 将调用 publishResponse,需要订阅的 Actor 将调用 subscribeXXX,具体取决于他们感兴趣的分类(requesterId 或 operation)。

【讨论】:

非常感谢,我会试试这个解决方案!

以上是关于Akka scala 事件总线具有不同的分类器,具体取决于订阅者的主要内容,如果未能解决你的问题,请参考以下文章

Scala笔记整理:Actor和AKKA

Scala笔记整理:Actor和AKKA

使用 Scala/Akka 在 JVM 中进行高频交易

Scala零基础教学90-101Akka 实战-深入解析

在 Scala Akka 期货中,map 和 flatMap 有啥区别?

Scala 中的多个 Actor 实现有何不同?