如何从递归生成值的流中创建 akka-stream 源?

Posted

技术标签:

【中文标题】如何从递归生成值的流中创建 akka-stream 源?【英文标题】:How to create an akka-stream Source from a Flow that generate values recursively? 【发布时间】:2019-01-05 20:11:58 【问题描述】:

我需要遍历一个形状像树的 API。例如,目录结构或讨论线程。它可以通过以下流程建模:

type ItemId = Int
type Data = String
case class Item(data: Data, kids: List[ItemId])

def randomData(): Data = scala.util.Random.alphanumeric.take(2).mkString 

// 0 => [1, 9]
// 1 => [10, 19]
// 2 => [20, 29]
// ...
// 9 => [90, 99]
// _ => []
// NB. I don't have access to this function, only the itemFlow.
def nested(id: ItemId): List[ItemId] =
  if (id == 0) (1 to 9).toList
  else if (1 <= id && id <= 9) ((id * 10) to ((id + 1) * 10 - 1)).toList
  else Nil

val itemFlow: Flow[ItemId, Item, NotUsed] = 
  Flow.fromFunction(id => Item(randomData, nested(id)))

如何遍历这些数据?我得到了以下工作:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._

import scala.concurrent.Await
import scala.concurrent.duration.Duration

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val loop = 
  GraphDSL.create()  implicit b =>
    import GraphDSL.Implicits._

    val source = b.add(Flow[Int])
    val merge  = b.add(Merge[Int](2))
    val fetch  = b.add(itemFlow) 
    val bcast  = b.add(Broadcast[Item](2))

    val kids   = b.add(Flow[Item].mapConcat(_.kids))
    val data   = b.add(Flow[Item].map(_.data))

    val buffer = Flow[Int].buffer(100, OverflowStrategy.dropHead)

    source ~> merge ~> fetch           ~> bcast ~> data
              merge <~ buffer <~ kids  <~ bcast

    FlowShape(source.in, data.out)
  

val flow = Flow.fromGraph(loop)


Await.result(
  Source.single(0).via(flow).runWith(Sink.foreach(println)),
  Duration.Inf
)

system.terminate()

但是,由于我使用的是带缓冲区的流,因此流永远不会完成。

在上游完成且缓冲元素已耗尽时完成

Flow.buffer

我多次阅读Graph cycles, liveness, and deadlocks 部分,但仍在努力寻找答案。

这将创建一个活锁:

import java.util.concurrent.atomic.AtomicInteger

def unfold[S, E](seed: S, flow: Flow[S, E, NotUsed])(loop: E => List[S]): Source[E, NotUsed] = 
  // keep track of how many element flows, 
  val remaining = new AtomicInteger(1) // 1 = seed

  // should be > max loop(x)
  val bufferSize = 10000

  val (ref, publisher) =
    Source.actorRef[S](bufferSize, OverflowStrategy.fail)
      .toMat(Sink.asPublisher(true))(Keep.both)
      .run()

  ref ! seed

  Source.fromPublisher(publisher)
    .via(flow)
    .mapx =>
      loop(x).foreach c =>
        remaining.incrementAndGet()
        ref ! c
      
      x
    
    .takeWhile(_ => remaining.decrementAndGet > 0)

编辑:我添加了一个 git repo 来测试您的解决方案https://github.com/MasseGuillaume/source-unfold

【问题讨论】:

以下问题的答案可能会有所帮助,尤其是“无流循环”部分:***.com/questions/32459329/… 这不行,itemFlow来自HTTP调用,我无权访问nested函数。 示例代码中似乎有一些不正确的项目:itemFlow 未在任何地方使用,commentFlow 未定义。此外,itemFlow 似乎是 Flow[ItemId, Item,...] 类型,但被声明为 Flow[ItemId, Data, ...] @RamonJRomeroyVigil 确实,已编辑。 【参考方案1】:

未完成的原因

我不认为流永远不会完成的原因是“使用带有缓冲区的流”。与this question 类似的实际原因是,与默认参数eagerClose=False 合并正在等待sourcebuffer 在它(合并)完成之前完成。但是缓冲区正在等待合并完成。所以合并正在等待缓冲区,缓冲区正在等待合并。

eagerClose 合并

