红宝石块内的赛璐珞异步不起作用

Posted

技术标签:

【中文标题】红宝石块内的赛璐珞异步不起作用【英文标题】:Celluloid async inside ruby blocks does not work 【发布时间】:2015-12-06 04:34:24 【问题描述】:

尝试在我的工作示例中实现赛璐珞 async 似乎表现出奇怪的行为。

这里是我的代码

 class Indefinite
    include Celluloid

      def run!
         loop do 
           [1].each do |i|
             async.on_background
           end
         end
      end 


       def on_background
         puts "Running in background" 
       end
   end

   Indefinite.new.run!

但是当我运行上面的代码时,我从来没有看到 puts "Running in Background"

但是,如果我睡眠,代码似乎可以工作。

class Indefinite
   include Celluloid

    def run! 
      loop do 
        [1].each do |i|
          async.on_background
        end
        sleep 0.5
      end 
    end


   def on_background
     puts "Running in background" 
   end
 end

 Indefinite.new.run!

有什么想法吗?为什么上述两种情况会有这样的差异。

谢谢。

【问题讨论】:

你是如何实例化和调用类/方法的?如果它是不确定的,为什么不在应用程序本身的末尾sleep 为什么是loop[1].each 它和我的朋友正在尝试的例子(这个)[***.com/questions/32479871/… 起初我们认为这肯定与ZeroMq 有关,但后来我用上面的虚拟代码测试了这些东西,没有任何 ZeroMq 东西在其中,因此发现了celluloid 的问题 您的示例和您朋友的示例都缺少关键部分。你能把整个应用程序发布在一个要点中吗? @digitalextremist 这就是我所有的应用程序。如果你愿意,你可以自己试试。 【参考方案1】:

您的主循环正在控制参与者/应用程序的线程。

您的程序所做的只是生成后台进程,但从不运行它们。您需要在循环中使用 sleep 纯粹是为了让后台线程引起注意。

像这里那样让无条件循环产生无限的后台进程通常不是一个好主意。应该有一个延迟,或者在那里放置一个条件语句......否则你只会有一个无限循环产生永远不会被调用的东西。

这样想:如果您将puts "looping" 放在循环中,而您看不到Running in the background ...您将一遍又一遍地看到looping


方法 #1:使用 everyafter 块。

解决此问题的最佳方法不是在loop 中使用sleep,而是使用afterevery 块,如下所示:

every(0.1) 
    on_background

或者最重要的是,如果您想确保进程在再次运行之前完全运行,请改用after

def run_method
    @running ||= false
    unless @running
        @running = true
        on_background
        @running = false
    end
    after(0.1)  run_method 
 end

async 中使用loop 不是一个好主意,除非完成了某种流控制,或者像@server.accept 这样的阻塞进程...否则它只会占用100% 的CPU没有正当理由的核心。

顺便说一句,您也可以使用now_and_everynow_and_after...这将立即运行该块,然后在您想要的时间后再次运行它。

使用every 显示在此要点中:

https://gist.github.com/digitalextremist/686f42e58a58b743142b

我认为理想的情况:

这是一个粗略但立即可用的示例:

https://gist.github.com/digitalextremist/12fc824c6a4dbd94a9df
require 'celluloid/current'

class Indefinite
  include Celluloid

  INTERVAL = 0.5
  ONE_AT_A_TIME = true

  def self.run!
    puts "000a Instantiating."
    indefinite = new
    indefinite.run
    puts "000b Running forever:"
    sleep
  end

  def initialize
    puts "001a Initializing."
    @mutex = Mutex.new if ONE_AT_A_TIME
    @running = false
    puts "001b Interval: #INTERVAL"
  end

  def run
    puts "002a Running."
    unless ONE_AT_A_TIME && @running
      if ONE_AT_A_TIME
        @mutex.synchronize 
          puts "002b Inside lock."
          @running = true
          on_background
          @running = false
        
      else
        puts "002b Without lock."
        on_background
      end
    end
    puts "002c Setting new timer."
    after(INTERVAL)  run 
  end


  def on_background
    if ONE_AT_A_TIME
      puts "003 Running background processor in foreground."
    else
      puts "003 Running in background"
    end
  end
end

Indefinite.run!
puts "004 End of application."

如果ONE_AT_A_TIMEtrue,这将是它的输出:

000a Instantiating.
001a Initializing.
001b Interval: 0.5
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
000b Running forever:
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.

如果ONE_AT_A_TIMEfalse,这将是它的输出:

000a Instantiating.
001a Initializing.
001b Interval: 0.5
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
000b Running forever:
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.

您需要更多的“事件”而不是“线程”才能正确发出任务并保留范围和状态,而不是在线程/参与者之间发出命令...这是 everyafter 块提供的。除此之外,无论哪种方式都是很好的做法,即使您没有要处理的Global Interpreter Lock,因为在您的示例中,您似乎没有处理阻塞进程。如果你有一个阻塞进程,那么一定会有一个无限循环。但是,由于您最终会在处理一个任务之前生成无限数量的后台任务,因此您需要像您的问题一样使用sleep,或者完全使用不同的策略,并使用everyafter 这就是Celluloid 本身在处理任何类型的套接字上的数据时鼓励您操作的方式。


方法 #2:使用递归方法调用。

这刚刚出现在 Google 网上论坛中。下面的示例代码实际上将允许执行其他任务,即使它是一个无限循环。

https://groups.google.com/forum/#!topic/celluloid-ruby/xmkdrMQBGbY

这种方法不太理想,因为它可能会产生更多开销,从而产生一系列纤维。

def work
    # ...
    async.work
end

问题 #2:ThreadFiber 行为。

第二个问题是为什么以下方法会起作用:loop Thread.new puts "Hello"

这会产生无限数量的进程线程,这些线程由RVM 直接管理。即使您正在使用的RVM 中有Global Interpreter Lock...这仅意味着没有使用green threads,它们由操作系统本身提供...而是由进程本身处理。进程的 CPU 调度程序会毫不犹豫地运行每个Thread。在这个例子中,Thread 运行得非常快,然后就死掉了。

async 任务相比,使用了Fiber。那么在默认情况下发生了什么:

    进程开始。 Actor 已实例化。 方法调用调用循环。 循环调用async 方法。 async 方法将任务添加到邮箱。 邮箱未调用,循环继续。 另一个async 任务被添加到邮箱。 这将无限继续。

以上是因为循环方法本身是一个Fiber调用,它永远不会被挂起(除非sleep被调用!),因此添加到邮箱的附加任务永远不会调用一个新的@ 987654383@。 Fiber 的行为与 Thread 不同。这是讨论差异的很好的参考资料:

https://blog.engineyard.com/2010/concurrency-real-and-imagined-in-mri-threads

问题 #3:CelluloidCelluloid::ZMQ 的行为。

第三个问题是为什么include Celluloid 的行为不同于Celluloid::ZMQ ...

这是因为 Celluloid::ZMQ 使用基于反应器的事件邮箱,而 Celluloid 使用基于条件变量的邮箱。

阅读有关流水线和执行模式的更多信息:

https://github.com/celluloid/celluloid/wiki/Pipelining-and-execution-modes

这就是两个例子之间的区别。如果您对这些邮箱的行为有其他疑问,请随时在Google Group 上发帖……您面临的主要动态是GILFiberThreadThread 交互的独特性质.Reactor行为。

您可以在此处阅读有关反应器模式的更多信息:

http://en.wikipedia.org/wiki/Reactor_pattern Explanation of the "Reactor pattern" What is the difference between event driven model and reactor pattern?

并在此处查看Celluloid::ZMQ 使用的具体反应器:

https://github.com/celluloid/celluloid-zmq/blob/master/lib/celluloid/zmq/reactor.rb

那么在事件邮箱场景中发生的情况是,当sleep 被命中时,这是一个阻塞调用,这会导致反应器移动到邮箱中的下一个任务。

而且,这对于您的情况来说是独一无二的,Celluloid::ZMQ 使用的特定反应器正在使用一个永恒的 C 库......特别是 0MQ 库。该反应器在您的应用程序外部,其行为与 Celluloid::IOCelluloid 本身不同,这也是行为发生与您预期不同的原因。

多核支持替代方案

如果维护状态和范围对您来说并不重要,如果您使用不限于一个操作系统线程的jRubyRubinius,与使用具有Global Interpreter LockMRI 相比,您可以实例化更多不止一个演员,并在演员之间同时发出async 调用。

但我的拙见是使用非常高频率的计时器会更好地为您服务,例如我的示例中的0.0010.1,这对于所有意图和目的来说似乎都是即时的,但也允许演员线程有足够的时间来切换光纤并在邮箱中运行其他任务。

【讨论】:

真棒的答案。 @digitalextermist 很抱歉忙于我的盘子里的很多事情。在我接受你之前,你能给我解释一下。为什么?那么loop Thread.new puts 'running' 是否有效。 @digitalextermist 而且,如果有时间,您也想解释一下。 working.rb 和 not_working.rb @Viren 乍一看,要点是不同的,因为包括Celluloid::ZMQ 使用“事件”邮箱,而Celluloid 不使用。当我有时间时,我会评论这些不同但相关的问题。 @digitalextremist 不幸的是它没有。请参阅我的回答,了解我认为如何解决。【参考方案2】:

让我们做一个实验,稍微修改一下你的例子(我们修改它是因为这样我们会得到同样的“奇怪”行为,同时让事情更清晰):

class Indefinite
  include Celluloid

  def run!
    (1..100).each do |i|
      async.on_background i
    end
    puts "100 requests sent from #Actor.current.object_id"
  end 

  def on_background(num)
    (1..100000000).each 
    puts "message #num on #Actor.current.object_id" 
  end
end

Indefinite.new.run!
sleep

# =>
# 100 requests sent from 2084
# message 1 on 2084
# message 2 on 2084
# message 3 on 2084
# ...

您可以在任何 Ruby 解释器上运行它,使用 CelluloidCelluloid::ZMQ,结果总是将是相同的。另请注意,Actor.current.object_id 的输出在两种方法中是相同的,这为我们提供了线索,即我们在实验中处理的是单个参与者。

所以 ruby​​ 和赛璐珞的实现没有太大区别,只要关注这个实验。

让我们先解决为什么这段代码会这样?

不难理解为什么会这样。赛璐珞正在接收传入的请求并将它们保存在任务队列中以供适当的参与者使用。请注意,我们最初对 run! 的调用位于队列顶部。

然后赛璐珞处理这些任务,一次一个。如果恰好有阻塞调用或者sleep调用,根据documentation会调用下一个任务,而不是等待当前任务完成。

请注意,在我们的实验中没有阻塞调用。也就是说,run!方法会从头到尾执行,只有在执行完之后,每一个on_background调用才会以完美的顺序被调用。

这就是它应该如何工作的方式。

如果您在代码中添加sleep 调用,它将通知赛璐珞,它应该开始处理队列中的下一个任务。因此,您在第二个示例中的行为。

现在让我们继续如何设计系统,使其不依赖于sleep 调用,这至少很奇怪。

其实Celluloid-ZMQ project页面有一个很好的例子。注意这个循环:

def run
  loop  async.handle_message @socket.read 
end

它做的第一件事是@socket.read。请注意,这是一个阻塞操作。因此,赛璐珞将处理队列中的下一条消息(如果有的话)。只要@socket.read 响应,就会生成一个新任务。但是这个任务在@socket.read被再次调用之前不会被执行,从而阻塞执行,并通知赛璐珞处理队列中的下一个项目。

您可能会看到与您的示例不同的地方。你没有阻止任何东西,因此没有给赛璐珞一个处理队列的机会。

我们如何获得Celluloid::ZMQ 示例中给出的行为?

第一个(我认为更好的)解决方案是进行实际阻塞调用,例如@socket.read

如果您的代码中没有阻塞调用并且您仍然需要在后台处理事情,那么您应该考虑Celluloid提供的其他机制。

赛璐珞有多种选择。 可以使用conditions、futures、notifications,或者只是在低级别调用wait/signal,就像这个例子:

class Indefinite
  include Celluloid

  def run!
    loop do
      async.on_background
      result = wait(:background) #=> 33
    end
  end 

  def on_background
    puts "background" 

    # notifies waiters, that they can continue
    signal(:background, 33)
  end
end

Indefinite.new.run!
sleep

# ...
# background
# background
# background
# ...

使用sleep(0)Celluloid::ZMQ

我还注意到您在评论中提到的working.rb 文件。它包含以下循环:

loop  [1].each  |i|  async.handle_message 'hello'  ; sleep(0) 

看起来它正在做正确的工作。实际上,在jRuby 下运行它显示,它正在泄漏内存。为了使其更加明显,请尝试在 handle_message 正文中添加睡眠调用:

def handle_message(message)
  sleep 0.5
  puts "got message: #message"
end

高内存使用可能与队列填充非常快并且无法在给定时间内处理的事实有关。问题会更大,如果handle_message 工作量更大,那就是现在。

sleep 的解决方案

我对@9​​87654355@ 的解决方案持怀疑态度。它们可能需要大量内存,甚至会产生内存泄漏。而且不清楚应该将什么作为参数传递给sleep 方法以及为什么。

【讨论】:

您询问了使用多个参与者的问题,其中一些是:没有共享范围,整个线程上下文的开销大大增加,类似于第一个关于范围的问题 - 状态丢失。如果需要,我可以带更多回来。此外,还有一个主题是使用任意值来限制 OP 的无限循环,或者依赖阻塞......但如果按照我的建议使用计时器,则两者都不需要。 every 循环或after 块在这种情况下确实是最好的,使用简单的任务。不过,我在看celluloid 标签时感到很孤单,我珍惜时间,并认为你在回答中投入了思考。 另外,这里不建议使用signals(或者更好的是Conditions),因为a)当sleepOP示例中已经正常工作时,它会使事情进一步复杂化, 和 b) 现在代码中有一个返回锁和偶然性,而不是真正的async 处理。在这一点上,它也可能是一个阻塞方法调用,它否定了问题的全部意义。 include Celluloidinclude Celluloid::ZMQ 之间的区别的答案是基于事件/反应器的邮箱,而不是通常的邮箱;还有一个自定义反应器,与0MQ @digitalextremist 感谢您的 cmets。你是对的,创建无限数量的参与者(甚至将无限数量的任务添加到一个中)会创建不太理想的状态。所以我认为阻塞机制非常重要。我将测试您的睡眠解决方案在内存和任务数量方面的表现。对于我没有经验的眼睛来说,看起来记忆会很快充满,就像我的解决方案有多个演员一样。如果不是这样,我会相信睡眠也很好。 您的every 示例的最后一个问题是您在应用程序结束时没有出现sleep,因此它实际上只运行了一次......然后程序退出。跨度> @digitalextremist,是的,这就是问题所在。【参考方案3】:

