如何使 genserver 以频率值运行 Elixir
Posted
技术标签:
【中文标题】如何使 genserver 以频率值运行 Elixir【英文标题】:how to make a genserver to run with a frequency value Elixir 【发布时间】:2021-03-16 18:52:48 【问题描述】:我见过很多 GenServer 实现,我正在尝试创建一个具有此类规范的实现,但我不确定它的 GenServer 用例。
我有这样的状态
%url: "abc.com/jpeg", name: "Camera1", id: :camera_one, frequency: 10
我有这样 100 个状态,具有不同的值,我的用例包含 5 个步骤。
-
将每个状态作为 Gen? 开始。
向该 URL 发送 HTTP 请求。
获取结果。
发送另一个 HTTP 请求,数据来自第一个请求。
使进程进入睡眠状态。如果频率为 10,则持续 10 秒,依此类推,10 秒后将再次从 1 步开始。
现在,当我启动 100 个这样的 worker 时,将会有 100 * 2 个频繁的 HTTP 请求。我不确定我会使用 GenServer 还是 GenStage 或 Flow 甚至是 Broadway?
我还担心 HTTP 请求不会崩溃,例如一个有状态的工作人员会发送一个请求,如果频率是在第一个请求返回之前 1 秒,另一个请求会被发送, GenServer 有足够的能力处理这些情况吗?我认为这叫做背压?
我一直在询问和研究这个用例这么久,我的用例也被引导到 RabbitMQ。
任何指导都会很有帮助,或者任何最小的例子都会非常感激。
? GenServer/ GenStage / GenStateMachine
【问题讨论】:
那肯定不是Broadway
也不是Flow
。我在这里也没有看到GenStage
的任何应用程序。不久前,我遇到了类似的任务,我已经发布了 Tarearbol
库,它正是这样做的(除了其他事情之外),一个实现了一种行为,而后面的 DynamicSupervisor
完成了其余的工作。
是否可以启动一个gen server,然后让它在做请求的时候绕一圈移动,保存它,然后再次发送它,然后让自己进入睡眠状态,过一会儿再启动。如果一个请求没有成功,也可以处理其他请求。
@JunaidFarooq 可以做类似的事情,如this article 中所述。
【参考方案1】:
您的问题归结为减少给定时间的并发网络请求数。
一个简单的方法是拥有一个跟踪传出请求计数的 GenServer。然后,对于每个客户(在您的情况下最多 200 个),它可以检查是否有打开的请求,然后采取相应的行动。下面是服务器的样子:
defmodule Throttler do
use GenServer
#server
@impl true
def init(max_concurrent: max_concurrent) do
:ok, %count: 0, max_concurrent: max_concurrent
end
@impl true
def handle_call(:run, _from, %count: count, max_concurrent: max_concurrent = state) when count < max_concurrent, do: :reply, :ok, %state | count: count + 1
@impl true
def handle_call(:run, _from, %count: count, max_concurrent: max_concurrent = state) when count >= max_concurrent, do: :reply, :error, :too_many_requests, state
@impl true
def handle_call(:finished, _from, %count: count = state) when count > 0, do: :reply, :ok, %state | count: count - 1
end
好的,现在我们有一个服务器,我们可以在其中调用handle_call(pid, :run)
,它会告诉我们是否超出了计数。一旦任务(获取 URL)完成,我们需要调用 handle_call(pid, :finished)
让服务器知道我们已经完成了任务。
在客户端,我们可以将其包装在一个方便的辅助函数中。 (请注意,这仍在 Throttler 模块中,因此 __MODULE__
有效)
defmodule Throttler do
#client
def start_link(max_concurrent: max_concurrent) when max_concurrent > 0 do
GenServer.start_link(__MODULE__, [max_concurrent: max_concurrent])
end
def execute_async(pid, func) do
GenServer.call(pid, :run)
|> case do
:ok ->
task = Task.async(fn ->
try do
func.()
after
GenServer.call(pid, :finished)
end
end)
:ok, task
:error, reason -> :error, reason, func
end
end
end
这里我们传入一个我们希望在客户端异步执行的函数,并在执行之前在服务器端完成调用 :run 和 :finished 的工作。如果成功,我们将返回一个任务,否则我们将失败。
把它们放在一起,你会得到如下所示的代码:
:ok, pid = Throttler.start_link(max_concurrent: 3)
results = Enum.map(1..5, fn num ->
Throttler.execute(pid, fn ->
IO.puts("Running command #num")
:timer.sleep(:5000)
IO.puts("Sleep complete for #num")
num * 10
end)
end)
valid_tasks = Enum.filter(results, &(match?(:ok, _func, &1))) |> Enum.map(&elem(&1, 1))
现在你有一堆任务要么成功,要么失败,你可以采取适当的行动。
失败后你会怎么做?这是背压的有趣部分 :) 最简单的事情是超时并重试,假设您最终会清除下游的压力。否则,您可能会完全失败请求并继续将问题推送到上游。
【讨论】:
以上是关于如何使 genserver 以频率值运行 Elixir的主要内容,如果未能解决你的问题,请参考以下文章