您可以在创建合并时设置eagerClose=True。但不幸的是,使用急切关闭可能会导致某些子 ItemId 值永远不会被查询。

间接解决方案

如果您为树的每个级别实现一个新流,则可以将递归提取到流之外。

您可以使用itemFlow 构造一个查询函数:

val itemQuery : Iterable[ItemId] => Future[Seq[Data]] = 
  (itemIds) => Source.apply(itemIds)
                     .via(itemFlow)
                     .runWith(Sink.seq[Data])

这个查询函数现在可以包装在递归辅助函数中:

val recQuery : (Iterable[ItemId], Iterable[Data]) => Future[Seq[Data]] = 
  (itemIds, currentData) => itemQuery(itemIds) flatMap  allNewData =>
      val allNewKids = allNewData.flatMap(_.kids).toSet

      if(allNewKids.isEmpty)
        Future.successful(currentData ++ allNewData)
      else
        recQuery(allNewKids, currentData ++ data)
  

创建的流的数量将等于树的最大深度。

不幸的是,由于涉及到 Futures,这个递归函数不是尾递归的,如果树太深,可能会导致“堆栈溢出”。

【讨论】:

itemQuery 函数无法编译。我想你想要Source.apply(itemIds).via(itemFlow).toMat(Sink.seq[Data])(Keep.right).run()。或者你可以改用runWith @JeffreyChung 所以更新了,谢谢指正。【参考方案2】:

我通过编写自己的 GraphStage 解决了这个问题。

import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage.GraphStage, GraphStageLogic, OutHandler

import scala.concurrent.ExecutionContext

import scala.collection.mutable
import scala.util.Success, Failure, Try

import scala.collection.mutable

def unfoldTree[S, E](seeds: List[S], 
                     flow: Flow[S, E, NotUsed],
                     loop: E => List[S],
                     bufferSize: Int)(implicit ec: ExecutionContext): Source[E, NotUsed] = 
  Source.fromGraph(new UnfoldSource(seeds, flow, loop, bufferSize))


object UnfoldSource 
  implicit class MutableQueueExtensions[A](private val self: mutable.Queue[A]) extends AnyVal 
    def dequeueN(n: Int): List[A] = 
      val b = List.newBuilder[A]
      var i = 0
      while (i < n) 
        val e = self.dequeue
        b += e
        i += 1
      
      b.result()
    
  


class UnfoldSource[S, E](seeds: List[S],
                         flow: Flow[S, E, NotUsed],
                         loop: E => List[S],
                         bufferSize: Int)(implicit ec: ExecutionContext) extends GraphStage[SourceShape[E]] 

  val out: Outlet[E] = Outlet("UnfoldSource.out")
  override val shape: SourceShape[E] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler   
    // Nodes to expand
    val frontier = mutable.Queue[S]()
    frontier ++= seeds

    // Nodes expanded
    val buffer = mutable.Queue[E]()

    // Using the flow to fetch more data
    var inFlight = false

    // Sink pulled but the buffer was empty
    var downstreamWaiting = false

    def isBufferFull() = buffer.size >= bufferSize

    def fillBuffer(): Unit = 
      val batchSize = Math.min(bufferSize - buffer.size, frontier.size)
      val batch = frontier.dequeueN(batchSize)
      inFlight = true

      val toProcess =
        Source(batch)
          .via(flow)
          .runWith(Sink.seq)(materializer)

      val callback = getAsyncCallback[Try[Seq[E]]]
        case Failure(ex) => 
          fail(out, ex)
        
        case Success(es) => 
          val got = es.size
          inFlight = false
          es.foreach e =>
            buffer += e
            frontier ++= loop(e)
          
          if (downstreamWaiting && buffer.nonEmpty) 
            val e = buffer.dequeue
            downstreamWaiting = false
            sendOne(e)
           else 
            checkCompletion()
          
          ()
        
      

      toProcess.onComplete(callback.invoke)
    
    override def preStart(): Unit = 
      checkCompletion()
    

    def checkCompletion(): Unit = 
      if (!inFlight && buffer.isEmpty && frontier.isEmpty) 
        completeStage()
      
     

    def sendOne(e: E): Unit = 
      push(out, e)
      checkCompletion()
    

    def onPull(): Unit = 
      if (buffer.nonEmpty) 
        sendOne(buffer.dequeue)
       else 
        downstreamWaiting = true
      

      if (!isBufferFull && frontier.nonEmpty) 
        fillBuffer()
      
    

    setHandler(out, this)
  

