core.async alts!: picking the fastest service
(ns core-async-alts-fastest.5-alts-timeout-put-operation
(:require [clojure.core.async :as async :refer [<!! >!! <! >! go onto-chan close! timeout chan alts! alts!! alt!!]]
))
;;
;; from braveclojure: https://www.braveclojure.com/core-async/
;;
;; You can also use alts!! to specify put operations.
;; To do that, place a vector inside the vector you pass to alts!!,
;; like at ➊ in this example:
;; alts!! also accepts put operations, each written as a [channel value] vector.
(comment
(let [c1 (chan)
c2 (chan)]
(go (<! c2))
(let [[value channel] (alts!! [c1 [c2 "put!"]])]
(println value)
(= channel c2)))
)
; prints: true (the value reported for a completed put)
; => true
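;; A minimal variation on the example above. put-or-take-demo is a
;; hypothetical name, and giving c2 a buffer of 1 is an assumption made so
;; that the put can complete without a waiting taker. It shows that alts!!
;; reports true as the value of a completed put, and that the value put can
;; then be taken from the channel.
(require '[clojure.core.async :as a])

(defn put-or-take-demo []
  (let [c1 (a/chan)      ; unbuffered and nobody puts: the take cannot complete
        c2 (a/chan 1)    ; buffered: the put completes immediately
        [value channel] (a/alts!! [c1 [c2 "put!"]])]
    [value (= channel c2) (a/<!! c2)]))

(comment
  (put-or-take-demo))
; => [true true "put!"]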
;;
;; timeout
;; Set an automatic timeout by passing a timeout channel to alts! alongside the other channels.
;;
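(defn upload
  [headshot c]
  ;; Park on a timeout channel instead of calling Thread/sleep, which would
  ;; block one of the threads that run go blocks.
  (go (<! (timeout (rand-int 100)))
      (>! c headshot)))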
(comment
(let [c1 (chan)]
(upload "serious.jpg" c1)
(let [[headshot channel] (alts!! [c1 (timeout 20)])]
(if headshot
(println "Sending headshot notification for" headshot)
(println "Timed out!"))))
)
; => Timed out! (printed when the simulated upload takes longer than 20 ms)
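;; The example above treats a nil value as a timeout. A sketch of an
;; alternative that checks which channel completed instead (take-or-timeout
;; and the :timed-out keyword are hypothetical names chosen for
;; illustration):
(require '[clojure.core.async :as a])

(defn take-or-timeout [c ms]
  (let [t (a/timeout ms)
        [v ch] (a/alts!! [c t])]
    ;; Compare the returned channel with the timeout channel.
    (if (= ch t) :timed-out v)))

(comment
  (take-or-timeout (a/chan) 10))
; => :timed-out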
(ns core-async-alts-fastest.4-alts-debounce
  (:require [clojure.core.async :as async :refer [<! >! go-loop chan timeout alts!]]))
;; debounce
;; from here: https://zhuanlan.zhihu.com/p/23497216
;;
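;; in and out are two channels. The code reads values from in and debounces
;; them over time, dropping part of the data so that, when values arrive too
;; frequently, only the last one is passed on to out.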
(defn debounce [in ms]
(let [out (chan)]
(go-loop [last-val nil]
(let [val (if (nil? last-val) (<! in) last-val)
timer (timeout ms)
[new-val ch] (alts! [in timer])]
(condp = ch
timer (do (>! out val) (recur nil))
in (recur new-val))))
out))
(ns core-async-alts-fastest.3-alts-walkthrough)
(require '[clojure.core.async :as async :refer :all])
;;;; ALTS
;; One killer feature for channels over queues is the ability to wait
;; on many channels at the same time (like a socket select). This is
;; done with `alts!!` (ordinary threads) or `alts!` in go blocks.
;; We can create a background thread with alts that combines inputs on
;; either of two channels. `alts!!` takes a set of operations
;; to perform - either a channel to take from or a [channel value] to put -
;; and returns the value (true for a completed put) and channel that succeeded:
(let [c1 (chan)
c2 (chan)]
(thread (while true
(let [[v ch] (alts!! [c1 c2])]
(println "Read" v "from" ch))))
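;; Question: alts!! should return from only one of c1 and c2, so why do both values show up?
;; Each alts!! call completes exactly one operation; the while loop simply calls it again.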
(>!! c1 "hi")
(>!! c2 "there"))
;; Prints (on stdout, possibly not visible at your repl):
;; Read hi from #<ManyToManyChannel ...>
;; Read there from #<ManyToManyChannel ...>
;; We can use alts! to do the same thing with go blocks:
(let [c1 (chan)
c2 (chan)]
(go (while true
(let [[v ch] (alts! [c1 c2])]
(println "Read" v "from" ch))))
(go (>! c1 "hi"))
(go (>! c2 "there")))
;; Since go blocks are lightweight processes not bound to threads, we
;; can have LOTS of them! Here we create 1000 go blocks that say hi on
;; 1000 channels. We use alts!! to read them as they're ready.
(let [n 1000
cs (repeatedly n chan)
begin (System/currentTimeMillis)]
(doseq [c cs] (go (>! c "hi")))
(dotimes [i n]
(let [[v c] (alts!! cs)]
(assert (= "hi" v))))
(println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))
;; We can combine timeout with `alts!` to do timed channel waits.
;; Here we wait for 100 ms for a value to arrive on the channel, then
;; give up:
(let [c (chan)
begin (System/currentTimeMillis)]
(alts!! [c (timeout 100)])
(println "Gave up after" (- (System/currentTimeMillis) begin)))
(ns core-async-alts-fastest.2-alts-google-fastest
(:require [clojure.core.async :as async :refer [<!! >!! <! >! go go-loop onto-chan close! timeout chan alt! alts! alt!!]]
))
(defn fake-search [kind]
;; When are these two arguments supplied? By `fastest` below, which calls (replica c query).
(fn [c query]
(go
(<! (timeout (rand-int 100)))
(>! c [kind query]))))
;; Each of the three services has two replicas; use whichever answers first.
(def web1 (fake-search :web1))
(def web2 (fake-search :web2))
(def image1 (fake-search :image1))
(def image2 (fake-search :image2))
(def video1 (fake-search :video1))
(def video2 (fake-search :video2))
;; Pick the faster of the replicas, e.g.
;; (fastest "clojure" web1 web2)
(defn fastest [query & replicas]
(let [c (chan)]
(doseq [replica replicas]
;; e.g. ((fake-search :web1) c "clojure")
(replica c query))
;; The channel yields [:web1 "clojure"] or [:web2 "clojure"].
;; Return c; whichever replica puts onto the channel first supplies the result.
c))
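;; Because c above is unbuffered and the caller takes only once, the slower
;; replica's go block stays parked on its put. A possible variation, offered
;; as a sketch rather than as the original author's method: size the buffer
;; to the number of replicas so that every put can complete.
;; fastest-buffered is a hypothetical name; replicas are assumed to be
;; functions of [c query], like the ones fake-search returns.
(require '[clojure.core.async :as a])

(defn fastest-buffered [query & replicas]
  ;; One buffer slot per replica (at least one), so no put has to wait.
  (let [c (a/chan (max 1 (count replicas)))]
    (doseq [replica replicas]
      (replica c query))
    c))

(comment
  (a/<!! (fastest-buffered "clojure" web1 web2)))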
;; google search
;; Query three services, each of which has two replicas.
(defn google [query]
(let [c (chan)
t (timeout 80)]
;; Put three results onto c, one per service. Each fastest call has its own channel, and only the first of its two replicas' puts is taken.
(go (>! c (<! (fastest query web1 web2))))
(go (>! c (<! (fastest query image1 image2))))
(go (>! c (<! (fastest query video1 video2))))
(go (loop [i 0 ret []]
(if (= i 3)
ret
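;; v is the value taken from whichever of c or t completes first;
;; it is nil when the timeout channel t closes before a result arrives.
(recur (inc i) (conj ret
;; alt! takes port/result-expression pairs, so it is followed by an even number of forms.
;; A result expression of the form ([v] body) binds the taken value to v and evaluates body.
;; alts! may be easier to read, but it returns [value channel],
;; so the equivalent here would be (first (alts! [c t])).
#_(alts! [c t])
(alt! [c t] ([v] v))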
)))))))
(comment
(<!! (google "clojure"))
)
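;; For comparison, a sketch of the collecting loop written with alts!
;; instead of alt!. Unlike the loop in google above, it stops as soon as the
;; timeout wins rather than conj-ing nil. collect-with-deadline and its
;; parameters (c, n, ms) are hypothetical names for illustration.
(require '[clojure.core.async :as a])

(defn collect-with-deadline
  "Returns a channel that yields a vector of up to n values taken from c,
  giving up once ms milliseconds have passed."
  [c n ms]
  (let [t (a/timeout ms)]
    (a/go-loop [ret []]
      (if (= (count ret) n)
        ret
        (let [[v ch] (a/alts! [c t])]
          (if (= ch t)
            ret                       ; deadline reached: return what we have
            (recur (conj ret v))))))))

(comment
  (let [c (a/chan 3)]
    (a/>!! c 1)
    (a/>!! c 2)
    (a/<!! (collect-with-deadline c 3 50))))
; => [1 2]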
;; from netty socket
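;; This is similar to the google example above: alt! is followed by an even
;; number of forms (port/result-expression pairs, then the :priority option).
;; Note: Session, the WebSocket session class of the original project, is not imported in this namespace.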
;;
(defn- write-loop
"Returns a channel containing the result of a loop that sends
messages over the WebSocket when items are placed on the
write-channel.
If an error occurs or the remote endpoint is closed, the exception
or status code and reason are given as the result. When one of
these events occurs, it closes the write channel so that no new
messages may be attempted. Values already on the write channel that
have yet to be sent to the client will remain on the write channel
so that the user can decide what to do with them.
When anything would stop writing, the read-channel is also closed.
If the user closes the write-channel, the session will be closed
with it.
Sessions will be closed with status code 1000 (CLOSE_NORMAL)."
[^Session session read-channel write-channel result-channel]
(let [remote (.getRemote session)]
(go-loop []
;; here
(alt!
result-channel
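;; What is this parenthesized form for? ([v] ...) binds the value taken
;; from result-channel to v and then evaluates the body.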
([v]
;; The session is already closed, so just deal with the
;; read and write channels.
(close! write-channel)
(close! read-channel)
v)
write-channel
([message]
(if (nil? message)
(do (.close session) (close! read-channel))
(do
;; Errors inside of the sendString call are passed to
;; onWebSocketError so you don't need to capture them
;; here
(.sendString remote message)
(recur))))
:priority true))))
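;; A stripped-down sketch of the same alt! shape, using the blocking alt!!
;; so it can run outside a go block. first-ready, a-ch and b-ch are
;; hypothetical names. With :priority true the ports are tried in the order
;; written, so a-ch wins when both channels have a value ready.
(require '[clojure.core.async :as a])

(defn first-ready [a-ch b-ch]
  (a/alt!!
    a-ch ([v] [:a v])   ; bind the value taken from a-ch to v
    b-ch ([v] [:b v])   ; bind the value taken from b-ch to v
    :priority true))

(comment
  (let [a-ch (a/chan 1)
        b-ch (a/chan 1)]
    (a/>!! a-ch 1)
    (a/>!! b-ch 2)
    (first-ready a-ch b-ch)))
; => [:a 1]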
;;
;; upload multiple images to the server
;; first come first served
;;
(ns core-async-alts-fastest.1-alts-upload-image
(:require [clojure.core.async :as async :refer [<!! >!! <! >! go onto-chan close! timeout chan alts! alts!! alt!!]]
))
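(defn upload-image
  [headshot c]
  ;; The go block lives inside upload-image; I usually prefer to write the
  ;; go at the call site instead.
  ;; Park on a timeout channel instead of calling Thread/sleep, which would
  ;; block one of the threads that run go blocks.
  (go (<! (timeout (rand-int 100)))
      ;; Put onto the channel, similar to deliver on a promise.
      (>! c headshot)))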
(comment
(let [c1 (chan)
c2 (chan)
c3 (chan)]
;;
(upload-image "serious.jpg" c1)
(upload-image "fun.jpg" c2)
(upload-image "sassy.jpg" c3)
;; alts!! returns a vector holding the value taken and the channel it came from
(let [[headshot channel] (alts!! [c1 c2 c3])]
(println "Sending headshot notification for" headshot))
)
)
;; go/while version: calling alts! in a loop takes every value in turn; each individual call still returns only one.
(comment
(let [c1 (chan)
c2 (chan)
c3 (chan)]
;;
(upload-image "serious.jpg" c1)
(upload-image "fun.jpg" c2)
(upload-image "sassy.jpg" c3)
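;; alts! returns a vector holding the value taken and the channel it came from.
;; Inside a go block use the parking alts! rather than the blocking alts!!.
(go (while true (let [[headshot channel] (alts! [c1 c2 c3])]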
(println "Sending headshot notification for" headshot))))
)
)