如何使 genserver 以频率值运行 Elixir

Posted

技术标签:

【中文标题】如何使 genserver 以频率值运行 Elixir【英文标题】:how to make a genserver to run with a frequency value Elixir 【发布时间】:2021-03-16 18:52:48 【问题描述】:

我见过很多 GenServer 实现,我正在尝试创建一个具有此类规范的实现,但我不确定它的 GenServer 用例。

我有这样的状态

%url: "abc.com/jpeg", name: "Camera1", id: :camera_one, frequency: 10

我有这样 100 个状态,具有不同的值,我的用例包含 5 个步骤。

    将每个状态作为 Gen? 开始。 向该 URL 发送 HTTP 请求。 获取结果。 发送另一个 HTTP 请求,数据来自第一个请求。 使进程进入睡眠状态。如果频率为 10,则持续 10 秒,依此类推,10 秒后将再次从 1 步开始。

现在,当我启动 100 个这样的 worker 时,将会有 100 * 2 个频繁的 HTTP 请求。我不确定我会使用 GenServer 还是 GenStage 或 Flow 甚至是 Broadway?

我还担心 HTTP 请求不会崩溃,例如一个有状态的工作人员会发送一个请求,如果频率是在第一个请求返回之前 1 秒,另一个请求会被发送, GenServer 有足够的能力处理这些情况吗?我认为这叫做背压?

我一直在询问和研究这个用例这么久,我的用例也被引导到 RabbitMQ。

任何指导都会很有帮助,或者任何最小的例子都会非常感激。

? GenServer/ GenStage / GenStateMachine

【问题讨论】:

那肯定不是Broadway 也不是Flow。我在这里也没有看到GenStage 的任何应用程序。不久前,我遇到了类似的任务,我已经发布了 Tarearbol 库,它正是这样做的(除了其他事情之外),一个实现了一种行为,而后面的 DynamicSupervisor 完成了其余的工作。 是否可以启动一个gen server,然后让它在做请求的时候绕一圈移动,保存它,然后再次发送它,然后让自己进入睡眠状态,过一会儿再启动。如果一个请求没有成功,也可以处理其他请求。 @JunaidFarooq 可以做类似的事情,如this article 中所述。 【参考方案1】:

您的问题归结为减少给定时间的并发网络请求数。

一个简单的方法是拥有一个跟踪传出请求计数的 GenServer。然后,对于每个客户(在您的情况下最多 200 个),它可以检查是否有打开的请求,然后采取相应的行动。下面是服务器的样子:

defmodule Throttler do
  use GenServer

  #server
  @impl true
  def init(max_concurrent: max_concurrent) do
    :ok, %count: 0, max_concurrent: max_concurrent
  end

  @impl true
  def handle_call(:run, _from, %count: count, max_concurrent: max_concurrent = state) when count < max_concurrent, do: :reply, :ok, %state | count: count + 1
  @impl true
  def handle_call(:run, _from, %count: count, max_concurrent: max_concurrent = state) when count >= max_concurrent, do: :reply, :error, :too_many_requests, state


  @impl true
  def handle_call(:finished, _from, %count: count = state) when count > 0, do: :reply, :ok, %state | count: count - 1
end

好的,现在我们有一个服务器,我们可以在其中调用handle_call(pid, :run),它会告诉我们是否超出了计数。一旦任务(获取 URL)完成,我们需要调用 handle_call(pid, :finished) 让服务器知道我们已经完成了任务。

在客户端,我们可以将其包装在一个方便的辅助函数中。 (请注意,这仍在 Throttler 模块中,因此 __MODULE__ 有效)

defmodule Throttler do
  #client
  def start_link(max_concurrent: max_concurrent) when max_concurrent > 0 do
    GenServer.start_link(__MODULE__, [max_concurrent: max_concurrent])
  end
  
  def execute_async(pid, func) do
    GenServer.call(pid, :run)
    |> case do
      :ok ->
        task = Task.async(fn -> 
          try do
            func.()
          after
            GenServer.call(pid, :finished)
          end
        end)
        :ok, task
      :error, reason -> :error, reason, func
    end
  end
end

这里我们传入一个我们希望在客户端异步执行的函数,并在执行之前在服务器端完成调用 :run 和 :finished 的工作。如果成功,我们将返回一个任务,否则我们将失败。

把它们放在一起,你会得到如下所示的代码:

:ok, pid = Throttler.start_link(max_concurrent: 3)
results = Enum.map(1..5, fn num -> 
  Throttler.execute(pid, fn ->
    IO.puts("Running command #num")
    :timer.sleep(:5000)
    IO.puts("Sleep complete for #num")
    num * 10
  end)
end)
valid_tasks = Enum.filter(results, &(match?(:ok, _func, &1))) |> Enum.map(&elem(&1, 1))

现在你有一堆任务要么成功,要么失败,你可以采取适当的行动。

失败后你会怎么做?这是背压的有趣部分 :) 最简单的事情是超时并重试,假设您最终会清除下游的压力。否则,您可能会完全失败请求并继续将问题推送到上游。

【讨论】:

以上是关于如何使 genserver 以频率值运行 Elixir的主要内容,如果未能解决你的问题,请参考以下文章

在 Elixir ExUnit 中,我如何保证 Supervisor 将创建一个新的 GeNserver?

使用Elix中的Acr将文件上载到本地

我可以从远程节点调用 GenServer 客户端函数吗?

Elixir:通过某些操作重新启动 GenServer

无法在 GenServer 中重现“启动/循环”行为

每次测试后停止 GenServer