Rails:如何收听/从服务或队列中提取?

Posted

技术标签:

【中文标题】Rails:如何收听/从服务或队列中提取?【英文标题】:Rails: How to listen to / pull from service or queue? 【发布时间】:2016-07-04 23:31:20 【问题描述】:

大多数 Rails 应用程序的工作方式是等待来自客户端的请求,然后施展魔法。 但是,如果我想将 Rails 应用程序用作微服务架构的一部分(例如)并进行一些异步通信(服务 A 将事件发送到 Kafka 或 RabbitMQ 队列,而服务 B - 我的 Rails 应用程序 - 应该监听这个队列),如何调整/启动 Rails 应用程序以立即侦听队列并由那里的事件触发? (意味着初始触发不是来自客户端,而是来自应用程序本身。)

感谢您的建议!

【问题讨论】:

rails 是一个网络框架。这就是它的作用,处理网络请求。如果您需要监控作业/事件队列(kafka 或其他),则需要使用其他东西。 @Sergio Tulentsev 你会推荐什么 - 只是一个手动 Ruby 脚本,或者其他一些 Ruby 框架,这对这种情况很有用,或者可能是其他语言? 真棒回复@SergioTulentsev 方式绝对没有添加任何价值。 【参考方案1】:

我刚刚在我的应用程序中设置了 RabbitMQ 消息传递,并将在第二天左右为解耦(多个、分布式)应用程序实现。我发现this 文章很有帮助(还有RabbitMQ tutorials)。以下所有代码均适用于 RabbitMQ,并假设您在本地计算机上启动并运行了 RabbitMQ 服务器。

这是我目前所拥有的 - 这对我有用:

  #Gemfile
  gem 'bunny'
  gem 'sneakers'

我有一个Publisher 发送到队列:

  # app/agents/messaging/publisher.rb
  module Messaging
    class Publisher
      class << self

        def publish(args)
          connection = Bunny.new
          connection.start
          channel = connection.create_channel
          queue_name = "#args.keys.first.to_s.pluralize_queue"
          queue = channel.queue(queue_name, durable: true)
          channel.default_exchange.publish(args[args.keys.first].to_json, :routing_key => queue.name)
          puts "in #self.#__method__, [x] Sent #args!"
          connection.close
        end

      end
    end
  end

我是这样使用的:

  Messaging::Publisher.publish(event: ... event details...)

然后我有我的“听众”:

  # app/agents/messaging/events_queue_receiver.rb
  require_dependency "#Rails.root.join('app','agents','messaging','events_agent')"

  module Messaging
    class EventsQueueReceiver
      include Sneakers::Worker
      from_queue :events_queue, env: nil

      def work(msg)
        logger.info msg
        response = Messaging::EventsAgent.distribute(JSON.parse(msg).with_indifferent_access)
        ack! if response[:success]
      end

    end
  end

'listener'将消息发送到Messaging::EventsAgent.distribute,是这样的:

  # app/agents/messaging/events_agent.rb
 require_dependency  #Rails.root.join('app','agents','fsm','state_assignment_agent')"

  module Messaging
    class EventsAgent
      EVENT_HANDLERS = 
        enroll_in_program: ["FSM::StateAssignmentAgent"]
      
      class << self

        def publish(event)
          Messaging::Publisher.publish(event: event)
        end

        def distribute(event)
          puts "in #self.#__method__, message"
          if event[:handler]
            puts "in #self.#__method__, event[:handler: #event[:handler"
            event[:handler].constantize.handle_event(event)
          else
            event_name = event[:event_name].to_sym
            EVENT_HANDLERS[event_name].each do |handler|
              event[:handler] = handler
              publish(event)
            end
          end
          return success: true
        end

      end
    end
  end

按照 Codetunes 上的说明,我有:

  # Rakefile
  # Add your own tasks in files placed in lib/tasks ending in .rake,
  # for example lib/tasks/capistrano.rake, and they will automatically be available to Rake.

  require File.expand_path('../config/application', __FILE__)

  require 'sneakers/tasks'
  Rails.application.load_tasks

还有:

  # app/config/sneakers.rb
  Sneakers.configure()
  Sneakers.logger.level = Logger::INFO # the default DEBUG is too noisy

