在clojure原子交换中执行副作用的正确方法是什么

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在clojure原子交换中执行副作用的正确方法是什么相关的知识,希望对你有一定的参考价值。

我在原子中保留了一个进程注册表。

我想每个id启动一个且只有一个进程(特别是core.async go-loop)。

但是,你不应该在swap!中执行副作用,所以这段代码不好:

(swap! processes-atom
       (fn [processes]
         (if (get processes id)
           processes ;; already exists, do nothing
           (assoc processes id (create-process! id)))))

我该如何正确地做到这一点?

我看过locking,它将一个物体作为锁的监视器。我希望每个id - 都是动态的 - 都有自己的锁。

答案

正如OlegTheCat所示,您可以手工使用locking,这通常是一种很好的方法。但是,在评论中你注意到,只要产生一个进程就可以避免整个原子被锁定,这也是很好的,这也可以用一种非常简单的方式来实现:而不是从pid到map的映射,有一个从pid到延迟过程的地图。这样,您可以非常便宜地添加新延迟,并且实际上只通过在swap!调用之外取消引用延迟来创建进程。取消引用延迟将阻止等待特定延迟,因此需要相同进程的多个线程不会踩到彼此的脚趾,但原子本身将被解锁,允许想要不同进程的线程获取它。

下面是该方法的示例实现,以及您的问题所暗示的其他变量的示例定义,以使代码按原样运行:

(def process-results (atom []))
(defn create-process! [id]
  ;; pretend creating the process takes a long time
  (Thread/sleep (* 1000 (rand-int 3)))
  (future
    ;; running it takes longer, but happens on a new thread
    (Thread/sleep (* 1000 (rand-int 10)))
    (swap! process-results conj id)))

(def processes-atom (atom {}))
(defn cached-process [id]
  (-> processes-atom
      (swap! (fn [processes]
               (update processes id #(or % (delay (create-process! id))))))
      (get id)
      (deref)))

当然,如果您已经定义了其他内容,则只需要cached-process。并运行一个示例,以显示进程已成功重用:

(defn stress-test [num-processes]
  (reset! process-results [])
  (reset! processes-atom {})
  (let [running-processes (doall (for [i (range num-processes)]
                                   (cached-process (rand-int 10))))]
    (run! deref running-processes)
    (deref process-results)))

user> (time (stress-test 40))
"Elapsed time: 18004.617869 msecs"
[1 5 2 0 9 7 8 4 3 6]
另一答案

您似乎需要保护processes-atom免于并发修改,以便只有单个线程可以访问它。 locking将在这种情况下工作。因为,通过使用locking,我们将自己管理线程安全,我们可以使用volatile而不是atomvolatile更快,但不提供任何线程安全和原子性guaranees)。

综上所述,下面的内容应该可以正常工作:

(def processes-volatile (volatile! {}))

(defn create-and-save-process! [id]
  (locking processes-volatile
    (vswap! processes-volatile
            (fn [processes]
              (if (get processes id)
                processes
                (assoc processes id (create-process! id)))))))
另一答案

另一个答案是使用agent开始每个过程。这将每个进程彼此分离,并避免可能多次调用“create-process”函数的问题:

(defn start-proc-agent
  [state]
  (let [delay (int (* 2000 (rand)))]
    (println (format "starting %d" (:id state)))
    (Thread/sleep delay)
    (println (format "finished %d" (:id state)))
    (merge state {:delay delay :state :running} )))

(def procs-agent (atom {}))
(dotimes [i 3]
  (let [curr-agent (agent {:id i :state :unstarted})]
    (swap! procs-agent assoc i curr-agent)
    (send curr-agent start-proc-agent )))
(println "all dispatched...")
(pprint @procs-agent)

(Thread/sleep 3000)
(pprint @procs-agent)

运行时,我们看到:

starting 2
starting 1
starting 0
all dispatched...
{0 #<Agent@39d8240b: {:id 0, :state :unstarted}>,
 1 #<Agent@3a6732bc: {:id 1, :state :unstarted}>,
 2 #<Agent@7414167a: {:id 2, :state :unstarted}>}
finished 0
finished 1
finished 2
{0 #<Agent@39d8240b: {:id 0, :state :running, :delay 317}>,
 1 #<Agent@3a6732bc: {:id 1, :state :running, :delay 1635}>,
 2 #<Agent@7414167a: {:id 2, :state :running, :delay 1687}>}

因此,全局映射procs-agent将每个进程ID与该进程的代理相关联。这种方法的另一个好处是,您可以将后续命令(以函数的形式)发送给进程的代理,并确保它们与其他所有代理独立(并行和异步)。


替代解决方案

与原始问题类似,我们可以使用单个代理(而不是每个进程的代理)来简单地序列化每个进程的创建。由于代理是异步的,因此它们无法像swap!那样重新尝试输入函数。因此,副作用功能不是问题。你可以像这样写:

(defn start-proc-once-only
  [state i]
  (let [curr-proc (get state i) ]
    (if (= :running (:state curr-proc))
      (do
        (println "skipping restart of" i)
        state)
      (let [delay (int (* 2000 (rand)))]
        (println (format "starting %d" i))
        (Thread/sleep delay)
        (println (format "finished %d" i))
        (assoc state i {:delay delay :state :running})))))

(def procs (agent {}))
(dotimes [i 3]
  (println :starting i)
  (send procs start-proc-once-only i))
(dotimes [i 3]
  (println :starting i)
  (send procs start-proc-once-only i))

(println "all dispatched...")
(println :procs) (pprint @procs)
(Thread/sleep 5000)
(println :procs) (pprint @procs)

结果

:starting 0
:starting 1
:starting 2
starting 0
:starting 0
:starting 1
:starting 2
all dispatched...
:procs
{}
finished 0
starting 1
finished 1
starting 2
finished 2
skipping restart of 0
skipping restart of 1
skipping restart of 2
:procs
{0 {:delay 1970, :state :running},
 1 {:delay 189, :state :running},
 2 {:delay 1337, :state :running}}
另一答案

我认为你应该使用add-watch。每次更改原子时都会调用一次。在watch-fn中检查是否已将新id添加到atom中,如果是,则创建该进程并将其添加到atom。这将触发对watch-fn的另一次调用,但第二次调用将不会识别需要进程的任何新id。

以上是关于在clojure原子交换中执行副作用的正确方法是什么的主要内容,如果未能解决你的问题,请参考以下文章

如何Clojure.Spec引用类型(如原子)?

什么时候应该使用交换或重置

Scheme和Clojure没有原子类型谓词 - 这是设计的吗?

如何正确读取 Clojure 中的 RandomAccessFile?

在这个简单的Clojure Ring应用程序中获取默认index.html文件以具有正确的Content-Type的正确方法是什么?

电路中串连和并连电阻的作用是什麽??电阻越大会是怎麽样