开始 fetch,在 fetch 期间对中间请求进行排队,然后为所有人提供数据
Posted
技术标签:
【中文标题】开始 fetch,在 fetch 期间对中间请求进行排队,然后为所有人提供数据【英文标题】:Start fetch, queue intermediate requests during fetch, then serve data for all 【发布时间】:2017-05-21 00:03:10 【问题描述】:我在使用 Elixir 和 Phoenix 实现以下流程时遇到问题:
-
来自用户 A 的请求,第 3 方 API 缓存为空
启动第 3 方通过 HTTP 获取 API
-
获取尚未完成,来自用户 B 的请求进来
用户 B 等待完成提取
不同的路由或路由参数应该使用不同的队列。在第 3 方 API 数据仍在获取时传入的请求在任何情况下都不应触发具有相同参数的额外获取。等待部分(2.2.)对我来说至关重要。
从我目前阅读的内容来看,这个问题似乎可以使用标准 Elixir / Erlang / OTP 功能解决。
【问题讨论】:
【参考方案1】:是的,与大多数其他语言相比,这在 Elixir/Erlang 中可以很容易地完成。这是使用内存缓存执行此操作的一种方法。如果您以前使用过 GenServer 而不是 GenServer.reply/2
,这里需要注意的是,我们存储传入的 handle_call
请求的 from
参数,当请求完成时,我们会响应每个请求。我没有在这个 POC 代码中很好地处理错误,但它正确处理了最有趣的部分,即 2.2:
defmodule CachedParallelHTTP do
def start_link do
GenServer.start_link(__MODULE__, :ok)
end
def init(_) do
:ok, %
end
def handle_call(:fetch, arg, from, state) do
case state[arg] do
%status: :fetched, response: response ->
# We've already made this request; just return the cached response.
:reply, response, state
%status: :fetching ->
# We're currently running this request. Store the `from` and reply to the caller later.
state = update_in(state, [arg, :froms], fn froms -> [from | froms] end)
:noreply, state
nil ->
# This is a brand new request. Let's create the new state and start the request.
pid = self()
state = Map.put(state, arg, %status: :fetching, froms: [from])
Task.start(fn ->
IO.inspect :making_request, arg
# Simulate a long synchronous piece of code. The actual HTTP call should be made here.
Process.sleep(2000)
# dummy response
response = arg <> arg <> arg
# Let the server know that this request is done so it can reply to all the `froms`,
# including the ones that were added while this request was being executed.
GenServer.call(pid, :fetched, arg, response)
end)
:noreply, state
end
end
def handle_call(:fetched, arg, response, _from, state) do
# A request was completed.
case state[arg] do
%status: :fetching, froms: froms ->
IO.inspect "notifying #length(froms) clients waiting for #arg"
# Reply to all the callers who've been waiting for this request.
for from <- froms do
GenServer.reply(from, response)
end
# Cache the response in the state, for future callers.
state = Map.put(state, arg, %status: :fetched, response: response)
:reply, :ok, state
end
end
end
这里有一小段代码来测试这个:
now = fn -> DateTime.utc_now |> DateTime.to_iso8601 end
:ok, s = CachedParallelHTTP.start_link
IO.inspect :before_request, now.()
for i <- 1..3 do
Task.start(fn ->
response = GenServer.call(s, :fetch, "123")
IO.inspect :response, "123", i, now.(), response
end)
end
:timer.sleep(1000)
for i <- 1..5 do
Task.start(fn ->
response = GenServer.call(s, :fetch, "456")
IO.inspect :response, "456", i, now.(), response
end)
end
IO.inspect :after_request, now.()
:timer.sleep(10000)
输出:
:before_request, "2017-01-06T10:30:07.852986Z"
:making_request, "123"
:after_request, "2017-01-06T10:30:08.862425Z"
:making_request, "456"
"notifying 3 clients waiting for 123"
:response, "123", 3, "2017-01-06T10:30:07.860758Z", "123123123"
:response, "123", 2, "2017-01-06T10:30:07.860747Z", "123123123"
:response, "123", 1, "2017-01-06T10:30:07.860721Z", "123123123"
"notifying 5 clients waiting for 456"
:response, "456", 5, "2017-01-06T10:30:08.862556Z", "456456456"
:response, "456", 4, "2017-01-06T10:30:08.862540Z", "456456456"
:response, "456", 3, "2017-01-06T10:30:08.862524Z", "456456456"
:response, "456", 2, "2017-01-06T10:30:08.862504Z", "456456456"
:response, "456", 1, "2017-01-06T10:30:08.862472Z", "456456456"
请注意,使用 GenServer.reply
和 Task.start
,单个 GenServer 能够处理多个并行请求,同时保持面向用户的 API 完全同步。根据您要处理的负载量,您可能需要考虑使用 GenServer 池。
【讨论】:
感谢您的详细回复!这肯定需要处理很多(注意:我刚刚开始使用 Elixir)。将CachedParallelHTTP
作为主管工作人员是否有意义?
啊,我建议先阅读官方的“混合和 OTP”指南,然后再尝试理解我在这种情况下的答案:elixir-lang.org/getting-started/mix-otp/…。
“作为主管工作人员启动 CachedParallelHTTP 有意义吗?”是的,我肯定会这样做,以便在崩溃时重新启动。
这个答案看起来像是要走的路。您还可以将问题分解为 2 个单独的 GenServer
s - 一个用于处理缓存,与此处的 CachedParallelHTTP
几乎相同,另一个用于获取 API。您将失去使用 Task
s 获得的固有并行性,但您可以轻松实现节流。
既然GenServer.call(pid, :fetched, arg, response)
在一个Task里面,它不会回复同一个进程而不是原来的调用者吗?以上是关于开始 fetch,在 fetch 期间对中间请求进行排队,然后为所有人提供数据的主要内容,如果未能解决你的问题,请参考以下文章
Django 使用 Fetch 对 POST 请求返回 403 错误
在 git svn clone/fetch 期间避免“警告:有太多无法访问的松散对象”