线程如何与赛璐珞配合使用

赛璐珞不会为每个异步任务创建一个新线程。它有一个线程池,在其中运行每个任务,同步和异步任务。关键是该库将run! 函数视为同步任务,并在与异步任务相同的上下文中执行它。

默认情况下,赛璐珞在单个线程中运行所有内容,并使用队列系统为以后安排异步任务。它仅在需要时创建新线程。

除此之外,赛璐珞会覆盖sleep 函数。这意味着每次在扩展Celluloid 类的类中调用sleep 时,库都会检查其池中是否存在非睡眠线程。 在您的情况下,当您第一次调用 sleep 0.5 时,它将创建一个新线程来在第一个线程处于休眠状态时执行队列中的异步任务。

所以在您的第一个示例中,只有一个赛璐珞线程正在运行,执行循环。在您的第二个示例中,两个赛璐珞线程正在运行,第一个执行循环并在每次迭代时休眠,另一个执行后台任务。

例如,您可以更改第一个示例以执行有限次数的迭代:

def run! 
  (0..100).each do
    [1].each do |i|
      async.on_background
    end
  end
  puts "Done!"
end

当使用这个run! 函数时,您会看到Done! 打印在所有Running in background 之前,这意味着赛璐珞在启动同一线程中的异步任务之前完成了run! 函数的执行。