【讨论】:

【参考方案3】:

啊,Akka 溪流中循环的乐趣。我有一个非常相似的问题,我以一种非常老套的方式解决了这个问题。可能对你有帮助。

黑客解决方案:

  // add a graph stage that will complete successfully if it sees no element within 5 seconds
  val timedStopper = b.add(
    Flow[Item]
      .idleTimeout(5.seconds)
      .recoverWithRetries(1, 
        case _: TimeoutException => Source.empty[Item]
      ))

  source ~> merge ~> fetch ~> timedStopper ~> bcast ~> data
  merge <~ buffer <~ kids <~ bcast

这样做是在最后一个元素通过timedStopper 阶段后 5 秒,该阶段成功完成流。这是通过使用idleTimeout 实现的,它使用TimeoutException 使流失败,然后使用recoverWithRetries 将该失败转变为成功完成。 (我确实提到它很hacky)。

如果元素之间的时间可能超过 5 秒,或者如果在流“实际”完成和 Akka 接听它之间的长时间等待,这显然是不合适的。值得庆幸的是,我们都没有担心,在这种情况下,它实际上工作得很好!

非hacky解决方案

不幸的是,我能想到的唯一方法是在不通过超时作弊的情况下做到这一点非常非常复杂。

基本上,您需要能够跟踪两件事:

是否有任何元素仍在缓冲区中,或正在发送到缓冲区中 传入源是否打开

当且仅当这两个问题的答案都是时才完成直播。本机 Akka 构建块可能无法处理此问题。但是,自定义图形阶段可能会。一种选择可能是编写一个代替Merge 并给它一些了解缓冲区内容的方法,或者可能让它跟踪它接收到的ID 和广播发送到缓冲区的ID。问题在于,自定义图形阶段在最好的情况下编写起来并不是特别愉快,更不用说在像这样跨阶段混合逻辑时了。

警告

Akka 流不能很好地处理循环,尤其是它们如何计算完成度。因此,这可能不是您遇到的唯一问题。

例如,我们在非常相似的结构中遇到的一个问题是,源中的失败被视为流成功完成,而成功的Future 被具体化。问题在于,默认情况下,失败的阶段将使其下游失败,但 cancel 其上游(这被视为这些阶段的成功完成)。使用像你这样的循环,结果是一场比赛,因为取消沿着一个分支传播,但失败沿着另一个分支传播。您还需要检查如果接收器出错会发生什么;根据广播的取消设置,取消可能不会向上传播,并且源会很高兴地继续拉入元素。

最后一个选择:完全避免使用流处理递归逻辑。在一个极端情况下,如果有任何方法可以让您编写一个单一的尾递归方法,一次提取所有嵌套项并将其放入 Flow 阶段,这将解决您的问题。另一方面,我们正在认真考虑去卡夫卡排队等待我们自己的系统。

【讨论】:

根据 akka 流的 API,一个相对简单的解决方案可能类似于 val checkComplete = b.add(Flow[Item].map(i =&gt; if (source.isComplete &amp;&amp; i.kids.isEmpty) throw CompleteStage else i).recoverWithRetries(1, case CompleteStage =&gt; Source.empty[Item] ))。在kids 阶段之前添加它。问题是 source.isComplete 在 akka 流中是否可能...... 我认为自定义图形阶段是一种方法。 我真的很喜欢你的“Hacky Solution”

以上是关于如何从递归生成值的流中创建 akka-stream 源?的主要内容,如果未能解决你的问题,请参考以下文章

如何从一系列字典中创建键和值的数据框[重复]

如何从包含逗号分隔条目的变量中创建(不同的)值列表?

如何从用户在 django 基本模板中创建的所有帖子中获取模型字段值的总和?

如何在 HSQL DB 中创建具有最大行值的序列?

在 C# 的代码隐藏中创建具有值的表脚本

如何使用图像视图从图像数组中创建随机图像生成器?