如何将 Akka Streams SourceQueue 与 PlayFramework 一起使用
Posted
技术标签:
【中文标题】如何将 Akka Streams SourceQueue 与 PlayFramework 一起使用【英文标题】:How to use an Akka Streams SourceQueue with PlayFramework 【发布时间】:2016-07-23 16:27:50 【问题描述】:我想使用 SourceQueue 将元素动态推送到 Akka Stream 源中。
播放控制器需要一个 Source 才能使用 chuncked
方法流式传输结果。
由于 Play 在后台使用其自己的 Akka Stream Sink,我无法使用 Sink 自己实现源队列,因为源会在 chunked
方法使用之前被消耗(除非我使用以下 hack)。
如果我使用响应式流发布者预先实现源队列,我可以让它工作,但它是一种“肮脏的黑客”:
def sourceQueueAction = Action
val (queue, pub) = Source.queue[String](10, OverflowStrategy.fail).toMat(Sink.asPublisher(false))(Keep.both).run()
//stupid example to push elements dynamically
val tick = Source.tick(0 second, 1 second, "tick")
tick.runForeach(t => queue.offer(t))
Ok.chunked(Source.fromPublisher(pub))
有没有更简单的方法将 Akka Streams SourceQueue 与 PlayFramework 结合使用?
谢谢
【问题讨论】:
我非常喜欢这种方法。为什么你觉得它“脏”? 【参考方案1】:解决方案是在源上使用mapMaterializedValue
来获得其队列实现的未来:
def sourceQueueAction = Action
val (queueSource, futureQueue) = peekMatValue(Source.queue[String](10, OverflowStrategy.fail))
futureQueue.map queue =>
Source.tick(0.second, 1.second, "tick")
.runForeach (t => queue.offer(t))
Ok.chunked(queueSource)
//T is the source type, here String
//M is the materialization type, here a SourceQueue[String]
def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) =
val p = Promise[M]
val s = src.mapMaterializedValue m =>
p.trySuccess(m)
m
(s, p.future)
【讨论】:
为什么如果我做queueSource.map _.toUpperCase
,例如我没有得到一个Source[String,NotUsed]?相反,这会返回错误Expression of type queueSource.Repr[String] doesn't conform to expected type Source[String,NotUsed].
您将在哪里对源的元素进行转换?就像your example中的记号一样
你能用 Java 做到这一点吗?
Source.preMaterialize 也可以用来代替peekMatValue
方法【参考方案2】:
想分享我今天得到的一个见解,虽然它可能不适合你在 Play 中的情况。
与其考虑触发Source
,不如将问题颠倒过来,并为执行采购的函数提供Sink
。
在这种情况下,Sink
将是“配方”(非实现)阶段,我们现在可以使用 Source.queue
并立即实现它。排队了得到了它运行的流程。
【讨论】:
很有趣,我很乐意看到一个例子:) @Loic 我会,但我使用它的代码目前是封闭源代码。再考虑几天(低火),我认为这与在上层制作 SourceQueue 相同,并将“提供”功能暴露给任何想要使用源的人。以上是关于如何将 Akka Streams SourceQueue 与 PlayFramework 一起使用的主要内容,如果未能解决你的问题,请参考以下文章
使用 Akka / Akka Streams / Akka HTTP 时 Akka 版本冲突
Akka Stream Kafka vs Kafka Streams
在“Akka-Streams”中使用`extrapolate`的用例是什么?