如何动态向 Source 添加元素?

Posted

技术标签:

【中文标题】如何动态向 Source 添加元素?【英文标题】:How to add elements to Source dynamically? 【发布时间】:2015-05-18 08:08:39 【问题描述】:

我有示例代码来生成未绑定的源并使用它:

对象主要

 def main(args : Array[String]): Unit = 

  implicit val system = ActorSystem("Sys")
  import system.dispatcher

  implicit val materializer = ActorFlowMaterializer()

  val source: Source[String] = Source(() => 
     Iterator.continually( "message:" + ThreadLocalRandom.current().nextInt(10000))
    )

  source.runForeach((item:String) =>  println(item) )
  .onComplete _ => system.shutdown() 
 

我想创建实现的类:

trait MySources 
    def addToSource(item: String)
    def getSource() : Source[String]

而且我需要在多个线程中使用它,例如:

class MyThread(mySources: MySources) extends Thread 
  override def run(): Unit = 
    for(i <- 1 to 1000000)  // here will be infinite loop
        mySources.addToSource(i.toString)
    
  
 

以及预期的完整代码:

object Main 
  def main(args : Array[String]): Unit = 
    implicit val system = ActorSystem("Sys")
    import system.dispatcher

    implicit val materializer = ActorFlowMaterializer()

    val sources = new MySourcesImplementation()

    for(i <- 1 to 100) 
      (new MyThread(sources)).start()
    

    val source = sources.getSource()

    source.runForeach((item:String) =>  println(item) )
    .onComplete _ => system.shutdown() 
  

MySources如何实现?

【问题讨论】:

【参考方案1】:

获得非有限源的一种方法是使用一种特殊的actor作为源,它混合了ActorPublisher trait。如果您创建其中一种演员,然后调用ActorPublisher.apply 进行包装,您最终会得到一个反应式流Publisher 实例,您可以使用来自Sourceapply 生成一个Source 来自它。之后,您只需要确保您的 ActorPublisher 类正确处理用于向下游发送元素的 Reactive Streams 协议,您就可以开始了。一个很简单的例子如下:

import akka.actor._
import akka.stream.actor._
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl._

object DynamicSourceExample extends App

  implicit val system = ActorSystem("test")
  implicit val materializer = ActorFlowMaterializer()

  val actorRef = system.actorOf(Props[ActorBasedSource])
  val pub = ActorPublisher[Int](actorRef)

  Source(pub).
    map(_ * 2).
    runWith(Sink.foreach(println))

  for(i <- 1 until 20)
    actorRef ! i.toString
    Thread.sleep(1000)
  



class ActorBasedSource extends Actor with ActorPublisher[Int]
  import ActorPublisherMessage._
  var items:List[Int] = List.empty

  def receive = 
    case s:String =>
      if (totalDemand == 0) 
        items = items :+ s.toInt
      else
        onNext(s.toInt)    

    case Request(demand) =>  
      if (demand > items.size)
        items foreach (onNext)
        items = List.empty
      
      else
        val (send, keep) = items.splitAt(demand.toInt)
        items = keep
        send foreach (onNext)
      


    case other =>
      println(s"got other $other")
  



【讨论】:

我认为在Requestelse中,items foreach (onNext)这一行应该是send foreach (onNext) 现在使用Source.actorPublisher 有一个更安全(因为有界)的解决方案,它将实现为ActorRef,它由执行与您的自定义ActorBasedSource 执行的操作非常相似的参与者支持。例如。您的演员没有正确的生命周期,并且无法与难以正确处理的多个物化一起使用。 @jrudolph,你是对的。自从这个答案以来,发生了一些变化。我将很快添加一个显示新方式的编辑。谢谢。 谢谢,非常有用。 ActorBasedSource 看起来很通用,购买 Akka-Streams 默认不提供是不是很奇怪? Akka Streams 2 是否有更简单的解决方案,例如类似于推送事件的频道?【参考方案2】:

使用 Akka Streams 2,您可以使用 sourceQueue :How to create a Source that can receive elements later via a method call?

【讨论】:

【参考方案3】:

正如我在this answer 中提到的,SourceQueue 是要走的路,从 Akka 2.5 开始,有一个方便的方法 preMaterialize,它消除了首先创建复合源的需要。

我在other answer中举了一个例子。

【讨论】:

以上是关于如何动态向 Source 添加元素?的主要内容,如果未能解决你的问题,请参考以下文章

如何从另一种方法向反应堆热通量动态添加元素?

jquery 向DIV动态添加元素

JQuery如何向某个元素后面动态追加元素

如何将动态 TailwindCSS 类添加到 React 中的 DOM 元素

C++ 向数组添加元素

如何在react-navigation V5中添加动态抽屉元素?