akka 流将 akka-htpp Web 请求调用集成到流中

Posted

技术标签:

【中文标题】akka 流将 akka-htpp Web 请求调用集成到流中【英文标题】:akka stream integrating akka-htpp web request call into stream 【发布时间】:2018-12-16 20:19:25 【问题描述】:

开始使用 Akka Streams 我想执行一个简单的计算。通过调用 restful web api 扩展基本 QuickStart https://doc.akka.io/docs/akka/2.5/stream/stream-quickstart.html

val source: Source[Int, NotUsed] = Source(1 to 100)
source.runForeach(println)

已经可以很好地打印数字了。但是当尝试根据https://doc.akka.io/docs/akka/2.5.5/scala/stream/stream-integrations.html创建一个Actor来执行HTTP请求时(这真的有必要吗?)

  import akka.pattern.ask
  implicit val askTimeout = Timeout(5.seconds)
  val words: Source[String, NotUsed] =
    Source(List("hello", "hi"))

  words
    .mapAsync(parallelism = 5)(elem => (ref ? elem).mapTo[String])
    // continue processing of the replies from the actor
    .map(_.toLowerCase)
    .runWith(Sink.ignore)

由于未定义 ? 运算符,我无法编译它。据我所知,这个只能在演员内部定义。 我也不明白在mapAsync 内部的哪个位置需要调用我的自定义actor。

编辑

https://blog.colinbreck.com/backoff-and-retry-error-handling-for-akka-streams/ 至少包含部分示例。 看起来创建演员不是强制性的,即

implicit val system = ActorSystem()
implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer()


val source = Source(List("232::03::14062::19965186", "232::03::14062::19965189"))
    .map(cellKey => 
      val splits = cellKey.split("::")
      val mcc = splits(0)
      val mnc = splits(1)
      val lac = splits(2)
      val ci = splits(3)
      CellKeySource(cellKey, mcc, mnc, lac, ci)
    )
    .limit(2)
    .mapAsyncUnordered(2)(ck => getResponse(ck.cellKey, ck.mobileCountryCode, ck.mobileNetworkCode, ck.locationArea, ck.cellKey)("<<myToken>>"))

  def getResponse(cellKey: String, mobileCountryCode:String, mobileNetworkCode:String, locationArea:String, cellId:String)(token:String): Future[String] = 
    RestartSource.withBackoff(
      minBackoff = 10.milliseconds,
      maxBackoff = 30.seconds,
      randomFactor = 0.2,
      maxRestarts = 2
    )  () =>
      val responseFuture: Future[HttpResponse] =
        Http().singleRequest(HttpRequest(uri = s"https://www.googleapis.com/geolocation/v1/geolocate?key=$token", entity = ByteString(
          // TODO use proper JSON objects
          s"""
             |
             |  "cellTowers": [
             |    "mobileCountryCode": $mobileCountryCode,
             |    "mobileNetworkCode": $mobileNetworkCode,
             |    "locationAreaCode": $locationArea,
             |    "cellId": $cellId,
             |  ]
             |
          """.stripMargin)))

      Source.fromFuture(responseFuture)
        .mapAsync(parallelism = 1) 
          case HttpResponse(StatusCodes.OK, _, entity, _) =>
            Unmarshal(entity).to[String]
          case HttpResponse(statusCode, _, _, _) =>
            throw WebRequestException(statusCode.toString() )
        
    
      .runWith(Sink.head)
      .recover 
        case _ => throw StreamFailedAfterMaxRetriesException()
      
  

val done: Future[Done] = source.runForeach(println)
done.onComplete(_ ⇒ system.terminate())

已经是该问题的(部分)答案,即如何集成 Akka-streams + akka-http。但是,它不起作用,即只抛出错误 400s 并且永远不会终止。

【问题讨论】:

【参考方案1】:

    我想你已经找到了api 如何调用akka-http 客户端

    关于您的第一个代码 sn-p 不起作用。我认为对示例本身发生了一些误解。您希望示例中的代码在复制后能够正常工作。但是该文档的意图只是演示一个示例/概念,您如何将一些长时间运行的任务委托给流流程,然后在它准备好时使用结果。因为这是使用ask 调用akka actor,因为调用ask 方法返回Future。可能文档的作者只是省略了演员的定义。你可以试试这个例子:

    import java.lang.System.exit
    
    import akka.NotUsed
    import akka.actor.Actor, ActorRef, ActorSystem, Props
    import akka.pattern.ask
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.Sink, Source
    import akka.util.Timeout
    
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    import scala.language.higherKinds
    
    object App extends scala.App 
    
      implicit val sys: ActorSystem = ActorSystem()
      implicit val mat: ActorMaterializer = ActorMaterializer()
    
      val ref: ActorRef = sys.actorOf(Props[Translator])
    
      implicit val askTimeout: Timeout = Timeout(5.seconds)
      val words: Source[String, NotUsed] = Source(List("hello", "hi"))
    
      words
        .mapAsync(parallelism = 5)(elem => (ref ? elem).mapTo[String])
        .map(_.toLowerCase)
        .runWith(Sink.foreach(println))
        .onComplete(t => 
          println(s"finished: $t")
          exit(1)
        )
    
    
    class Translator extends Actor 
    
      override def receive: Receive = 
        case msg => sender() ! s"$msg!"
      
    
    

【讨论】:

【参考方案2】:

您必须从 akka 导入询问模式。

导入 akka.pattern.ask

编辑:好的,对不起,我可以看到你已经导入了。您的代码中的 ref 是什么?演员参考?

【讨论】:

可能这不是正确的事情 - 否则它会起作用。我刚刚从doc.akka.io/docs/akka/2.5.5/scala/stream/… 获取代码并将其复制/粘贴到主类。 请查看最新编辑。我相信我已经弄清楚了第一部分。

以上是关于akka 流将 akka-htpp Web 请求调用集成到流中的主要内容,如果未能解决你的问题,请参考以下文章

masstransit请求/响应:在消费者中获取调用者超时

如何让 Shiro 在 Scala + Akka + Spray 环境中工作

akka初识Akka 简单介绍

Akka,发送Udp失败,“无法分配请求的地址”

(转)Akka学习笔记

Akka 框架支持查找重复消息