akka-http 分块响应连接

Posted

技术标签:

【中文标题】akka-http 分块响应连接【英文标题】:akka-http chunked response concatenation 【发布时间】:2016-01-21 14:29:18 【问题描述】:

我正在使用akka-http 向发送回分块响应的http 服务发出请求。相关代码如下所示:

val httpRequest: HttpRequest = //build the request
val request = Http().singleRequest(httpRequest)
request.flatMap  response =>
    response.entity.dataBytes.runForeach  chunk =>
        println("-----")
        println(chunk.utf8String)
    

命令行中产生的输出如下所示:

-----
"data":
-----
"some text"

-----
"data":
-----
"this is a longer
-----
text"

-----
"data": "txt"

-----
...

数据的逻辑部分 - 在这种情况下,json 以行尾符号 \r\n 结尾,但问题是,json 并不总是适合单个 http 响应块,如示例中清晰可见以上。

我的问题是 - 我如何将传入的分块数据连接成完整的 json,以便生成的容器类型仍然保持 Source[Out,M1]Flow[In,Out,M2]?我愿意遵循akka-stream的思想。

更新:还值得一提的是,响应是无止境的,并且必须实时进行聚合

【问题讨论】:

【参考方案1】:

找到解决办法:

val request: HttpRequest = //build the request
request.flatMap  response =>
    response.entity.dataBytes.scan("")((acc, curr) => if (acc.contains("\r\n")) curr.utf8String else acc + curr.utf8String)
        .filter(_.contains("\r\n"))
        .runForeach  json =>
            println("-----")
            println(json)
        

【讨论】:

扫描功能到底是做什么的?没有关于它的文档。你能解释一下吗? @MaatDeamon 实际上有:“类似于折叠但不是终端操作,发出从零开始的当前值,然后将当前值和下一个值应用于给定函数 f,发出下一个当前值。” (doc.akka.io/api/akka-stream-and-http-experimental/1.0/…)。我理解它的方式就像一个折叠,但可以应用于连续流。没有它,这个解决方案将永远无法工作。 另外,响应句柄的分块是自动的吗?我的意思是您的回调对每个分块都有效? “被召唤”【参考方案2】:

akka stream documentation 在这个问题的食谱中有一个条目:“从 ByteString 流中解析行”。他们的解决方案非常冗长,但也可以处理单个块可以包含多行的情况。这似乎更健壮,因为块大小可以更改为大到足以处理多个 json 消息。

【讨论】:

链接更新到 Akka 2.4:doc.akka.io/docs/akka/2.4.2/scala/stream/… akka-http 也将很快包含对 JSON 的特定框架支持。在@ktoso 的示例项目中可以看到这种支持的早期预览。以下是相关 JSON 框架代码的直接链接:github.com/ktoso/scaladays-berlin-akka-streams/blob/master/src/…【参考方案3】:
response.entity.dataBytes
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 8096))
.mapAsyncUnordered(Runtime.getRuntime.availableProcessors())  data =>
  if (response.status == OK) 
    val event: Future[Event] = Unmarshal(data).to[Event]
    event.foreach(x => log.debug("Received event: .", x))
    event.map(Right(_))
   else 
    Future.successful(data.utf8String)
      .map(Left(_))
  

唯一的要求是您知道一条记录的最大大小。如果您从小事开始,默认行为是如果记录大于限制则失败。您可以将其设置为截断而不是失败,但一段 JSON 没有意义。

【讨论】:

以上是关于akka-http 分块响应连接的主要内容,如果未能解决你的问题,请参考以下文章

使用 angularjs $http 处理分块响应

使用 HttpWebResponse 读取“分块”响应

libevent2 中的分块响应

InternetReadFile 因分块响应而失败

RESTful API - 批量操作的分块响应

禁用传输编码:在 Spring Webflux 响应中分块