如何在 Akka Http 中使用 HttpResponse

Posted

技术标签:

【中文标题】如何在 Akka Http 中使用 HttpResponse【英文标题】:How to consume HttpResponse in Akka Http 【发布时间】:2022-01-15 19:02:00 【问题描述】:
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpMethods, HttpRequest, Uri
import akka.stream.scaladsl.Flow, Source
import akka.stream.ActorMaterializer, OverflowStrategy
import spray.json._

import java.util.UUID
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
import scala.util.Failure, Success

object SoftwareRegistry extends App with Formatter 

  implicit val system = ActorSystem("NPMRegistry")
  implicit val materializer = ActorMaterializer()

  case class NPMPackage(name: String)

  // reading the packages
  val filename = "B:\\Scala\\NPMRegistry\\src\\main\\resources\\packages.txt"
  val bufferedSource = scala.io.Source.fromFile(filename)
  val listOfPackages: List[NPMPackage] = (for (line <- bufferedSource.getLines) yield 
    NPMPackage(line.trim)
  ).toList
  bufferedSource.close()

  // requests
  val serverHttpRequests = listOfPackages.map(pkg =>
    (HttpRequest(
      HttpMethods.GET,
      uri = Uri(s"/$pkg.name")
    ),
      UUID.randomUUID().toString)
  )

  // source
  val sourceList = Source(serverHttpRequests)
  val bufferedFlow = Flow[(HttpRequest, String)]
    .buffer(10, overflowStrategy = OverflowStrategy.backpressure)
    .throttle(1, 3 seconds)

  val dd = sourceList
    .via(bufferedFlow).async
    .via(Http().cachedHostConnectionPoolHttps[String]("registry.npmjs.org"))
    .runForeach 
      case (Success(response), oId) =>
        println(s"$oId $response")
      case (Failure(ex), oId) => println(ex)
    

在上面的代码中,我可以将响应打印到控制台,我想知道如何使用实体并以流的方式访问响应中的数据,而不是将来。

以下是现有代码的结果

【问题讨论】:

“流式传输方式”到底是什么意思?您想如何使用收到的 HTTP 响应? @Ava 我想在收到响应时使用它们。我之前尝试了另一种方法;它给出了未来的所有响应,我可以在未来完成后使用它们。这里我要一一回复 【参考方案1】:

您基本上需要将逻辑保留在 Akka Streams API 中,而不是像您那样使用 runForEach 终止它。

这方面的简化示例如下所示:

.via(Http().cachedHostConnectionPoolHttps[String]("registry.npmjs.org"))
    .flatMapConcat 
      case (Success(response), _) => Source.single(response)
      case (Failure(_), _) => Source.empty //warning, ignoring errors
    
    .map(httpResponse => httpResponse.entity)
    .flatMapConcat(e => e.getDataBytes().map(bytes => ???))
    .runWith(Sink.ignore)

而不是runforEach 我是flatMapConcatenating 来获取HttpRespnse 忽略错误和请求的上下文以简化示例。然后mapping 获取HttpEntity,然后再次flatMapConcatenating 获取代表响应正文的ByteStrings。每个HttpRequest 可能会有多个这样的形式出现,这就是我猜你所说的“流式传输方式”。

【讨论】:

以上是关于如何在 Akka Http 中使用 HttpResponse的主要内容,如果未能解决你的问题,请参考以下文章

akka http:Akka 流与演员建立休息服务

Akka HTTP:如何将 Json 格式响应解组为域对象

Akka中使用Logback日志框架

如何使用Scala / Akka Http处理多个HTTP头

如何使用 Akka HTTP 解组 json 响应删除不必要的字段

如何使用 AKKA-HTTP、spray-json、oauth2 和 slick 优化 scala REST api?