在Redis pub / sub和Akka Streams中使用SSE的最简单方法是什么?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在Redis pub / sub和Akka Streams中使用SSE的最简单方法是什么?相关的知识,希望对你有一定的参考价值。
我想为以下场景流式传输分块服务器发送事件:
订阅Redis密钥,如果密钥更改,则使用Akka Streams流式传输新值。它应该只在有新值时才流。
据我了解,我需要一个Source
。我猜这是订阅频道:
redis.subscriber.subscribe("My Channel") {
case message @ PubSubMessage.Message(channel, messageBytes) => println(
message.readAs[String]()
)
case PubSubMessage.Subscribe(channel, subscribedChannelsCount) => println(
s"Successfully subscribed to $channel"
)
}
在我的路线中,我需要从中创建一个Source
,但说实话,我不知道如何开始:
val route: Route =
path("stream") {
get {
complete {
val source: Source[ServerSentEvent, NotUsed] =
Source
.asSubscriber(??) // or fromPublisher???
.map(_ => {
??
})
.map(toServerSentEvent)
.keepAlive(1.second, () => ServerSentEvent.heartbeat)
.log("stream")
}
}
答案
一种方法是使用Source.actorRef
和BroadcastHub.sink
:
val (sseActor, sseSource) =
Source.actorRef[String](10, akka.stream.OverflowStrategy.dropTail)
.map(toServerSentEvent) // converts a String to a ServerSentEvent
.keepAlive(1.second, () => ServerSentEvent.heartbeat)
.toMat(BroadcastHub.sink[ServerSentEvent])(Keep.both)
.run()
将物化的ActorRef
订阅到您的消息通道:发送给此actor的消息将向下游发出。如果没有下游需求,则使用指定的溢出策略将消息缓冲到一定数量(在此示例中,缓冲区大小为10)。请注意,此方法没有背压。
redis.subscriber.subscribe("My Channel") {
case message @ PubSubMessage.Message(channel, messageBytes) =>
val strMsg = message.readAs[String]
println(strMsg)
sseActor ! strMsg
case ...
}
另请注意,上面的示例使用了Source.actorRef[String]
;根据需要调整类型和示例(例如,它可能是Source.actorRef[PubSubMessage.Message]
)。
你可以在你的路径中使用物化的Source
:
path("stream") {
get {
complete(sseSource)
}
}
另一答案
另一种方法可以是创建Source作为队列,并将该元素提供给在订阅者回调中接收的队列
val queue =
Source
.queue[String](10, OverflowStrategy.dropHead) // drops the oldest element from the buffer to make space for the new element.
.map(toServerSentEvent) // converts a String to a ServerSentEvent
.keepAlive(1.second, () => ServerSentEvent.heartbeat)
.to(Sink.ignore)
.run()
在订户中
redis.subscriber.subscribe("My Channel") {
case message @ PubSubMessage.Message(channel, messageBytes) =>
val strMsg = message.readAs[String]
println(strMsg)
queue.offer(strMsg)
case ...
}
以上是关于在Redis pub / sub和Akka Streams中使用SSE的最简单方法是什么?的主要内容,如果未能解决你的问题,请参考以下文章