在clojure原子交换中执行副作用的正确方法是什么
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在clojure原子交换中执行副作用的正确方法是什么相关的知识,希望对你有一定的参考价值。
我在原子中保留了一个进程注册表。
我想每个id
启动一个且只有一个进程(特别是core.async go-loop)。
但是,你不应该在swap!
中执行副作用,所以这段代码不好:
(swap! processes-atom
(fn [processes]
(if (get processes id)
processes ;; already exists, do nothing
(assoc processes id (create-process! id)))))
我该如何正确地做到这一点?
我看过locking
,它将一个物体作为锁的监视器。我希望每个id
- 都是动态的 - 都有自己的锁。
正如OlegTheCat所示,您可以手工使用locking
,这通常是一种很好的方法。但是,在评论中你注意到,只要产生一个进程就可以避免整个原子被锁定,这也是很好的,这也可以用一种非常简单的方式来实现:而不是从pid到map的映射,有一个从pid到延迟过程的地图。这样,您可以非常便宜地添加新延迟,并且实际上只通过在swap!
调用之外取消引用延迟来创建进程。取消引用延迟将阻止等待特定延迟,因此需要相同进程的多个线程不会踩到彼此的脚趾,但原子本身将被解锁,允许想要不同进程的线程获取它。
下面是该方法的示例实现,以及您的问题所暗示的其他变量的示例定义,以使代码按原样运行:
(def process-results (atom []))
(defn create-process! [id]
;; pretend creating the process takes a long time
(Thread/sleep (* 1000 (rand-int 3)))
(future
;; running it takes longer, but happens on a new thread
(Thread/sleep (* 1000 (rand-int 10)))
(swap! process-results conj id)))
(def processes-atom (atom {}))
(defn cached-process [id]
(-> processes-atom
(swap! (fn [processes]
(update processes id #(or % (delay (create-process! id))))))
(get id)
(deref)))
当然,如果您已经定义了其他内容,则只需要cached-process
。并运行一个示例,以显示进程已成功重用:
(defn stress-test [num-processes]
(reset! process-results [])
(reset! processes-atom {})
(let [running-processes (doall (for [i (range num-processes)]
(cached-process (rand-int 10))))]
(run! deref running-processes)
(deref process-results)))
user> (time (stress-test 40))
"Elapsed time: 18004.617869 msecs"
[1 5 2 0 9 7 8 4 3 6]
您似乎需要保护processes-atom
免于并发修改,以便只有单个线程可以访问它。 locking
将在这种情况下工作。因为,通过使用locking
,我们将自己管理线程安全,我们可以使用volatile
而不是atom
(volatile
更快,但不提供任何线程安全和原子性guaranees)。
综上所述,下面的内容应该可以正常工作:
(def processes-volatile (volatile! {}))
(defn create-and-save-process! [id]
(locking processes-volatile
(vswap! processes-volatile
(fn [processes]
(if (get processes id)
processes
(assoc processes id (create-process! id)))))))
另一个答案是使用agent
开始每个过程。这将每个进程彼此分离,并避免可能多次调用“create-process”函数的问题:
(defn start-proc-agent
[state]
(let [delay (int (* 2000 (rand)))]
(println (format "starting %d" (:id state)))
(Thread/sleep delay)
(println (format "finished %d" (:id state)))
(merge state {:delay delay :state :running} )))
(def procs-agent (atom {}))
(dotimes [i 3]
(let [curr-agent (agent {:id i :state :unstarted})]
(swap! procs-agent assoc i curr-agent)
(send curr-agent start-proc-agent )))
(println "all dispatched...")
(pprint @procs-agent)
(Thread/sleep 3000)
(pprint @procs-agent)
运行时,我们看到:
starting 2
starting 1
starting 0
all dispatched...
{0 #<Agent@39d8240b: {:id 0, :state :unstarted}>,
1 #<Agent@3a6732bc: {:id 1, :state :unstarted}>,
2 #<Agent@7414167a: {:id 2, :state :unstarted}>}
finished 0
finished 1
finished 2
{0 #<Agent@39d8240b: {:id 0, :state :running, :delay 317}>,
1 #<Agent@3a6732bc: {:id 1, :state :running, :delay 1635}>,
2 #<Agent@7414167a: {:id 2, :state :running, :delay 1687}>}
因此,全局映射procs-agent将每个进程ID与该进程的代理相关联。这种方法的另一个好处是,您可以将后续命令(以函数的形式)发送给进程的代理,并确保它们与其他所有代理独立(并行和异步)。
替代解决方案
与原始问题类似,我们可以使用单个代理(而不是每个进程的代理)来简单地序列化每个进程的创建。由于代理是异步的,因此它们无法像swap!
那样重新尝试输入函数。因此,副作用功能不是问题。你可以像这样写:
(defn start-proc-once-only
[state i]
(let [curr-proc (get state i) ]
(if (= :running (:state curr-proc))
(do
(println "skipping restart of" i)
state)
(let [delay (int (* 2000 (rand)))]
(println (format "starting %d" i))
(Thread/sleep delay)
(println (format "finished %d" i))
(assoc state i {:delay delay :state :running})))))
(def procs (agent {}))
(dotimes [i 3]
(println :starting i)
(send procs start-proc-once-only i))
(dotimes [i 3]
(println :starting i)
(send procs start-proc-once-only i))
(println "all dispatched...")
(println :procs) (pprint @procs)
(Thread/sleep 5000)
(println :procs) (pprint @procs)
结果
:starting 0
:starting 1
:starting 2
starting 0
:starting 0
:starting 1
:starting 2
all dispatched...
:procs
{}
finished 0
starting 1
finished 1
starting 2
finished 2
skipping restart of 0
skipping restart of 1
skipping restart of 2
:procs
{0 {:delay 1970, :state :running},
1 {:delay 189, :state :running},
2 {:delay 1337, :state :running}}
我认为你应该使用add-watch。每次更改原子时都会调用一次。在watch-fn中检查是否已将新id添加到atom中,如果是,则创建该进程并将其添加到atom。这将触发对watch-fn的另一次调用,但第二次调用将不会识别需要进程的任何新id。
以上是关于在clojure原子交换中执行副作用的正确方法是什么的主要内容,如果未能解决你的问题,请参考以下文章
Scheme和Clojure没有原子类型谓词 - 这是设计的吗?
如何正确读取 Clojure 中的 RandomAccessFile?
在这个简单的Clojure Ring应用程序中获取默认index.html文件以具有正确的Content-Type的正确方法是什么?