Akka Streams:流中的状态
Posted
技术标签:
【中文标题】Akka Streams:流中的状态【英文标题】:Akka Streams: State in a flow 【发布时间】:2016-10-20 12:26:53 【问题描述】:我想使用 Akka Streams 读取多个大文件来处理每一行。假设每个密钥都包含一个(identifier -> value)
。如果找到一个新的标识符,我想把它和它的值保存在数据库中;否则,如果在处理行流时已经找到标识符,我只想保存该值。为此,我认为我需要某种递归状态流,以保留在Map
中已经找到的标识符。我想我会在这个流程中收到一对(newLine, contextWithIdentifiers)
。
我刚刚开始研究 Akka Streams。我想我可以管理自己做无状态处理的东西,但我不知道如何保留contextWithIdentifiers
。我会很感激任何指向正确方向的指针。
【问题讨论】:
感谢您提出这个问题。这是一个如此简单的请求,但通过示例代码找到有意义的答案似乎很复杂。这是我找到的唯一一个! 【参考方案1】:也许像 statefulMapConcat
这样的东西可以帮助你:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink, Source
import scala.util.Random._
import scala.math.abs
import scala.concurrent.ExecutionContext.Implicits.global
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
//encapsulating your input
case class IdentValue(id: Int, value: String)
//some random generated input
val identValues = List.fill(20)(IdentValue(abs(nextInt()) % 5, "valueHere"))
val stateFlow = Flow[IdentValue].statefulMapConcat () =>
//state with already processed ids
var ids = Set.empty[Int]
identValue => if (ids.contains(identValue.id))
//save value to DB
println(identValue.value)
List(identValue)
else
//save both to database
println(identValue)
ids = ids + identValue.id
List(identValue)
Source(identValues)
.via(stateFlow)
.runWith(Sink.seq)
.onSuccess case identValue => println(identValue)
【讨论】:
感谢您的代码。我希望中间有更多类型,因为涉及 () => ... 工厂。你知道为什么没有.statefulMap
方法吗?
我 1.5 年前的问题现在看起来很幼稚。让我回答他们。工厂方式不会使任何事情复杂化。这只是意味着代码可能会被多次调用。琐碎的。没有“.statefulMap”,因为代码的工作是为每个传入条目(List
s)提供 0..n 个条目,显然这些条目被连接起来。要么我在 16 年度过了糟糕的一天,要么从那以后我学到了一些东西。
哇,statefulMapConcat
看起来非常多才多艺【参考方案2】:
几年后,如果您只需要 1 对 1 映射(不是 1 对 N),这是我编写的一个实现:
import akka.stream.stage.GraphStage, GraphStageLogic
import akka.stream.Attributes, FlowShape, Inlet, Outlet
object StatefulMap
def apply[T, O](converter: => T => O) = new StatefulMap[T, O](converter)
class StatefulMap[T, O](converter: => T => O) extends GraphStage[FlowShape[T, O]]
val in = Inlet[T]("StatefulMap.in")
val out = Outlet[O]("StatefulMap.out")
val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape)
val f = converter
setHandler(in, () => push(out, f(grab(in))))
setHandler(out, () => pull(in))
测试(和演示):
behavior of "StatefulMap"
class Counter extends (Any => Int)
var count = 0
override def apply(x: Any): Int =
count += 1
count
it should "not share state among substreams" in
val result = await
Source(0 until 10)
.groupBy(2, _ % 2)
.via(StatefulMap(new Counter()))
.fold(Seq.empty[Int])(_ :+ _)
.mergeSubstreams
.runWith(Sink.seq)
result.foreach(_ should be(1 to 5))
【讨论】:
以上是关于Akka Streams:流中的状态的主要内容,如果未能解决你的问题,请参考以下文章
Akka Stream Kafka vs Kafka Streams
在“Akka-Streams”中使用`extrapolate`的用例是什么?
如何将 Akka Streams SourceQueue 与 PlayFramework 一起使用
由于 Kafka Streams 现在可用,SMACK 堆栈中是不是需要 Spark 和 Akka? [关闭]