Haskell - 基于 Actor 的可变性
Posted
技术标签:
【中文标题】Haskell - 基于 Actor 的可变性【英文标题】:Haskell - Actor based mutability 【发布时间】:2013-07-17 06:32:27 【问题描述】:我正在开发一个haskell 网络应用程序,我使用actor 模式来管理多线程。我遇到的一件事是如何存储例如一组客户端套接字/句柄。当然,所有线程都必须可以访问,并且可以在客户端登录/注销时更改。
由于我来自命令式世界,我想到了某种锁机制,但当我注意到这是多么丑陋时,我想到了“纯粹的”可变性,实际上它有点纯粹:
import Control.Concurrent
import Control.Monad
import Network
import System.IO
import Data.List
import Data.Maybe
import System.Environment
import Control.Exception
newStorage :: (Eq a, Show a) => IO (Chan (String, Maybe (Chan [a]), Maybe a))
newStorage = do
q <- newChan
forkIO $ storage [] q
return q
newHandleStorage :: IO (Chan (String, Maybe (Chan [Handle]), Maybe Handle))
newHandleStorage = newStorage
storage :: (Eq a, Show a) => [a] -> Chan (String, Maybe (Chan [a]), Maybe a) -> IO ()
storage s q = do
let loop = (`storage` q)
(req, reply, d) <- readChan q
print ("processing " ++ show(d))
case req of
"add" -> loop ((fromJust d) : s)
"remove" -> loop (delete (fromJust d) s)
"get" -> do
writeChan (fromJust reply) s
loop s
store s d = writeChan s ("add", Nothing, Just d)
unstore s d = writeChan s ("remove", Nothing, Just d)
request s = do
chan <- newChan
writeChan s ("get", Just chan, Nothing)
readChan chan
关键是一个线程(actor)正在管理一个项目列表并根据传入的请求修改列表。由于线程真的很便宜,我认为这可能是一个非常好的功能替代品。
当然,这只是一个原型(一个快速的概念证明)。 所以我的问题是:
-
这是管理共享可变变量的“好”方式(在演员世界中)吗?
是否已经有此模式的库? (我已经搜索过了,但什么也没找到)
问候, 克里斯
【问题讨论】:
如果你愿意探索actor模型的替代方案,我建议你试试Haskell的Software Transactional Memory。这是一种类似于数据库事务的漂亮机制。请参阅 The Real World Haskell 中的 Chapter 28。 技术上是一个不错的选择,但我听说使用 STM 具有大量线程(每个客户端一个线程,这是 Haskell 中的标准)和相对较长的操作(从列表中删除一个项目是 O(n ),当然哈希集/映射在这里可以提供帮助)可能会大量降低 STM 的性能。当然 MVar 通道可以被 STM 通道取代,这意味着使用两种技术中最好的。编辑:在这种情况下,演员模式通常非常好,因为删除/添加项目是 O(1)(只是发送一条消息)实际工作是在一个线程中完成的...... 你是对的。使用 STM 时,可能会多次重新启动事务,从而导致性能下降。但是,如果您的同步操作需要很长时间,您也可能会遇到与 Actor 类似的问题 - 如果消息数量超出其处理能力,则其状态将落后于现实。因此,使用基于平衡树 (Map
/Set
) 或 ST/IO
的哈希集肯定会有所帮助。
也许一个有趣而有趣的解决方案是创建一个平衡树,其中节点作为 STM 变量(或类似的哈希集)处理。这样,不同的线程就有可能同时更新树的不同部分。
绝对是一个很好的解决方案。我只是喜欢演员的概念,但这个解决方案可能会更好,也许对每个并发都使用演员有点矫枉过正。但是演员很容易使用,这也很有吸引力。
【参考方案1】:
这是一个使用stm
和pipes-network
的快速而肮脏的示例。这将设置一个简单的服务器,允许客户端连接并增加或减少计数器。它将显示一个非常简单的状态栏,显示所有已连接客户端的当前计数,并在断开连接时从状态栏中删除客户端计数。
首先我将从服务器开始,我慷慨地注释了代码以解释它是如何工作的:
import Control.Concurrent.STM (STM, atomically)
import Control.Concurrent.STM.TVar
import qualified Data.HashMap.Strict as H
import Data.Foldable (forM_)
import Control.Concurrent (forkIO, threadDelay)
import Control.Monad (unless)
import Control.Monad.Trans.State.Strict
import qualified Data.ByteString.Char8 as B
import Control.Proxy
import Control.Proxy.TCP
import System.IO
main = do
hSetBuffering stdout NoBuffering
- These are the internal data structures. They should be an implementation
detail and you should never expose these references to the
"business logic" part of the application. -
-- I use nRef to keep track of creating fresh Ints (which identify users)
nRef <- newTVarIO 0 :: IO (TVar Int)
- hMap associates every user (i.e. Int) with a counter
Notice how I've "striped" the hash map by storing STM references to the
values instead of storing the values directly. This means that I only
actually write the hashmap when adding or removing users, which reduces
contention for the hash map.
Since each user gets their own unique STM reference for their counter,
modifying counters does not cause contention with other counters or
contention with the hash map. -
hMap <- newTVarIO H.empty :: IO (TVar (H.HashMap Int (TVar Int)))
- The following code makes heavy use of Haskell's pure closures. Each
'let' binding closes over its current environment, which is safe since
Haskell is pure. -
let - 'getCounters' is the only server-facing command in our STM API. The
only permitted operation is retrieving the current set of user
counters.
'getCounters' closes over the 'hMap' reference currently in scope so
that the server never needs to be aware about our internal
implementation. -
getCounters :: STM [Int]
getCounters = do
refs <- fmap H.elems (readTVar hMap)
mapM readTVar refs
- 'init' is the only client-facing command in our STM API. It
initializes the client's entry in the hash map and returns two
commands: the first command is what the client calls to 'increment'
their counter and the second command is what the client calls to log
off and delete
'delete' command.
Notice that those two returned commands each close over the client's
unique STM reference so the client never needs to be aware of how
exactly 'init' is implemented under the hood. -
init :: STM (STM (), STM ())
init = do
n <- readTVar nRef
writeTVar nRef $! n + 1
ref <- newTVar 0
modifyTVar' hMap (H.insert n ref)
let incrementRef :: STM ()
incrementRef = do
mRef <- fmap (H.lookup n) (readTVar hMap)
forM_ mRef $ \ref -> modifyTVar' ref (+ 1)
deleteRef :: STM ()
deleteRef = modifyTVar' hMap (H.delete n)
return (incrementRef, deleteRef)
- Now for the actual program logic. Everything past this point only uses
the approved STM API (i.e. 'getCounters' and 'init'). If I wanted I
could factor the above approved STM API into a separate module to enforce
the encapsulation boundary, but I am lazy. -
- Fork a thread which polls the current state of the counters and displays
it to the console. There is a way to implement this without polling but
this gets the job done for now.
Most of what it is doing is just some simple tricks to reuse the same
console line instead of outputting a stream of lines. Otherwise it
would be just:
forkIO $ forever $ do
ns <- atomically getCounters
print ns
-
forkIO $ (`evalStateT` 0) $ forever $ do
del <- get
lift $ do
putStr (replicate del '\b')
putStr (replicate del ' ' )
putStr (replicate del '\b')
ns <- lift $ atomically getCounters
let str = show ns
lift $ putStr str
put $! length str
lift $ threadDelay 10000
- Fork a thread for each incoming connection, which listens to the client's
commands and translates them into 'STM' actions -
serve HostAny "8080" $ \(socket, _) -> do
(increment, delete) <- atomically init
- Right now, just do the dumb thing and convert all keypresses into
increment commands, with the exception of the 'q' key, which will
quit -
let handler :: (Proxy p) => () -> Consumer p Char IO ()
handler () = runIdentityP loop
where
loop = do
c <- request ()
unless (c == 'q') $ do
lift $ atomically increment
loop
- This uses my 'pipes' library. It basically is a high-level way to
say:
* Read binary packets from the socket no bigger than 4096 bytes
* Get the first character from each packet and discard the rest
* Handle the character using the above 'handler' function -
runProxy $ socketReadS 4096 socket >-> mapD B.head >-> handler
- The above pipeline finishes either when the socket closes or
'handler' stops looping because it received a 'q'. Either case means
that the client is done so we log them out using 'delete'. -
atomically delete
接下来是客户端,它只是打开一个连接并将所有按键作为单个数据包转发:
import Control.Monad
import Control.Proxy
import Control.Proxy.Safe
import Control.Proxy.TCP.Safe
import Data.ByteString.Char8 (pack)
import System.IO
main = do
hSetBuffering stdin NoBuffering
hSetEcho stdin False
- Again, this uses my 'pipes' library. It basically says:
* Read characters from the console using 'commands'
* Pack them into a binary format
* send them to a server running at 127.0.0.1:8080
This finishes looping when the user types a 'q' or the connection is
closed for whatever reason.
-
runSafeIO $ runProxy $ runEitherK $
try . commands
>-> mapD (\c -> pack [c])
>-> connectWriteD Nothing "127.0.0.1" "8080"
commands :: (Proxy p) => () -> Producer p Char IO ()
commands () = runIdentityP loop
where
loop = do
c <- lift getChar
respond c
unless (c == 'q') loop
非常简单:commands
生成 Char
s 的流,然后将其转换为 ByteString
s,然后作为数据包发送到服务器。
如果您运行服务器和几个客户端并让它们分别键入几个键,您的服务器显示将输出一个列表,显示每个客户端键入了多少键:
[1,6,4]
...如果某些客户端断开连接,它们将从列表中删除:
[1,4]
请注意,这些示例中的 pipes
组件将在即将发布的 pipes-4.0.0
版本中大大简化,但当前的 pipes
生态系统仍可按原样完成工作。
【讨论】:
很棒的解决方案,我一定会考虑的 ;) 只是为了我的理解:STM被认为是纯的吗?我想这不是因为它完全是关于不使用锁定机制的可变性,对吧? @Kr0e 对。将 STM 视为可组合的、线程安全的可变内存引用。 当我第一次读到 Haskell 的时候,每个人都说可变的东西真的很邪恶,应该避免,只有在没有办法的情况下才使用它。作为一个初学者,我对这个问题仍然很困惑,因为 STM 显然是 haskell 的优势之一,但它违反了核心概念,不是吗?或者,如果 monad 明确跟踪可变性,它是否“可以”?我真的在寻找何时使用可变性功能的某种指南。 @Kr0e 如果您不使用显式线程(仅par
和类似的纯结构),则无需处理可变性和状态。但是,一旦您开始使用并发,即显式与线程一起工作,并且您希望它们以某种方式进行通信,就必须有一些可变状态(即值随时间变化),以某种方式在线程之间共享和可见。因此,如果您需要使用多个 cpu 内核,您有两种选择:保持纯粹并仅使用并行性,或者使用并发性并在可变世界中处理线程间通信。【参考方案2】:
首先,我绝对推荐使用您自己的特定数据类型来表示命令。使用(String, Maybe (Chan [a]), Maybe a)
时,有缺陷的客户端可以通过发送未知命令或发送("add", Nothing, Nothing)
等方式使您的actor 崩溃。我建议类似
data Command a = Add a | Remove a | Get (Chan [a])
然后您可以在storage
中以保存方式对命令进行模式匹配。
演员有他们的优点,但我也觉得他们有一些缺点。例如,从演员那里获得答案需要向其发送命令,然后等待回复。并且客户端不能完全确定它是否得到了回复并且回复将是某种特定类型 - 你不能说我只想要这个特定命令的这种类型的答案(以及其中的多少)。
因此,作为示例,我将给出一个简单的 STM 解决方案。最好使用哈希表或(平衡树)集合,但由于Handle
既没有实现Ord
也没有实现Hashable
,我们不能使用这些数据结构,所以我将继续使用列表。
module ThreadSet (
TSet, add, remove, get
) where
import Control.Monad
import Control.Monad.STM
import Control.Concurrent.STM.TVar
import Data.List (delete)
newtype TSet a = TSet (TVar [a])
add :: (Eq a) => a -> TSet a -> STM ()
add x (TSet v) = readTVar v >>= writeTVar v . (x :)
remove :: (Eq a) => a -> TSet a -> STM ()
remove x (TSet v) = readTVar v >>= writeTVar v . delete x
get :: (Eq a) => TSet a -> STM [a]
get (TSet v) = readTVar v
此模块实现了一组基于STM
的任意元素。您可以拥有多个这样的集合,并在单个STM
事务中一起使用它们,该事务一次成功或失败。例如
-- | Ensures that there is exactly one element `x` in the set.
add1 :: (Eq a) => a -> TSet a -> STM ()
add1 x v = remove x v >> add x v
这对于演员来说会很困难,你必须将它添加为演员的另一个命令,你不能将它组合成现有的动作并且仍然具有原子性。
更新:有一个有趣的article 解释了为什么 Clojure 设计者选择不使用演员。例如,使用 Actor 时,即使您对可变结构进行了多次读取而只有很少的写入,它们都会被序列化,这会极大地影响性能。
【讨论】:
好吧,序列化/反序列化确实要花很多钱,这是真的。 CloudHaskell 确实具有相同的“序列化开销”,他们称之为功能。但是最近他们添加了一个不安全的发送函数,它可以在没有 ser./deser 的情况下传递消息。这要快一个数量级。从理论上讲,消息传递应该像简单的函数调用一样便宜,以使演员模式成为真正的替代方案,当然,情况并非如此,但在 Erlang 中却是。我认为 STM 是一个非常棒的功能,也许同时使用这两种技术是可行的方法,因为与 actor-pattern 相比,STM 确实是低级的。以上是关于Haskell - 基于 Actor 的可变性的主要内容,如果未能解决你的问题,请参考以下文章