【讨论】:

知道为什么loop do; Thread.new puts 'Hello' end 会起作用吗? 我已经更深入地研究了赛璐珞的工作方式。我已经更新了我的答案,我认为它现在解释了为什么赛璐珞不同于带有新线程的循环。 不确定您从@haradwaith 获取信息的位置,但从 0.17.0 开始,默认情况下没有线程池,并且默认情况下使用光纤执行单个任务。解决此问题的最佳方法是使用everyafter,而不是使用sleep 和有限数量的async 任务。这些方法不能解决根本问题。 信息来自赛璐珞代码本身,我已阅读。赛璐珞使用光纤而不是线程这一事实并没有改变池的整体行为:在第一个示例中,它使用一根光纤来运行所有任务。问题不是问如何进行工作,因为答案已经在问题中:循环与sleep 一起工作,这里相当于everyafter。 OP 在问为什么这两个例子之间有这样的差异。 @bbozo 是正确的。我为赛璐珞编写了线程系统。实际发生的是默认情况下会生成和销毁线程/光纤。有一个线程组层,在此之上,目前有三类任务类:Fibered、Threaded、Fiber Pool。在我看来,Fiber Pool 任务处理程序是性能最高的,但解决问题所需的方法仍然是我列出的两种方法之一。此外,还有一个不同的线程组类选项,已被弃用:这就是您正在谈论的那个。这是一个容易犯的错误。我尊重你在赛璐珞上的时间投入。

以上是关于红宝石块内的赛璐珞异步不起作用的主要内容,如果未能解决你的问题,请参考以下文章

片段内的网格适配器不起作用

forEach循环内的XMLHTTPRequest不起作用

SpringBoot异步任务及Async不起作用的原因

ExpansionTile 内的 ListView 不起作用

如何覆盖红宝石块中的节点属性值/使用红宝石块中的变量值?

多行文本在 SwiftUI 列表内的 NavigationLink 中不起作用