我打开两个控制台窗口。首先,我说(让我的听众运行):

  $ WORKERS=Messaging::EventsQueueReceiver rake sneakers:run
  ... a bunch of start up info
  2016-03-18T14:16:42Z p-5877 t-14d03e INFO: Heartbeat interval used (in seconds): 2
  2016-03-18T14:16:42Z p-5899 t-14d03e INFO: Heartbeat interval used (in seconds): 2
  2016-03-18T14:16:42Z p-5922 t-14d03e INFO: Heartbeat interval used (in seconds): 2
  2016-03-18T14:16:42Z p-5944 t-14d03e INFO: Heartbeat interval used (in seconds): 2

第二个,我说:

  $ rails s --sandbox
  2.1.2 :001 > Messaging::Publisher.publish(:event=>:event_name=>"enroll_in_program", :program_system_name=>"aha_chh", :person_id=>1)
  in Messaging::Publisher.publish, [x] Sent :event=>:event_name=>"enroll_in_program", :program_system_name=>"aha_chh", :person_id=>1!
  => :closed 

然后,回到我的第一个窗口,我看到了:

  2016-03-18T14:17:44Z p-5877 t-19nfxy INFO: "event_name":"enroll_in_program","program_system_name":"aha_chh","person_id":1
  in Messaging::EventsAgent.distribute, message
  in Messaging::EventsAgent.distribute, event[:handler]: FSM::StateAssignmentAgent

在我的 RabbitMQ 服务器中,我看到了:

这是一个非常简单的设置,我相信在接下来的几天里我会学到更多东西。

祝你好运!

【讨论】:

【参考方案2】:

恐怕至少对于 RabbitMQ,您将需要一个客户端。 RabbitMQ 实现了 AMQP 协议,而不是 Web 服务器使用的 HTTP 协议。正如 Sergio 上面提到的,Rails 是一个 Web 框架,因此它没有内置 AMQP 支持。您必须使用像 Bunny 这样的 AMQP 客户端才能从 Rails 应用程序中订阅 Rabbit 队列。

【讨论】:

当然 - 我知道 RabbitMQ 和 Rails 之间需要一些软件。但我的理解/希望是这个客户端可以被调用/可以在 Rails 应用程序中生活!?如果是这样,我将如何“触发”订阅过程? (然后还有:如何处理事件以便我可以在我的应用程序的 Rails 上下文中处理它们?) 可以在 Rails 应用程序中使用 RabbitMQ 客户端。您可以让 Rails 运行一个后台任务,该任务将维护队列订阅并处理传入消息。根据您告诉我的内容,听起来sneakers 库对您来说可能比 Bunny 更好。它适用于在后台进行队列处理的情况。【参考方案3】:

假设服务 A 正在向 Kafka 队列发送一些事件,您可以在 Rails 应用程序中运行一个后台进程,该进程将查找 kafka 队列并处理这些排队的消息。对于后台进程,您可以使用 cron-job 或 sidekiq 之类的东西。

【讨论】:

【参考方案4】:

Rails 有很多东西。它的一部分处理网络请求。其他部分(ActiveRecord)不关心您是网络请求还是脚本或其他。 Rails 本身甚至没有配备可用于生产的 Web 服务器,您可以使用其他 gem(例如,thin 用于普通的旧 Web 浏览器,或 wash_out 用于传入的 SOAP 请求)。 Rails 只为您提供基础架构/中间件来组合有关服务器的所有部分。

除非您的队列能够以某种 HTTP 方式调用您的应用程序,例如以 SOAP 请求的形式,否则您将需要能够监听您的队列系统(无论是什么)并翻译新“票证”的东西在您的队列中进入 Rails 世界中的控制器操作。

【讨论】:

以上是关于Rails:如何收听/从服务或队列中提取?的主要内容,如果未能解决你的问题,请参考以下文章

如何从音频队列缓冲区中提取整数样本并将修改后的样本写回?

关闭浏览器时如何在队列中执行非常繁重的任务(在后台运行)?

Apache Flume - 由多个使用者从单个消息队列中提取数据

Android:intentservice,如何中止或跳过handleintent队列中的任务

如何查看延迟的作业队列?

延时队列常用实现详解