使用 Netty 和 Scala 演员的异步 http 请求
Posted
技术标签:
【中文标题】使用 Netty 和 Scala 演员的异步 http 请求【英文标题】:Asynchronous http requests using Netty and Scala actors 【发布时间】:2012-12-20 06:06:31 【问题描述】:嘿希望有人能帮我解决这个问题。
我正在尝试使用 Scala Actors 和 Netty.io 库来获取异步 http 请求。 (是的,我知道 Scala 演员正在被弃用,但这对我来说是一个学习练习)
我写了一个演员HttpRequestActor
,它接受案例类RequestPage(uri:URI)形式的消息。
当它收到消息时,它会创建必要的 Netty 对象来发出 http 请求,我基于 [HttpSnoopClient
] (http://static.netty.io/3.5/xref/) 中的大部分代码org/jboss/netty/example/http/snoop/HttpSnoopClient.html) 示例。
我创建了一个客户端并将当前的actor实例传递给我的ChannelPipelineFactory
实现,这也将actor传递给我的SimpleChannelUpstreamHandler
实现,在那里我覆盖了messageReceived
函数。
actor 实例作为侦听器传递,我使用 DefaultHttpRequest
类创建请求并写入通道以发出请求。
使用从写入通道返回的ChannelFuture
对象对actor对象进行了阻塞调用。当调用我的处理程序类的messageRecieved
函数时,我将netty http 请求的响应解析为字符串,将带有响应内容的消息发送回actor并关闭通道。
future 完成后,我的代码尝试使用收到的 http 内容响应向调用参与者发送回复。
代码有效,我能够得到回复,将其发送到我的演员实例,打印出内容并向正在使用的演员实例释放资源发送消息。
问题是当我测试它时,对actor的原始调用没有得到回复,线程只是保持打开状态。
代码示例 - HttpRequestActor
我的HttpRequestActor
课程代码
import scala.actors.Actor
import java.net.InetSocketAddress,URI
import org.jboss.netty.handler.codec.http._
import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.channel.Channel
import org.jboss.netty.channel._
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.channel.group.DefaultChannelGroup
import java.util.concurrent.Executors,CancellationException
import org.jboss.netty.util.CharsetUtil
import scala.concurrent. Promise, Future
import scala.concurrent.ExecutionContext.Implicits.global
/**
* @author mebinum
*
*/
class HttpRequestActor extends Actor
//initialize response with default uninitialized value
private var resp:Response = _
private val executor = Executors.newCachedThreadPool
private val executor2 = Executors.newCachedThreadPool
private val factory = new NioClientSocketChannelFactory(
executor,
executor2);
private val allChannels = new DefaultChannelGroup("httpRequester")
def act = loop
react
case RequestPage(uri) => requestUri(uri)
case Reply(msg) => setResponse(Reply(msg))
case NoReply => println("didnt get a reply");setResponse(NoReply)
case NotReadable => println("got a reply but its not readable");setResponse(NotReadable)
case ShutDown => shutDown()
private def requestUri(uri:URI) =
makeChannel(uri) map
channel =>
allChannels.add(channel)
val request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toString)
request.setHeader(HttpHeaders.Names.HOST, uri.getHost())
request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE)
request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP)
val writeFuture = channel.write(request).awaitUninterruptibly()
FutureReactor !? writeFuture match
case future : ChannelFuture =>
future.addListener(new ChannelFutureListener()
def operationComplete(future:ChannelFuture)
// Perform post-closure operation
println("current response is " + resp)
sendResponse("look ma I finished")
)
future.getChannel().close()
this ! ShutDown
//thread ends only if you send a reply from here
//println("this is final sender " + sender)
//reply("I am the true end")
private def makeChannel(uri:URI) =
val scheme = Some(uri.getScheme()).getOrElse("http")
val host = Some(uri.getHost()).getOrElse("localhost")
val port = Utils.getPort(uri.getPort, uri.getScheme)
// Set up the event pipeline factory.
val client = new ClientBootstrap(factory)
client.setPipelineFactory(new PipelineFactory(this))
//get the promised channel
val channel = NettyFutureBridge(client.connect(new InetSocketAddress(host, port)))
channel
private def setResponse(aResponse:Response) = resp = aResponse
private def sendResponse(msg:String) =
println("Sending the response " + msg)
reply(resp)
private def shutDown() =
println("got a shutdown message")
val groupFuture = allChannels.close().awaitUninterruptibly()
factory.releaseExternalResources()
override def exceptionHandler =
case e : CancellationException => println("The request was cancelled"); throw e
case tr: Throwable => println("An unknown exception happened " + tr.getCause()); throw tr
trait Response
case class RequestPage(url:URI)
case class Reply(content:String) extends Response
case object NoReply extends Response
case object NotReadable extends Response
case object ShutDown
object FutureReactor extends Actor
def act = //loop
react
case future: ChannelFuture =>
if (future.isCancelled)
throw new CancellationException()
if (!future.isSuccess())
future.getCause().printStackTrace()
throw future.getCause()
if(future.isSuccess() && future.isDone())
future.getChannel().getCloseFuture().awaitUninterruptibly()
reply(future)
//
this.start
class ClientHandler(listener:Actor) extends SimpleChannelUpstreamHandler
override def exceptionCaught( ctx:ChannelHandlerContext, e:ExceptionEvent)
e.getCause().printStackTrace()
e.getChannel().close();
throw e.getCause()
override def messageReceived(ctx:ChannelHandlerContext, e:MessageEvent) =
var contentString = ""
var httpResponse:Response = null.asInstanceOf[Response]
e.getMessage match
case (response: HttpResponse) if !response.isChunked =>
println("STATUS: " + response.getStatus);
println("VERSION: " + response.getProtocolVersion);
println
val content = response.getContent();
if (content.readable())
contentString = content.toString(CharsetUtil.UTF_8)
httpResponse = Reply(contentString)
//notify actor
else
httpResponse = NotReadable
case chunk: HttpChunk if !chunk.isLast =>
//get chunked content
contentString = chunk.getContent().toString(CharsetUtil.UTF_8)
httpResponse = Reply(contentString)
case _ => httpResponse = NoReply
println("sending actor my response")
listener ! httpResponse
println("closing the channel")
e.getChannel().close()
//send the close event
class PipelineFactory(listener:Actor) extends ChannelPipelineFactory
def getPipeline(): ChannelPipeline =
// Create a default pipeline implementation.
val pipeline = org.jboss.netty.channel.Channels.pipeline()
pipeline.addLast("codec", new HttpClientCodec())
// Remove the following line if you don't want automatic content decompression.
pipeline.addLast("inflater", new HttpContentDecompressor())
// Uncomment the following line if you don't want to handle HttpChunks.
//pipeline.addLast("aggregator", new HttpChunkAggregator(1048576))
pipeline.addLast("decoder", new HttpRequestDecoder())
//assign the handler
pipeline.addLast("handler", new ClientHandler(listener))
pipeline;
object NettyFutureBridge
import scala.concurrent. Promise, Future
import scala.util.Try
import java.util.concurrent.CancellationException
import org.jboss.netty.channel. Channel, ChannelFuture, ChannelFutureListener
def apply(nettyFuture: ChannelFuture): Future[Channel] =
val p = Promise[Channel]()
nettyFuture.addListener(new ChannelFutureListener
def operationComplete(future: ChannelFuture): Unit = p complete Try(
if (future.isSuccess)
println("Success")
future.getChannel
else if (future.isCancelled)
println("Was cancelled")
throw new CancellationException
else
future.getCause.printStackTrace()
throw future.getCause
)
)
p.future
测试代码
val url = "http://hiverides.com"
test("Http Request Actor can recieve and react to message")
val actor = new HttpRequestActor()
actor.start
val response = actor !? new RequestPage(new URI(url))
match
case Reply(msg) =>
println("this is the reply response in test")
assert(msg != "")
println(msg)
case NoReply => println("Got No Reply")
case NotReadable => println("Got a not Reachable")
case None => println("Got a timeout")
case s:Response => println("response string \n" + s)
case x => println("Got a value not sure what it is"); println(x);
使用的库: - 斯卡拉 2.9.2 - Netty.io 3.6.1.Final - 朱尼特 4.7 - 最新的 1.8 - 我也在使用@viktorklang NettyFutureBridge 对象gist 为返回的 Channel 对象创建一个 scala 未来
如何将来自 Netty 的响应内容发送回 actor 对象并结束线程?
任何帮助将不胜感激
【问题讨论】:
如果您还不知道,请查看Dispatch 感谢 Dylan 的链接,图书馆看起来很全面,我仍然想要一个简单的解决方案,并真正理解我做错了什么 我即将签入一些使用 Netty 和 Scala 2.10 期货的代码。它已经过测试并且可以工作,但它不使用演员。但是,也许它可以帮助解决这种情况下的问题。完成后我会通知您。 同时,我建议使用 Wireshark 或类似的工具来查看网络上发生了什么。 嘿,山姆,那太棒了。我正在考虑切换到 Akka 的想法,我认为这是 wat 2.10 的用途,请在你完成时告诉我。 【参考方案1】:我不了解 Scala,但我遇到了类似的问题。尝试指定响应的内容长度标头。
在普通的java中:
HttpRequest r = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, uri);
ChannelBuffer buffer = ChannelBuffers.copiedBuffer(input);
r.setHeader(HttpHeaders.Names.HOST, "host");
r.setHeader(HttpHeaders.Names.CONTENT_TYPE, "application/octet-stream");
r.setHeader(HttpHeaders.Names.CONTENT_LENGTH, buffer.readableBytes());
r.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
r.setContent(buffer);
否则服务器不知道何时从客户端完成内容,除非客户端关闭连接。
您也可以使用分块编码,但您必须自己实现分块编码(至少我不知道 Netty 中有一个库可以做到这一点)。
【讨论】:
感谢@aaron 的回复,尝试过但没有任何作用。认为问题实际上可能与我的 Scala 演员代码有关,而不是我的 netty 实现以上是关于使用 Netty 和 Scala 演员的异步 http 请求的主要内容,如果未能解决你的问题,请参考以下文章