ktor websocket flow api是如何工作的?
Posted
技术标签:
【中文标题】ktor websocket flow api是如何工作的?【英文标题】:How does ktor websocket flow api works? 【发布时间】:2021-01-01 11:42:43 【问题描述】:我正在使用 ktor 通过 websockets 进行服务器端开发。
文档向我们展示了这个使用传入通道的示例:
for (frame in incoming.mapNotNull it as? Frame.Text )
// some
但是mapNotNull
被标记为已弃用,而支持Flow
。我应该如何使用这个 API,可能会出现什么问题?例如,Flow
是冷流。这意味着将在每个collect
上调用生产者函数。它如何在 websocket 的上下文中工作。它会在第二次collect
呼叫时重新打开,或者旧消息将在下一次collect
之后传递一次?如何收集N
消息,然后停止收集,然后再次收集?
提前致谢:)
【问题讨论】:
【参考方案1】:我应该如何使用这个 API,可能会出现什么问题?
我正在使用的以及我在文档某处的一个示例中看到的是在 ReceiveChannel
上调用的 consumeAsFlow()
方法。这是整个sn-p:
webSocket("/websocket") //this: DefaultWebSocketServerSession
incoming
.consumeAsFlow()
.map receive(it)
.collect()
尚未发现这种方法存在重大问题。您应该注意的一件事(但也适用于非流方法)是,如果您将其放入流中,那么它将破坏 WebSocket 连接,这通常不是您想要做的事情。可能值得考虑将整个内容包装在 try-catch
中。
它会在第二次付费呼叫时重新打开,还是可能在下一次付费后发送一次旧消息?
您甚至在开始使用流中的消息之前就打开了 websocket。您可以看到在webSocket()
内部,您处于DefaultWebSocketServerSession
的上下文中。这是您的连接管理。在您的流程中,您只是在消息到达时一一接收消息(在建立连接之后)。如果连接中断,那么你就没有流量了。需要重新建立它才能处理您的消息。该建立位由Route.webSocket()
方法完成。我建议您看一下它的 Javadoc。
如果您希望在连接关闭后进行一些清理,您可以像这样添加finally
块:
webSocket("/chat")
try
incoming
.consumeAsFlow()
.map receive(it, client)
.collect()
finally
// cleanup
简而言之:collect
每收到一条消息就会被调用一次。如果没有连接(或已断开),则不会调用 collect
。
如何收集 N 条消息,然后停止收集,然后再次收集?
这有什么用例?我认为您不应该在任何流程中执行此操作。您当然可以从流中take(n)
项,但您将无法再从流中获取更多内容。
【讨论】:
以上是关于ktor websocket flow api是如何工作的?的主要内容,如果未能解决你的问题,请参考以下文章
Ktor Websocket 功能与 ktor 中的 ContentNeogation 功能 (JSON / GSON)