Akka Streams:流中的状态

Posted

技术标签:

【中文标题】Akka Streams:流中的状态【英文标题】:Akka Streams: State in a flow 【发布时间】:2016-10-20 12:26:53 【问题描述】:

我想使用 Akka Streams 读取多个大文件来处理每一行。假设每个密钥都包含一个(identifier -> value)。如果找到一个新的标识符,我想把它和它的值保存在数据库中;否则,如果在处理行流时已经找到标识符,我只想保存该值。为此,我认为我需要某种递归状态流,以保留在Map 中已经找到的标识符。我想我会在这个流程中收到一对(newLine, contextWithIdentifiers)

我刚刚开始研究 Akka Streams。我想我可以管理自己做无状态处理的东西,但我不知道如何保留contextWithIdentifiers。我会很感激任何指向正确方向的指针。

【问题讨论】:

感谢您提出这个问题。这是一个如此简单的请求,但通过示例代码找到有意义的答案似乎很复杂。这是我找到的唯一一个! 【参考方案1】:

也许像 statefulMapConcat 这样的东西可以帮助你:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink, Source
import scala.util.Random._
import scala.math.abs
import scala.concurrent.ExecutionContext.Implicits.global

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

//encapsulating your input
case class IdentValue(id: Int, value: String)
//some random generated input
val identValues = List.fill(20)(IdentValue(abs(nextInt()) % 5, "valueHere"))

val stateFlow = Flow[IdentValue].statefulMapConcat () =>
  //state with already processed ids
  var ids = Set.empty[Int]
  identValue => if (ids.contains(identValue.id)) 
    //save value to DB
    println(identValue.value)
    List(identValue)
   else 
    //save both to database
    println(identValue)
    ids = ids + identValue.id
    List(identValue)
  


Source(identValues)
  .via(stateFlow)
  .runWith(Sink.seq)
  .onSuccess  case identValue => println(identValue) 

【讨论】:

感谢您的代码。我希望中间有更多类型,因为涉及 () => ... 工厂。你知道为什么没有.statefulMap 方法吗? 我 1.5 年前的问题现在看起来很幼稚。让我回答他们。工厂方式不会使任何事情复杂化。这只是意味着代码可能会被多次调用。琐碎的。没有“.statefulMap”,因为代码的工作是为每个传入条目(Lists)提供 0..n 个条目,显然这些条目被连接起来。要么我在 16 年度过了糟糕的一天,要么从那以后我学到了一些东西。 哇,statefulMapConcat 看起来非常多才多艺【参考方案2】:

几年后,如果您只需要 1 对 1 映射(不是 1 对 N),这是我编写的一个实现:

import akka.stream.stage.GraphStage, GraphStageLogic
import akka.stream.Attributes, FlowShape, Inlet, Outlet

object StatefulMap 
  def apply[T, O](converter: => T => O) = new StatefulMap[T, O](converter)


class StatefulMap[T, O](converter: => T => O) extends GraphStage[FlowShape[T, O]] 
  val in = Inlet[T]("StatefulMap.in")
  val out = Outlet[O]("StatefulMap.out")
  val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) 
    val f = converter
    setHandler(in, () => push(out, f(grab(in))))
    setHandler(out, () => pull(in))
  

测试(和演示):

  behavior of "StatefulMap"

  class Counter extends (Any => Int) 
    var count = 0

    override def apply(x: Any): Int = 
      count += 1
      count
    
  

  it should "not share state among substreams" in 
    val result = await 
      Source(0 until 10)
        .groupBy(2, _ % 2)
        .via(StatefulMap(new Counter()))
        .fold(Seq.empty[Int])(_ :+ _)
        .mergeSubstreams
        .runWith(Sink.seq)
    
    result.foreach(_ should be(1 to 5))
  

【讨论】:

以上是关于Akka Streams:流中的状态的主要内容,如果未能解决你的问题,请参考以下文章

Akka Stream Kafka vs Kafka Streams

在“Akka-Streams”中使用`extrapolate`的用例是什么?

如何将 Akka Streams SourceQueue 与 PlayFramework 一起使用

由于 Kafka Streams 现在可用,SMACK 堆栈中是不是需要 Spark 和 Akka? [关闭]

如何改善响应式 kafka(Scala 加 Akka Streams)的缓慢性能?

在Redis pub / sub和Akka Streams中使用SSE的最简单方法是什么?