Rails:如何收听/从服务或队列中提取?
Posted
技术标签:
【中文标题】Rails:如何收听/从服务或队列中提取?【英文标题】:Rails: How to listen to / pull from service or queue? 【发布时间】:2016-07-04 23:31:20 【问题描述】:大多数 Rails 应用程序的工作方式是等待来自客户端的请求,然后施展魔法。 但是,如果我想将 Rails 应用程序用作微服务架构的一部分(例如)并进行一些异步通信(服务 A 将事件发送到 Kafka 或 RabbitMQ 队列,而服务 B - 我的 Rails 应用程序 - 应该监听这个队列),如何调整/启动 Rails 应用程序以立即侦听队列并由那里的事件触发? (意味着初始触发不是来自客户端,而是来自应用程序本身。)
感谢您的建议!
【问题讨论】:
rails 是一个网络框架。这就是它的作用,处理网络请求。如果您需要监控作业/事件队列(kafka 或其他),则需要使用其他东西。 @Sergio Tulentsev 你会推荐什么 - 只是一个手动 Ruby 脚本,或者其他一些 Ruby 框架,这对这种情况很有用,或者可能是其他语言? 真棒回复@SergioTulentsev 方式绝对没有添加任何价值。 【参考方案1】:我刚刚在我的应用程序中设置了 RabbitMQ 消息传递,并将在第二天左右为解耦(多个、分布式)应用程序实现。我发现this 文章很有帮助(还有RabbitMQ tutorials)。以下所有代码均适用于 RabbitMQ,并假设您在本地计算机上启动并运行了 RabbitMQ 服务器。
这是我目前所拥有的 - 这对我有用:
#Gemfile
gem 'bunny'
gem 'sneakers'
我有一个Publisher
发送到队列:
# app/agents/messaging/publisher.rb
module Messaging
class Publisher
class << self
def publish(args)
connection = Bunny.new
connection.start
channel = connection.create_channel
queue_name = "#args.keys.first.to_s.pluralize_queue"
queue = channel.queue(queue_name, durable: true)
channel.default_exchange.publish(args[args.keys.first].to_json, :routing_key => queue.name)
puts "in #self.#__method__, [x] Sent #args!"
connection.close
end
end
end
end
我是这样使用的:
Messaging::Publisher.publish(event: ... event details...)
然后我有我的“听众”:
# app/agents/messaging/events_queue_receiver.rb
require_dependency "#Rails.root.join('app','agents','messaging','events_agent')"
module Messaging
class EventsQueueReceiver
include Sneakers::Worker
from_queue :events_queue, env: nil
def work(msg)
logger.info msg
response = Messaging::EventsAgent.distribute(JSON.parse(msg).with_indifferent_access)
ack! if response[:success]
end
end
end
'listener'将消息发送到Messaging::EventsAgent.distribute
,是这样的:
# app/agents/messaging/events_agent.rb
require_dependency #Rails.root.join('app','agents','fsm','state_assignment_agent')"
module Messaging
class EventsAgent
EVENT_HANDLERS =
enroll_in_program: ["FSM::StateAssignmentAgent"]
class << self
def publish(event)
Messaging::Publisher.publish(event: event)
end
def distribute(event)
puts "in #self.#__method__, message"
if event[:handler]
puts "in #self.#__method__, event[:handler: #event[:handler"
event[:handler].constantize.handle_event(event)
else
event_name = event[:event_name].to_sym
EVENT_HANDLERS[event_name].each do |handler|
event[:handler] = handler
publish(event)
end
end
return success: true
end
end
end
end
按照 Codetunes 上的说明,我有:
# Rakefile
# Add your own tasks in files placed in lib/tasks ending in .rake,
# for example lib/tasks/capistrano.rake, and they will automatically be available to Rake.
require File.expand_path('../config/application', __FILE__)
require 'sneakers/tasks'
Rails.application.load_tasks
还有:
# app/config/sneakers.rb
Sneakers.configure()
Sneakers.logger.level = Logger::INFO # the default DEBUG is too noisy
我打开两个控制台窗口。首先,我说(让我的听众运行):
$ WORKERS=Messaging::EventsQueueReceiver rake sneakers:run
... a bunch of start up info
2016-03-18T14:16:42Z p-5877 t-14d03e INFO: Heartbeat interval used (in seconds): 2
2016-03-18T14:16:42Z p-5899 t-14d03e INFO: Heartbeat interval used (in seconds): 2
2016-03-18T14:16:42Z p-5922 t-14d03e INFO: Heartbeat interval used (in seconds): 2
2016-03-18T14:16:42Z p-5944 t-14d03e INFO: Heartbeat interval used (in seconds): 2
第二个,我说:
$ rails s --sandbox
2.1.2 :001 > Messaging::Publisher.publish(:event=>:event_name=>"enroll_in_program", :program_system_name=>"aha_chh", :person_id=>1)
in Messaging::Publisher.publish, [x] Sent :event=>:event_name=>"enroll_in_program", :program_system_name=>"aha_chh", :person_id=>1!
=> :closed
然后,回到我的第一个窗口,我看到了:
2016-03-18T14:17:44Z p-5877 t-19nfxy INFO: "event_name":"enroll_in_program","program_system_name":"aha_chh","person_id":1
in Messaging::EventsAgent.distribute, message
in Messaging::EventsAgent.distribute, event[:handler]: FSM::StateAssignmentAgent
在我的 RabbitMQ 服务器中,我看到了:
这是一个非常简单的设置,我相信在接下来的几天里我会学到更多东西。
祝你好运!
【讨论】:
【参考方案2】:恐怕至少对于 RabbitMQ,您将需要一个客户端。 RabbitMQ 实现了 AMQP 协议,而不是 Web 服务器使用的 HTTP 协议。正如 Sergio 上面提到的,Rails 是一个 Web 框架,因此它没有内置 AMQP 支持。您必须使用像 Bunny 这样的 AMQP 客户端才能从 Rails 应用程序中订阅 Rabbit 队列。
【讨论】:
当然 - 我知道 RabbitMQ 和 Rails 之间需要一些软件。但我的理解/希望是这个客户端可以被调用/可以在 Rails 应用程序中生活!?如果是这样,我将如何“触发”订阅过程? (然后还有:如何处理事件以便我可以在我的应用程序的 Rails 上下文中处理它们?) 可以在 Rails 应用程序中使用 RabbitMQ 客户端。您可以让 Rails 运行一个后台任务,该任务将维护队列订阅并处理传入消息。根据您告诉我的内容,听起来sneakers 库对您来说可能比 Bunny 更好。它适用于在后台进行队列处理的情况。【参考方案3】:假设服务 A 正在向 Kafka 队列发送一些事件,您可以在 Rails 应用程序中运行一个后台进程,该进程将查找 kafka 队列并处理这些排队的消息。对于后台进程,您可以使用 cron-job 或 sidekiq 之类的东西。
【讨论】:
【参考方案4】:Rails 有很多东西。它的一部分处理网络请求。其他部分(ActiveRecord)不关心您是网络请求还是脚本或其他。 Rails 本身甚至没有配备可用于生产的 Web 服务器,您可以使用其他 gem(例如,thin
用于普通的旧 Web 浏览器,或 wash_out
用于传入的 SOAP 请求)。 Rails 只为您提供基础架构/中间件来组合有关服务器的所有部分。
除非您的队列能够以某种 HTTP 方式调用您的应用程序,例如以 SOAP 请求的形式,否则您将需要能够监听您的队列系统(无论是什么)并翻译新“票证”的东西在您的队列中进入 Rails 世界中的控制器操作。
【讨论】:
以上是关于Rails:如何收听/从服务或队列中提取?的主要内容,如果未能解决你的问题,请参考以下文章
Apache Flume - 由多个使用者从单个消息队列中提取数据