Akka HTTP 源流与常规请求处理

Posted

技术标签:

【中文标题】Akka HTTP 源流与常规请求处理【英文标题】:Akka HTTP Source Streaming vs regular request handling 【发布时间】:2018-08-28 07:37:46 【问题描述】:

使用Source Streaming 与处理请求的常规方式相比有什么优势?我的理解是在这两种情况下

    TCP 连接将被重用 将在客户端和服务器之间施加背压

我可以看到源流的唯一优势是如果响应非常大并且客户端更喜欢以较小的块消耗它。

我的用例是我有一个很长的用户列表(数百万),我需要调用一个对用户执行一些过滤并返回一个子集的服务。

目前,在服务器端我公开了一个批处理 API,在客户端,我只是将用户分成 1000 个块,并使用 Akka HTTP Host API 并行进行 X 个批处理调用。

我正在考虑切换到 HTTP 流式传输,但无法完全弄清楚价值是什么

【问题讨论】:

【参考方案1】:

您还缺少另一个巨大的好处:内存效率。通过拥有流式管道、客户端/服务器/客户端,各方可以安全地处理数据,而不会冒着破坏内存分配的风险。这在服务器端特别有用,您总是必须假设客户端可能会做一些恶意的事情......

创建客户端请求

假设您的数百万用户的最终来源是一个文件。您可以从此文件创建流源:

val userFilePath : java.nio.file.Path = ???

val userFileSource = akka.stream.scaladsl.FileIO(userFilePath)

您可以使用此源创建将用户流式传输到服务的 http 请求:

import akka.http.scaladsl.model.HttpEntity.Chunked, ChunkStreamPart
import akka.http.scaladsl.model.RequestEntity, ContentTypes, HttpRequest

val httpRequest : HttpRequest = 
  HttpRequest(uri = "http://filterService.io", 
              entity = Chunked.fromData(ContentTypes.`text/plain(UTF-8)`, userFileSource))

此请求现在会将用户流式传输到服务,而不会将整个文件消耗到内存中。一次只会缓冲数据块,因此,您可以发送可能无限的请求用户数量和您的客户都可以。

服务器请求处理

同样,您的服务器可以设计为接受具有可能无限长的实体的请求。

你的问题是说服务会过滤用户,假设我们有过滤功能:

val isValidUser : (String) => Boolean = ???

这可用于过滤传入的请求实体并创建一个响应实体来提供响应:

import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model.HttpResponse
import akka.http.scaladsl.model.HttpEntity.Chunked

val route = extractDataBytes  userSource =>
  val responseSource : Source[ByteString, _] = 
    userSource
      .map(_.utf8String)
      .filter(isValidUser)
      .map(ByteString.apply)

  complete(HttpResponse(entity=Chunked.fromData(ContentTypes.`text/plain(UTF-8)`, 
                                                responseSource)))

客户端响应处理

客户端可以类似地处理过滤后的用户,而无需将它们全部读入内存。例如,我们可以分派请求并将所有有效用户发送到控制台:

import akka.http.scaladsl.Http

Http()
  .singleRequest(httpRequest)
  .map  response =>
    response
      .entity
      .dataBytes
      .map(_.utf8String)
      .foreach(System.out.println)
  

【讨论】:

谢谢!这就说得通了。在我们的例子中,在客户端,我们从一个文件流式传输,将文件分成 100 个批次,然后使用在该批次上调用 singleRequest 的函数进行 mapAsyncUnordered。我认为这与您的示例具有相同的效果(我可能错了) 在服务器上,我不完全了解流式传输的好处。 (a) 客户端仍然可以通过建立太多连接来溢出 API。 (b) 我们仍然可以通过配置 akka-http 允许的连接数和进行中的请求数来控制服务器上的负载 @EugeneMi 客户端:您必须明确编写的所有批处理和 mapAsyncUnordered 都是通过使用 Chunked.fromData 免费获得的。服务器:(a)服务器是 Connection 对象的 akka-stream Source,因此背压可防止 DDoS 攻击,(b)即使您限制请求和连接的数量,如果您的逻辑将整个实体读入内存一个请求仍然可以通过发送一个巨大的实体来炸毁你的记忆。 Akka Http 一路背压:一个服务器是一个连接的来源,一个连接是一个请求的来源,一个请求是一个字节的来源。

以上是关于Akka HTTP 源流与常规请求处理的主要内容,如果未能解决你的问题,请参考以下文章

如何使用Scala / Akka Http处理多个HTTP头

(转)Akka学习笔记

如何记录 Akka HTTP 客户端请求

Akka HTTP:将来阻塞会阻塞服务器

远程处理时 Service Fabric 上的重复请求

流媒体源流常见问题与延迟分析处理