Akka-http:连接到localhost上的websocket

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Akka-http:连接到localhost上的websocket相关的知识,希望对你有一定的参考价值。

我试图通过localhost上的websocket连接到某个服务器。当我尝试在JS中执行此操作时

ws = new WebSocket('ws://localhost:8137');

它成功了。但是,当我使用akka-http和akka-streams时,我收到“连接失败”错误。

object Transmitter 
    implicit val system: ActorSystem = ActorSystem()
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    import system.dispatcher

    object Rec extends Actor 
        override def receive: Receive = 
            case TextMessage.Strict(msg) =>
                Log.info("Recevied signal " + msg)
        
    

    //  val host = "ws://echo.websocket.org"
    val host = "ws://localhost:8137"

    val sink: Sink[Message, NotUsed] = Sink.actorRef[Message](system.actorOf(Props(Rec)), PoisonPill)


    val source: Source[Message, NotUsed] = Source(List("test1", "test2") map (TextMessage(_)))


    val flow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
        Http().webSocketClientFlow(WebSocketRequest(host))

    val (upgradeResponse, closed) =
        source
        .viaMat(flow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
        .toMat(sink)(Keep.both) // also keep the Future[Done]
        .run()

    val connected: Future[Done.type] = upgradeResponse.flatMap  upgrade =>
        if (upgrade.response.status == StatusCodes.SwitchingProtocols) 
            Future.successful(Done)
         else 
            Future.failed(new Exception(s"Connection failed: $upgrade.response.status")
        
    

    def test(): Unit = 
        connected.onComplete(Log.info)
    

它与ws://echo.websocket.org完全正常。

我认为附加我的服务器的代码是没有理由的,因为它适用于javascript客户端,问题只与连接有关,但是如果你想查看它我可能会显示它。

我究竟做错了什么?

答案

我用akka documentation的websocket服务器测试了你的客户端实现,我没有得到任何连接错误。您的websocket客户端连接成功。这就是为什么我猜测问题在于您的服务器实现。

object WebSocketServer extends App 
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  import Directives._

  val greeterWebSocketService = Flow[Message].collect 
    case tm: TextMessage => TextMessage(Source.single("Hello ") ++ tm.textStream)
  

  val route =
    get 
      handleWebSocketMessages(greeterWebSocketService)
    

  val bindingFuture = Http().bindAndHandle(route, "localhost", 8137)

  println(s"Server online at http://localhost:8137/\nPress RETURN to stop...")
  StdIn.readLine()

  import system.dispatcher // for the future transformations
  bindingFuture
    .flatMap(_.unbind()) // trigger unbinding from the port
    .onComplete(_ => system.terminate()) // and shutdown when done

顺便说一句,我注意到你的演员的接收方法没有涵盖所有可能的消息。根据that akka issue的说法,每条信息,甚至是非常小的信息都可能最终成为Streamed。如果要打印所有文本消息,演员的更好实现将是:

object Rec extends Actor 
  override def receive: Receive = 
    case TextMessage.Strict(text)             ⇒ println(s"Received signal $text")
    case TextMessage.Streamed(textStream)     ⇒ textStream.runFold("")(_ + _).foreach(msg => println(s"Received streamed signal: $msg"))
  

请在my github找到一个工作项目。

另一答案

我找到了解决方案:我使用的服务器在IPv6上运行(如:: 1),但akka-http将localhost视为127.0.0.1并忽略:: 1。我不得不重写服务器以强制它使用IPv4,它工作。

以上是关于Akka-http:连接到localhost上的websocket的主要内容,如果未能解决你的问题,请参考以下文章

Telnet:Windows 7 上的 localhost 将无法连接到 localhost

无法连接到我的Localhost上的WCF服务

#2003 - 无法连接到“localhost”上的 MySQL 服务器 (10061)

安装后无法连接到“localhost”(10061)上的 MySQL 服务器

无法连接到运行 IIS 的服务器上的 Localhost

OperationalError:(2003,“无法连接到'localhost'上的MySQL服务器([Errno 111]连接被拒绝)”)[关闭]