如何在 Akka Http 中使用 HttpResponse
Posted
技术标签:
【中文标题】如何在 Akka Http 中使用 HttpResponse【英文标题】:How to consume HttpResponse in Akka Http 【发布时间】:2022-01-15 19:02:00 【问题描述】:import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpMethods, HttpRequest, Uri
import akka.stream.scaladsl.Flow, Source
import akka.stream.ActorMaterializer, OverflowStrategy
import spray.json._
import java.util.UUID
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
import scala.util.Failure, Success
object SoftwareRegistry extends App with Formatter
implicit val system = ActorSystem("NPMRegistry")
implicit val materializer = ActorMaterializer()
case class NPMPackage(name: String)
// reading the packages
val filename = "B:\\Scala\\NPMRegistry\\src\\main\\resources\\packages.txt"
val bufferedSource = scala.io.Source.fromFile(filename)
val listOfPackages: List[NPMPackage] = (for (line <- bufferedSource.getLines) yield
NPMPackage(line.trim)
).toList
bufferedSource.close()
// requests
val serverHttpRequests = listOfPackages.map(pkg =>
(HttpRequest(
HttpMethods.GET,
uri = Uri(s"/$pkg.name")
),
UUID.randomUUID().toString)
)
// source
val sourceList = Source(serverHttpRequests)
val bufferedFlow = Flow[(HttpRequest, String)]
.buffer(10, overflowStrategy = OverflowStrategy.backpressure)
.throttle(1, 3 seconds)
val dd = sourceList
.via(bufferedFlow).async
.via(Http().cachedHostConnectionPoolHttps[String]("registry.npmjs.org"))
.runForeach
case (Success(response), oId) =>
println(s"$oId $response")
case (Failure(ex), oId) => println(ex)
在上面的代码中,我可以将响应打印到控制台,我想知道如何使用实体并以流的方式访问响应中的数据,而不是将来。
以下是现有代码的结果
【问题讨论】:
“流式传输方式”到底是什么意思?您想如何使用收到的 HTTP 响应? @Ava 我想在收到响应时使用它们。我之前尝试了另一种方法;它给出了未来的所有响应,我可以在未来完成后使用它们。这里我要一一回复 【参考方案1】:您基本上需要将逻辑保留在 Akka Streams API 中,而不是像您那样使用 runForEach
终止它。
这方面的简化示例如下所示:
.via(Http().cachedHostConnectionPoolHttps[String]("registry.npmjs.org"))
.flatMapConcat
case (Success(response), _) => Source.single(response)
case (Failure(_), _) => Source.empty //warning, ignoring errors
.map(httpResponse => httpResponse.entity)
.flatMapConcat(e => e.getDataBytes().map(bytes => ???))
.runWith(Sink.ignore)
而不是runforEach
我是flatMapConcat
enating 来获取HttpRespnse
忽略错误和请求的上下文以简化示例。然后map
ping 获取HttpEntity
,然后再次flatMapConcat
enating 获取代表响应正文的ByteString
s。每个HttpRequest
可能会有多个这样的形式出现,这就是我猜你所说的“流式传输方式”。
【讨论】:
以上是关于如何在 Akka Http 中使用 HttpResponse的主要内容,如果未能解决你的问题,请参考以下文章
如何使用Scala / Akka Http处理多个HTTP头