Akka scala 事件总线具有不同的分类器,具体取决于订阅者
Posted
技术标签:
【中文标题】Akka scala 事件总线具有不同的分类器,具体取决于订阅者【英文标题】:Akka scala Event bus with different classifiers depending on the subscriber 【发布时间】:2014-11-25 14:58:06 【问题描述】:我正在研究 Akka EventBus 以检查它是否可以解决我的设计问题之一,但我仍然不知道。 问题如下。
为了简化,我有:
case class Request(requesterId: String, operation: String, header: RequestHeader)
case class Response(requesterId: String, operation: String, header: ResponseHeader)
我有几个具有不同功能的演员,我希望一些演员订阅Response
,取决于requesterId
,其他一些取决于operation
。
有没有办法通过 EventBus 和分类器轻松实现这一目标?
谢谢, 乔尔
【问题讨论】:
【参考方案1】:当然,它叫LookupEventBus。您可以通过扩展它来实现自己的总线,并在 classify
方法中提取 requesterId
,如下所示:
class LookupBusImpl extends EventBus with LookupClassification
type Event = HasRequesterId // I made up a super type for you here
type Classifier = String
type Subscriber = ActorRef
override def classify(event: HasRequesterId): String = event.requesterId
然后您将订阅给定的requesterId
,如下所示:
lookupBus.subscribe(actorRef, "requester-100")
然后这个 Actor 将只接收被分类为 requester-100
的消息。
【讨论】:
谢谢,但它只回答了我问题的第一部分:我希望另一个演员能够订阅不同的分类器,即operation
字段。基本上,请求者对结果感兴趣,因为......他是请求者,其他一些人是因为他们想了解具体操作。我应该创建多条总线并两次发布事件吗?
在 Akka 中最多可以使用多少个分类器。例如,如果我有 10,000 个不同的分类器。我可以继续将“受保护的 def mapSize = 128”更改为 10000。还是?【参考方案2】:
我同意 Konrad 的观点,即您应该实施新的 LookupClassification
总线来解决您的问题。我认为拥有这些总线的两个独立实例是最简单的,一个按 requesterId 分类,另一个按操作分类。这种方法的一些基本设置工作是:
//Singleton to hold the instances of each stream type
object ResponseEventStream
val RequestorIdStream = new RequestorIdResponseEventStream
val OperationStream = new OperationResponseEventStream
//Common functionality for the two different types of streams
trait ResponseEventStream extends ActorEventBus with LookupClassification
import ResponseEventStream._
type Event = Response
type Classifier = String
protected def mapSize = 128
protected def publish(resp:Response, subscriber: ActorRef) =
if (subscriber.isTerminated) unsubscribe(subscriber)
else subscriber ! resp
//Concrete impl that uses requesterId to classify
class RequestorIdResponseEventStream extends ResponseEventStream
protected def classify(resp:Response) = resp.requesterId
//Concrete impl that uses operation to classify
class OperationResponseEventStream extends ResponseEventStream
protected def classify(resp:Response) = resp.operation
//Trait to mix into classes that need to publish or subscribe to response events
//Has helper methods to simplify interaction with the two distinct streams
trait ResponseEventing
import ResponseEventStream._
def publishResponse(resp:Response)
RequestorIdStream.publish(resp)
OperationStream.publish(resp)
def subscribeByRequestId(requestId:String, ref:ActorRef)
RequestorIdStream.subscribe(ref, requestId)
def subscribeByOperartion(op:String, ref:ActorRef)
OperationStream.subscribe(ref, op)
然后您只需要将 ResponseEventing
特征混合到需要发布 Response
事件或需要订阅它们的参与者中。发布的 Actor 将调用 publishResponse
,需要订阅的 Actor 将调用 subscribeXXX
,具体取决于他们感兴趣的分类(requesterId 或 operation)。
【讨论】:
非常感谢,我会试试这个解决方案!以上是关于Akka scala 事件总线具有不同的分类器,具体取决于订阅者的主要内容,如果未能解决你的问题,请参考以下文章