访问由 Source.actorRef 创建的 akka 流 Source 的底层 ActorRef

Posted

技术标签:

【中文标题】访问由 Source.actorRef 创建的 akka 流 Source 的底层 ActorRef【英文标题】:Accessing the underlying ActorRef of an akka stream Source created by Source.actorRef 【发布时间】:2015-08-27 09:32:44 【问题描述】:

我正在尝试使用Source.actorRef 方法来创建akka.stream.scaladsl.Source 对象。某种形式的东西

import akka.stream.OverflowStrategy.fail
import akka.stream.scaladsl.Source

case class Weather(zip : String, temp : Double, raining : Boolean)

val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail)

val sunnySource = weatherSource.filter(!_.raining)
...

我的问题是:如何将数据发送到基于 ActorRef 的 Source 对象

我认为向源发送消息是某种形式

//does not compile
weatherSource ! Weather("90210", 72.0, false)
weatherSource ! Weather("02139", 32.0, true)

但是weatherSource 没有! 运算符或tell 方法。

documentation 没有过多描述如何使用 Source.actorRef,它只是说您可以...

提前感谢您的评论和回复。

【问题讨论】:

【参考方案1】:

你需要一个Flow:

  import akka.stream.OverflowStrategy.fail
  import akka.stream.scaladsl.Source
  import akka.stream.scaladsl.Sink, Flow

  case class Weather(zip : String, temp : Double, raining : Boolean)

  val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail)

  val sunnySource = weatherSource.filter(!_.raining)

  val ref = Flow[Weather]
    .to(Sink.ignore)
    .runWith(sunnySource)

  ref ! Weather("02139", 32.0, true)

请记住,这都是实验性的,可能会改变!

【讨论】:

在 M5 中,看起来 Source.actorRef 不存在。你知道它搬到哪里了吗? 看起来他们基本上将其更改为将Props 传递给源。更新的文档在这里doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M5/scala/… 1.0-RC3 是最新版本,Source.actorRef 仍然在那里:doc.akka.io/api/akka-stream-and-http-experimental/1.0-RC3/… Source(Props) 在 M5 中,现在在 RC3 中 Source.actorPublisher 是另外一回事:它用于创建由自定义 ActorPublisher 实现支持的 Source。 嗯,我在尝试获取底层 ActorRef 时遇到了类似的问题,但我需要 ref 才能创建我的 Sink。这个Flow 可以被扔掉并创建另一个Flow 吗?【参考方案2】:

@Noah 指出 akka-streams 的实验性质,他的回答可能不适用于 1.0 版本。我不得不按照this example给出的例子:

implicit val materializer = ActorMaterializer()
val (actorRef: ActorRef, publisher: Publisher[TweetInfo]) = Source.actorRef[TweetInfo](1000, OverflowStrategy.fail).toMat(Sink.publisher)(Keep.both).run()
actorRef ! TweetInfo(...)
val source: Source[TweetInfo, Unit] = Source[TweetInfo](publisher)

【讨论】:

【参考方案3】:

ActorRef 的实例与所有“物化值”一样,只有在整个流被物化后才能访问,换句话说,当 RunnableGraph 正在运行时。

// RunnableGraph[ActorRef] means that you get ActorRef when you run the graph
val rg1: RunnableGraph[ActorRef] = sunnySource.to(Sink.foreach(println))

// You get ActorRef instance as a materialized value
val actorRef1: ActorRef = rg1.run()

// Or even more correct way: to materialize both ActorRef and future to completion 
// of the stream, so that we know when we are done:

// RunnableGraph[(ActorRef, Future[Done])] means that you get tuple
// (ActorRef, Future[Done]) when you run the graph
val rg2: RunnableGraph[(ActorRef, Future[Done])] =
  sunnySource.toMat(Sink.foreach(println))(Keep.both)

// You get both ActorRef and Future[Done] instances as materialized values
val (actorRef2, future) = rg2.run()

actorRef2 ! Weather("90210", 72.0, false)
actorRef2 ! Weather("02139", 32.0, true)
actorRef2 ! akka.actor.Status.Success("Done!") // Complete the stream
future onComplete  /* ... */ 

【讨论】:

同时完成 ActorRef 和 future - 太棒了!谢谢!

以上是关于访问由 Source.actorRef 创建的 akka 流 Source 的底层 ActorRef的主要内容,如果未能解决你的问题,请参考以下文章

访问由 shared_ptr 持有的类的原子成员

Kivy - 如何访问由 .kv 文件创建的类的实例?

如何创建只能由前端访问的私有api?

以编程方式访问由属性占位符创建的属性

在 ms 访问中看不到由 pyodbc 创建的表

如何在 Python 进程中访问由 C++ 进程创建的互斥锁?