ktor websocket flow api是如何工作的?

Posted

技术标签:

【中文标题】ktor websocket flow api是如何工作的?【英文标题】:How does ktor websocket flow api works? 【发布时间】:2021-01-01 11:42:43 【问题描述】:

我正在使用 ktor 通过 websockets 进行服务器端开发。

文档向我们展示了这个使用传入通道的示例:

for (frame in incoming.mapNotNull  it as? Frame.Text ) 
    // some

但是mapNotNull 被标记为已弃用,而支持Flow。我应该如何使用这个 API,可能会出现什么问题?例如,Flow 是冷流。这意味着将在每个collect 上调用生产者函数。它如何在 websocket 的上下文中工作。它会在第二次collect 呼叫时重新打开,或者旧消息将在下一次collect 之后传递一次?如何收集N 消息,然后停止收集,然后再次收集?

提前致谢:)

【问题讨论】:

【参考方案1】:

我应该如何使用这个 API,可能会出现什么问题?

我正在使用的以及我在文档某处的一个示例中看到的是在 ReceiveChannel 上调用的 consumeAsFlow() 方法。这是整个sn-p:

webSocket("/websocket")  //this: DefaultWebSocketServerSession
    incoming
        .consumeAsFlow()
        .map  receive(it) 
        .collect()

尚未发现这种方法存在重大问题。您应该注意的一件事(但也适用于非流方法)是,如果您将其放入流中,那么它将破坏 WebSocket 连接,这通常不是您想要做的事情。可能值得考虑将整个内容包装在 try-catch 中。

它会在第二次付费呼叫时重新打开,还是可能在下一次付费后发送一次旧消息?

您甚至在开始使用流中的消息之前就打开了 websocket。您可以看到在webSocket() 内部,您处于DefaultWebSocketServerSession 的上下文中。这是您的连接管理。在您的流程中,您只是在消息到达时一一接收消息(在建立连接之后)。如果连接中断,那么你就没有流量了。需要重新建立它才能处理您的消息。该建立位由Route.webSocket() 方法完成。我建议您看一下它的 Javadoc。

如果您希望在连接关闭后进行一些清理,您可以像这样添加finally 块:

webSocket("/chat") 
    try 
        incoming
            .consumeAsFlow()
            .map  receive(it, client) 
            .collect()
     finally 
        // cleanup
    

简而言之:collect 每收到一条消息就会被调用一次。如果没有连接(或已断开),则不会调用 collect

如何收集 N 条消息,然后停止收集,然后再次收集?

这有什么用例?我认为您不应该在任何流程中执行此操作。您当然可以从流中take(n) 项,但您将无法再从流中获取更多内容。

【讨论】:

以上是关于ktor websocket flow api是如何工作的?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Ktor websockets 上发送 ping

Ktor Websocket 功能与 ktor 中的 ContentNeogation 功能 (JSON / GSON)

将自定义标头设置为 websocket 请求 (ktor)

尝试在客户端接收数据时,Ktor-websocket 库不执行任何操作

Android设备上的ktor websocket安装错误

如何保持 Kotlin Ktor websocket 处于打开状态