Akka 将 websocket 流的东西流式传输到 Sink.seq 以异常 SubscriptionWithCancelException$StageWasCompleted 结尾

Posted

技术标签:

【中文标题】Akka 将 websocket 流的东西流式传输到 Sink.seq 以异常 SubscriptionWithCancelException$StageWasCompleted 结尾【英文标题】:Akka streams websocket stream things to a Sink.seq ends with exception SubscriptionWithCancelException$StageWasCompleted 【发布时间】:2022-01-21 11:43:19 【问题描述】:

我未能实现 Sink.seq,当需要实现时,我因此异常而失败

akka.stream.SubscriptionWithCancelException$StageWasCompleted$:

这里是github上的完整源代码:https://github.com/Christewart/bitcoin-s-core/blob/aaecc7c180e5cc36ec46d73d6b2b0b0da87ab260/app/server-test/src/test/scala/org/bitcoins/server/WebsocketTests.scala#L51

我正在尝试将从 websocket 中推出的所有元素聚合到 Sink.seq 中。在聚合Sink.seq 中的内容之前,我必须进行一些 json 转换。


      
  val endSink: Sink[WalletNotification[_], Future[Seq[WalletNotification[_]]]] =
    Sink.seq[WalletNotification[_]]

  val sink: Sink[Message, Future[Seq[WalletNotification[_]]]] = Flow[Message]
    .map 
      case message: TextMessage.Strict =>
        //we should be able to parse the address message
        val text = message.text
        val notification: WalletNotification[_] = 
          upickle.default.read[WalletNotification[_]](text)(
            WsPicklers.walletNotificationPickler)
        
        logger.info(s"Notification=$notification")
        notification
      case msg =>
        logger.error(s"msg=$msg")
        sys.error("")
    
    .log(s"@@@ endSink @@@")
    .toMat(endSink)(Keep.right)

      val f: Flow[
        Message,
        Message,
        (Future[Seq[WalletNotification[_]]], Promise[Option[Message]])] = 
        Flow
          .fromSinkAndSourceMat(sink, Source.maybe[Message])(Keep.both)
      

      val tuple: (
          Future[WebSocketUpgradeResponse],
          (Future[Seq[WalletNotification[_]]], Promise[Option[Message]])) = 
        Http()
          .singleWebSocketRequest(req, f)
      

      val walletNotificationsF: Future[Seq[WalletNotification[_]]] =
        tuple._2._1

      val promise: Promise[Option[Message]] = tuple._2._2
      logger.info(s"Requesting new address for expectedAddrStr")
      val expectedAddressStr = ConsoleCli
        .exec(CliCommand.GetNewAddress(labelOpt = None), cliConfig)
        .get
      val expectedAddress = BitcoinAddress.fromString(expectedAddressStr)

       
      promise.success(None)
      logger.info(s"before notificationsF")

      //hangs here, as the future never gets completed, fails with an exception
      for 
        notifications <- walletNotificationsF
        _ = logger.info(s"after notificationsF")
       yield 
        //assertions in here...
      

我做错了什么?

【问题讨论】:

【参考方案1】:

要保持客户端连接打开,您需要“更多代码”,如下所示:

val sourceKickOff = Source
  .single(TextMessage("kick off msg"))
  // Keeps the connection open
  .concatMat(Source.maybe[Message])(Keep.right)

查看完整的工作示例,它使用来自服务器的消息: https://github.com/pbernet/akka_streams_tutorial/blob/b6d4c89a14bdc5d72c557d8cede59985ca8e525f/src/main/scala/akkahttp/WebsocketEcho.scala#L280

【讨论】:

这个不行,我在这里试了一下:github.com/Christewart/bitcoin-s-core/tree/… 我同意,这并不能解决问题,而且Source.maybe 就足够了。【参考方案2】:

问题出在这一行

Flow.fromSinkAndSourceMat(sink, Source.maybe[Message])(Keep.both)

应该是

Flow.fromSinkAndSourceCoupledMat(sink, Source.maybe[Message])(Keep.both)

当流终止时,物化流的Coupled部分将确保终止下游的Sink。

【讨论】:

以上是关于Akka 将 websocket 流的东西流式传输到 Sink.seq 以异常 SubscriptionWithCancelException$StageWasCompleted 结尾的主要内容,如果未能解决你的问题,请参考以下文章

如何将音频数据从 Android 流式传输到 WebSocket 服务器?

Akka(39): Http:File streaming-文件交换

使用 Node.js HTTP API 或 Websockets 流式传输数据?

随着时间的推移,使用 java websockets 实时流式传输模拟视频变得无响应

将 openCV C++ 视频流式传输到浏览器

C++ Boost asio 连接和流式传输