使用 scala 演员时我应该如何处理阻塞操作?
Posted
技术标签:
【中文标题】使用 scala 演员时我应该如何处理阻塞操作?【英文标题】:How should I handle blocking operations when using scala actors? 【发布时间】:2010-07-27 17:23:58 【问题描述】:我大约在两天前开始学习 scala actor 框架。为了使我的想法具体化,我决定实现一个基于 TCP 的回显服务器,它可以处理多个同时连接。
这是回显服务器的代码(不包括错误处理):
class EchoServer extends Actor
private var connections = 0
def act()
val serverSocket = new ServerSocket(6789)
val echoServer = self
actor while (true) echoServer ! ("Connected", serverSocket.accept)
while (true)
receive
case ("Connected", connectionSocket: Socket) =>
connections += 1
(new ConnectionHandler(this, connectionSocket)).start
case "Disconnected" =>
connections -= 1
基本上,服务器是一个处理“已连接”和“已断开”消息的 Actor。它将连接委托给匿名参与者,该参与者调用 serverSocket 上的 accept() 方法(阻塞操作)。当连接到达时,它通过“已连接”消息通知服务器,并将套接字传递给它以用于与新连接的客户端通信。 ConnectionHandler 类的实例处理与客户端的实际通信。
这是连接处理程序的代码(包括一些错误处理):
class ConnectionHandler(server: EchoServer, connectionSocket: Socket)
extends Actor
def act()
for (input <- getInputStream; output <- getOutputStream)
val handler = self
actor
var continue = true
while (continue)
try
val req = input.readLine
if (req != null) handler ! ("Request", req)
else continue = false
catch
case e: IOException => continue = false
handler ! "Disconnected"
var connected = true
while (connected)
receive
case ("Request", req: String) =>
try
output.writeBytes(req + "\n")
catch
case e: IOException => connected = false
case "Disconnected" =>
connected = false
close()
server ! "Disconnected"
// code for getInputStream(), getOutputStream() and close() methods
连接处理程序使用匿名参与者,通过在套接字的输入流上调用 readLine() 方法(阻塞操作)来等待将请求发送到套接字。当收到请求时,会向处理程序发送“请求”消息,然后处理程序将请求回显给客户端。如果处理程序或匿名参与者遇到底层套接字问题,则套接字将关闭,并向回显服务器发送“断开连接”消息,指示客户端已与服务器断开连接。
所以,我可以启动 echo 服务器并让它等待连接。然后我可以打开一个新终端并通过 telnet 连接到服务器。我可以向它发送请求,它会正确响应。现在,如果我打开另一个终端并连接到服务器,服务器会注册连接,但无法为这个新连接启动连接处理程序。当我通过任何现有连接向它发送消息时,我没有立即得到响应。这是有趣的部分。当我终止除一个现有客户端连接之外的所有客户端连接并使客户端 X 保持打开状态时,将返回对我通过客户端 X 发送的请求的所有响应。我做了一些测试并得出结论,即使我在创建连接处理程序时调用 start() 方法,后续客户端连接也不会调用 act() 方法.
我想我在连接处理程序中错误地处理了阻塞操作。由于先前的连接是由一个连接处理程序处理的,该连接处理程序阻止了一个匿名参与者等待请求,我认为这个被阻止的参与者正在阻止其他参与者(连接处理程序)启动。
在使用 scala actor 时应该如何处理阻塞操作?
任何帮助将不胜感激。
【问题讨论】:
【参考方案1】:来自the scaladoc for scala.actors.Actor:
注意:在调用 Actor trait 或其伴随对象(例如
receive
)提供的线程阻塞方法以外的线程阻塞方法时必须小心。阻塞一个actor内部的底层线程可能会导致其他actor的饥饿。这也适用于在调用receive
/react
之间长时间占用线程的演员。如果参与者使用阻塞操作(例如,阻塞 I/O 的方法),有几种选择:
可以将运行时系统配置为使用更大的线程池大小(例如,通过设置actors.corePoolSize
JVM 属性)。Actor
trait 的scheduler
方法可以被覆盖以返回一个ResizableThreadPoolScheduler
,它会调整其线程池的大小以避免调用任意阻塞方法的参与者造成饥饿。actors.enableForkJoin
JVM 属性可以设置为 false,在这种情况下,默认使用ResizableThreadPoolScheduler
来执行 actor。
【讨论】:
非常感谢。我设置了actors.corePoolSize JVM 属性,现在我的回显服务器正在正确处理多个连接。接下来。重新设计它,使其每个连接不使用一个线程。如果您对此有任何想法,我真的很想听听。 如果我开始一个类似的项目,我可能会想试试 Naggati (github.com/robey/naggati/blob/master/src/main/scala/…)。 我一定会把它用于创意。 我不确定,但我认为 Akka 可能内置了对非阻塞 IO 的支持。 谢谢埃里克。 Akka 看起来是一个非常棒的项目。我一定会尝试的。以上是关于使用 scala 演员时我应该如何处理阻塞操作?的主要内容,如果未能解决你的问题,请参考以下文章