访问由 Source.actorRef 创建的 akka 流 Source 的底层 ActorRef
Posted
技术标签:
【中文标题】访问由 Source.actorRef 创建的 akka 流 Source 的底层 ActorRef【英文标题】:Accessing the underlying ActorRef of an akka stream Source created by Source.actorRef 【发布时间】:2015-08-27 09:32:44 【问题描述】:我正在尝试使用Source.actorRef 方法来创建akka.stream.scaladsl.Source 对象。某种形式的东西
import akka.stream.OverflowStrategy.fail
import akka.stream.scaladsl.Source
case class Weather(zip : String, temp : Double, raining : Boolean)
val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail)
val sunnySource = weatherSource.filter(!_.raining)
...
我的问题是:如何将数据发送到基于 ActorRef 的 Source 对象?
我认为向源发送消息是某种形式
//does not compile
weatherSource ! Weather("90210", 72.0, false)
weatherSource ! Weather("02139", 32.0, true)
但是weatherSource
没有!
运算符或tell
方法。
documentation 没有过多描述如何使用 Source.actorRef,它只是说您可以...
提前感谢您的评论和回复。
【问题讨论】:
【参考方案1】:你需要一个Flow
:
import akka.stream.OverflowStrategy.fail
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.Sink, Flow
case class Weather(zip : String, temp : Double, raining : Boolean)
val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail)
val sunnySource = weatherSource.filter(!_.raining)
val ref = Flow[Weather]
.to(Sink.ignore)
.runWith(sunnySource)
ref ! Weather("02139", 32.0, true)
请记住,这都是实验性的,可能会改变!
【讨论】:
在 M5 中,看起来 Source.actorRef 不存在。你知道它搬到哪里了吗? 看起来他们基本上将其更改为将Props
传递给源。更新的文档在这里doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M5/scala/…
1.0-RC3 是最新版本,Source.actorRef
仍然在那里:doc.akka.io/api/akka-stream-and-http-experimental/1.0-RC3/…
Source(Props)
在 M5 中,现在在 RC3 中 Source.actorPublisher
是另外一回事:它用于创建由自定义 ActorPublisher
实现支持的 Source。
嗯,我在尝试获取底层 ActorRef
时遇到了类似的问题,但我需要 ref 才能创建我的 Sink
。这个Flow
可以被扔掉并创建另一个Flow
吗?【参考方案2】:
@Noah 指出 akka-streams 的实验性质,他的回答可能不适用于 1.0 版本。我不得不按照this example给出的例子:
implicit val materializer = ActorMaterializer()
val (actorRef: ActorRef, publisher: Publisher[TweetInfo]) = Source.actorRef[TweetInfo](1000, OverflowStrategy.fail).toMat(Sink.publisher)(Keep.both).run()
actorRef ! TweetInfo(...)
val source: Source[TweetInfo, Unit] = Source[TweetInfo](publisher)
【讨论】:
【参考方案3】:ActorRef
的实例与所有“物化值”一样,只有在整个流被物化后才能访问,换句话说,当 RunnableGraph 正在运行时。
// RunnableGraph[ActorRef] means that you get ActorRef when you run the graph
val rg1: RunnableGraph[ActorRef] = sunnySource.to(Sink.foreach(println))
// You get ActorRef instance as a materialized value
val actorRef1: ActorRef = rg1.run()
// Or even more correct way: to materialize both ActorRef and future to completion
// of the stream, so that we know when we are done:
// RunnableGraph[(ActorRef, Future[Done])] means that you get tuple
// (ActorRef, Future[Done]) when you run the graph
val rg2: RunnableGraph[(ActorRef, Future[Done])] =
sunnySource.toMat(Sink.foreach(println))(Keep.both)
// You get both ActorRef and Future[Done] instances as materialized values
val (actorRef2, future) = rg2.run()
actorRef2 ! Weather("90210", 72.0, false)
actorRef2 ! Weather("02139", 32.0, true)
actorRef2 ! akka.actor.Status.Success("Done!") // Complete the stream
future onComplete /* ... */
【讨论】:
同时完成 ActorRef 和 future - 太棒了!谢谢!以上是关于访问由 Source.actorRef 创建的 akka 流 Source 的底层 ActorRef的主要内容,如果未能解决你的问题,请参考以下文章