喷洒 Akka Json 解组

Posted

技术标签:

【中文标题】喷洒 Akka Json 解组【英文标题】:Spray Akka Json Unmarshalling 【发布时间】:2016-02-24 20:32:30 【问题描述】:

我有一个关于通过使用 spray - akka 将对象解组到 Json 的问题。

当我想使用返回 Future[List[Person]] 的演员时,它不起作用。

如果我直接使用 dao 对象,它可以工作。

这是我的代码:

PersonDao.scala

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

case class Person(id: Int, name: String, surname: String)

object PersonDao 

  def getAll: Future[List[Person]] = Future 
    List[Person](Person(1, "Bilal", "Alp"), Person(2, "Ahmet", "Alp"))
  

EntityServiceActor.scala

import akka.actor.Actor
import com.bilalalp.akkakafka.model.PersonDao
import com.bilalalp.akkakafka.service.ServiceOperation.FIND_ALL

object ServiceOperation 

  case object FIND_ALL



class EntityServiceActor extends Actor 

  override def receive: Receive = 

    case FIND_ALL => PersonDao.getAll
  

ServerSupervisor.scala

import akka.actor.Actor, ActorRefFactory
import com.bilalalp.akkakafka.webservice.TaskWebService
import spray.routing.RejectionHandler.Default


class ServerSupervisor extends Actor with PersonWebService 

  implicit val system = context.system

  override def receive: Receive = runRoute(entityServiceRoutes)

  override implicit def actorRefFactory: ActorRefFactory = context

WebServiceTrait.scala

import akka.util.Timeout

import spray.routing.HttpService

import scala.concurrent.duration._
import scala.language.postfixOps

import org.json4s.NoTypeHints
import org.json4s.native.Serialization._

trait WebServiceTrait extends HttpService 

  implicit def executionContext = actorRefFactory.dispatcher

  implicit val json4sFormats = formats(NoTypeHints)

  implicit val timeout = Timeout(120 seconds)

PersonWebService.scala

trait PersonWebService extends WebServiceTrait with Json4sSupport 

  val json3sFormats = DefaultFormats

  val entityServiceWorker = actorRefFactory.actorOf(Props[EntityServiceActor], "entityServiceActor")

  val entityServiceRoutes = 
    pathPrefix("person") 
      pathEndOrSingleSlash 
        get 
          ctx => ctx.complete((entityServiceWorker ? FIND_ALL).mapTo[Person])
        
      
    
  

Application.scala

import akka.actor.ActorRef, ActorSystem, Props
import akka.io.IO
import com.bilalalp.akkakafka.server.ServerSupervisor
import spray.can.Http


object Application extends App 

  implicit val system = ActorSystem("actorSystem")

  val mainHandler: ActorRef = system.actorOf(Props[ServerSupervisor])
  IO(Http)! Http.Bind(mainHandler, interface = Configuration.appInterface, port = Configuration.appPort)


当我运行这段代码时,它什么也没有给出并等待一段时间。

等待浏览器后给出这个消息:

服务器无法及时响应您的请求。

控制台输出是

[错误] [2015 年 11 月 22 日 21:15:24.109] [actorSystem-akka.actor.default-dispatcher-7] [akka.actor.ActorSystemImpl(actorSystem)] 处理时出错 请求 HttpRequest(GET,http://localhost:3001/person/,List(Host: 本地主机:3001,连接:保持活动,缓存控制:无缓存, Pragma:无缓存,用户代理:Mozilla/5.0(Windows NT 6.3;WOW64) AppleWebKit/537.36 (Khtml, like Gecko) Maxthon/4.4.6.1000 Chrome/30.0.1599.101 Safari/537.36,DNT:1,接受编码:gzip, 放气,接受语言:tr-TR),Empty,HTTP/1.1) akka.pattern.AskTimeoutException:询问超时 [演员[akka://actorSystem/user/$a/entityServiceActor#-1810673919]] 在 [120000 毫秒] 之后。 Sender[null] 发送的消息类型 “com.bilalalp.akkakafka.service.ServiceOperation$FIND_ALL$”。 在 akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:415) 在 akka.actor.Scheduler$$anon$7.run(Scheduler.scala:132) 在 scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599) 在 scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) 在 scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)

如果我将 PersonWebService.scala 更改为此:

trait PersonWebService extends WebServiceTrait with Json4sSupport 

  val json3sFormats = DefaultFormats

  val entityServiceWorker = actorRefFactory.actorOf(Props[EntityServiceActor], "entityServiceActor")

  val entityServiceRoutes = 
    pathPrefix("person") 
      pathEndOrSingleSlash 
        get (
//                    ctx => ctx.complete((entityServiceWorker ? FIND_ALL).mapTo[Person])
          ctx => ctx.complete(PersonDao getAll)
        )
      
    
  

它的工作原理和输出是:

["id":1,"name":"Bilal","surname":"Alp","id":2,"name":"Ahmet","surname":"Alp" ]

我想在喷射路线中使用演员。我不知道这是否是一种不好的做法,因为我是 akka 和喷雾的新手。

我该如何解决这个问题?有什么想法吗?

谢谢。

【问题讨论】:

【参考方案1】:

首先,您可以输入 (PersonWebService.scala):

pathEndOrSingleSlash 
    get 
      complete 
       (entityServiceWorker ? FindAll).mapTo[List[Person]]
    
  

正如@Timothy Kim 所说,您需要使用“发件人!getAll.onComplete”发回结果

我看到 getAll 返回 Future,所以我认为最好在 EntityServiceActor.scala 中解决它:

// import the pipe pattern (see pipeTo below):
import akka.pattern.pipe
import context.dispatcher

override def receive: Receive = 
  case FindAll => 
    PersonDao.getAll()
      .recover( case err => List() /* could log error here */ )
      .pipeTo(sender()) // do this instead of onComplete, it's safer

在这个简单的例子中 getAll Future 被解析,如果一切正常,服务将获取人员列表,否则 List 将为空。

哦,另一件事 PersonWebService.scala 应该有 .mapTo[List[Person]]

【讨论】:

你说的我都做了。它给出了这个错误:java.lang.ClassCastException: Cannot cast scala.runtime.BoxedUnit to scala.collection.immutable.List at java.lang.Class.cast(Class.java:3369) 我稍微改进了代码 sn-p,以使用这里的最佳实践 - pipeTo 以及在期货失败的情况下恢复【参考方案2】:

您需要将结果发送回发件人:

case FIND_ALL =>
  PersonDao.getAll.pipeTo(sender())

【讨论】:

虽然此代码可能会回答问题,但提供有关此代码为何和/或如何回答问题的额外上下文可提高其长期价值。 @Timothy Kim 我已经添加了这个:case FIND_ALL => sender() ! PersonDao.getAll 但它没有给出任何错误,但我看不到 json 结果。它返回这个答案 。我该怎么办? .mapTo[Person] 应该是.mapTo[List[Person]],如上所述。 差不多,应该是.mapTo[Future[List[Person]]]

以上是关于喷洒 Akka Json 解组的主要内容,如果未能解决你的问题,请参考以下文章

Akka HTTP:如何将 Json 格式响应解组为域对象

如何使用 Akka HTTP 解组 json 响应删除不必要的字段

akka-http:找不到参数解组的隐式值

Akka(34): Http:Unmarshalling,from Json

Akka HTTP开发中使用JSON

如何使用 AKKA-HTTP、spray-json、oauth2 和 slick 优化 scala REST api?