如何将 Akka Streams SourceQueue 与 PlayFramework 一起使用

Posted

技术标签:

【中文标题】如何将 Akka Streams SourceQueue 与 PlayFramework 一起使用【英文标题】:How to use an Akka Streams SourceQueue with PlayFramework 【发布时间】:2016-07-23 16:27:50 【问题描述】:

我想使用 SourceQueue 将元素动态推送到 Akka Stream 源中。 播放控制器需要一个 Source 才能使用 chuncked 方法流式传输结果。 由于 Play 在后台使用其自己的 Akka Stream Sink,我无法使用 Sink 自己实现源队列,因为源会在 chunked 方法使用之前被消耗(除非我使用以下 hack)。

如果我使用响应式流发布者预先实现源队列,我可以让它工作,但它是一种“肮脏的黑客”:

def sourceQueueAction = Action

    val (queue, pub) = Source.queue[String](10, OverflowStrategy.fail).toMat(Sink.asPublisher(false))(Keep.both).run()

    //stupid example to push elements dynamically
    val tick = Source.tick(0 second, 1 second, "tick")
    tick.runForeach(t => queue.offer(t))

    Ok.chunked(Source.fromPublisher(pub))
  

有没有更简单的方法将 Akka Streams SourceQueue 与 PlayFramework 结合使用?

谢谢

【问题讨论】:

我非常喜欢这种方法。为什么你觉得它“脏”? 【参考方案1】:

解决方案是在源上使用mapMaterializedValue 来获得其队列实现的未来:

def sourceQueueAction = Action 
    val (queueSource, futureQueue) = peekMatValue(Source.queue[String](10, OverflowStrategy.fail))

    futureQueue.map  queue =>
      Source.tick(0.second, 1.second, "tick")
        .runForeach (t => queue.offer(t))
    
    Ok.chunked(queueSource)

  

  //T is the source type, here String
  //M is the materialization type, here a SourceQueue[String]
  def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = 
    val p = Promise[M]
    val s = src.mapMaterializedValue  m =>
      p.trySuccess(m)
      m
    
    (s, p.future)
  

【讨论】:

为什么如果我做queueSource.map _.toUpperCase ,例如我没有得到一个Source[String,NotUsed]?相反,这会返回错误Expression of type queueSource.Repr[String] doesn't conform to expected type Source[String,NotUsed]. 您将在哪里对源的元素进行转换?就像your example中的记号一样 你能用 Java 做到这一点吗? Source.preMaterialize 也可以用来代替peekMatValue 方法【参考方案2】:

想分享我今天得到的一个见解,虽然它可能不适合你在 Play 中的情况。

与其考虑触发Source,不如将​​问题颠倒过来,并为执行采购的函数提供Sink

在这种情况下,Sink 将是“配方”(非实现)阶段,我们现在可以使用 Source.queue 并立即实现它。排队了得到了它运行的流程。

【讨论】:

很有趣,我很乐意看到一个例子:) @Loic 我会,但我使用它的代码目前是封闭源代码。再考虑几天(低火),我认为这与在上层制作 SourceQueue 相同,并将“提供”功能暴露给任何想要使用源的人。

以上是关于如何将 Akka Streams SourceQueue 与 PlayFramework 一起使用的主要内容,如果未能解决你的问题,请参考以下文章

使用 Akka / Akka Streams / Akka HTTP 时 Akka 版本冲突

Akka Stream Kafka vs Kafka Streams

在“Akka-Streams”中使用`extrapolate`的用例是什么?

Akka Streams:流中的状态

由于 Kafka Streams 现在可用,SMACK 堆栈中是不是需要 Spark 和 Akka? [关闭]

在Redis pub / sub和Akka Streams中使用SSE的最简单方法是什么?