使用 EventMachine 和 RabbitMQ 的 RPC

Posted

技术标签:

【中文标题】使用 EventMachine 和 RabbitMQ 的 RPC【英文标题】:RPC using EventMachine & RabbitMQ 【发布时间】:2012-11-02 21:53:17 【问题描述】:

我已经开始使用 AMQP gem doc 中提供的 RabbitMQ RPC sample code,尝试编写非常简单的代码来执行同步远程调用:

require "amqp"

module RPC
  class Base
    include EM::Deferrable

    def rabbit(rabbit_callback)
      rabbit_loop = Proc.new 
        AMQP.connect do |connection|
          AMQP::Channel.new(connection) do |channel|
            channel.queue("rpc.queue", :exclusive => false, :durable => true) do |requests_queue|
              self.callback(&rabbit_callback)
              self.succeed(connection, channel, requests_queue)
            end # requests_queue
          end # AMQP.channel
        end # AMQP.connect

        Signal.trap("INT")   connection.close  EM.stop  
        Signal.trap("TERM")  connection.close  EM.stop  
      

      if !EM.reactor_running?
        EM.run do
          rabbit_loop.call
        end
      else
        rabbit_loop.call
      end
    end
  end

  class Server < Base

    def run
      server_loop = Proc.new do |connection, channel, requests_queue|
        consumer = AMQP::Consumer.new(channel, requests_queue).consume
        consumer.on_delivery do |metadata, payload|
          puts "[requests] Got a request #metadata.message_id. Sending a reply to #metadata.reply_to..."
          channel.default_exchange.publish(Time.now.to_s,
                                           :routing_key    => metadata.reply_to,
                                           :correlation_id => metadata.message_id,
                                           :mandatory      => true)
          metadata.ack
        end
      end
      rabbit(server_loop)
    end

  end

  class Client < Base

    def sync_push(request)
      result = nil
      sync_request = Proc.new do |connection, channel, requests_queue|
        message_id = Kernel.rand(10101010).to_s

        response_queue = channel.queue("", :exclusive => true, :auto_delete => true)
        response_queue.subscribe do |headers, payload|
          if headers.correlation_id == message_id
            result = payload
            connection.close  EM.stop 
          end
        end

        EM.add_timer(0.1) do 
          puts "[request] Sending a request...#request with id #message_id"
          channel.default_exchange.publish(request,
                                           :routing_key => requests_queue.name,
                                           :reply_to    => response_queue.name,
                                           :message_id  => message_id)
        end
      end

      rabbit(sync_request)
      result
    end
  end
end

这个想法很简单:我想让一个消息队列随时准备好(这由rabbit 方法处理)。每当客户端想要发送请求时,它首先为响应创建一个临时队列以及一个消息 ID;然后它将请求发布到主消息队列,并在临时队列中等待具有相同消息 ID 的响应,以便知道此特定请求的响应何时准备好。我猜message_id 在某种程度上与临时队列是多余的(因为队列也应该是唯一的)。

我现在使用这个客户端/服务器代码运行虚拟脚本

# server session
>> server = RPC::Server.new
=> #<RPC::Server:0x007faaa23bb5b0>
>> server.run
Updating client properties
[requests] Got a request 3315740. Sending a reply to amq.gen-QCv8nP2dI5Qd6bg2Q1Xhk0...

# client session
>> client = RPC::Client.new
=> #<RPC::Client:0x007ffb6be6aed8>
>> client.sync_push "test 1"
Updating client properties
[request] Sending a request...test 1 with id 3315740
=> "2012-11-02 21:58:45 +0100"
>> client.sync_push "test 2"
AMQ::Client::ConnectionClosedError: Trying to send frame through a closed connection. Frame is #<AMQ::Protocol::MethodFrame:0x007ffb6b9c83d0 @payload="\x002\x00\n\x00\x00\x00\f\x00\x00\x00\x00", @channel=1>

有两点我真的不明白:

    与 EventMachine 相关:在Client 代码中,如果我希望我的消息真正被发布,为什么我必须调用EM.add_timer?为什么使用EM.next_tick 不起作用?我的理解是,在这里调用发布时,“一切”都应该“准备就绪”。 与 AMQP 相关:为什么我的客户端由于第二个请求的连接关闭而崩溃?每次推送新请求时,都应该创建一个全新的 EM/AMQP 循环。

遗憾的是,在线处理 EM/AMQP 的代码很少,因此我们将不胜感激任何帮助! 任何有关此效率的评论也将不胜感激。

【问题讨论】:

【参考方案1】:

挖掘文档,我终于发现我实际上需要once_declared回调来确保当客户端开始使用它时队列已经准备好。

关于连接问题,似乎不知何故,使用EM::Deferrable 会导致问题,因此(非常不令人满意的)解决方案只是不包括EM::Deferrable

require "amqp"

module RPC

  module Base

    def rabbit(rabbit_callback)
      rabbit_loop = Proc.new 
        AMQP.start do |connection|
          AMQP::Channel.new(connection) do |channel|
            channel.queue("rpc.queue", :exclusive => false, :durable => true) do |requests_queue|
              requests_queue.once_declared do
                rabbit_callback.call(connection, channel, requests_queue)
              end
            end
          end
        end

        Signal.trap("INT")   AMQP.stop  EM.stop  
        Signal.trap("TERM")  AMQP.stop  EM.stop  
      

      if !EM.reactor_running?
        @do_not_stop_reactor = false
        EM.run do
          rabbit_loop.call
        end
      else
        @do_not_stop_reactor = true
        rabbit_loop.call
      end
    end
  end

  class Server
    include Base

    def run
      server_loop = Proc.new do |connection, channel, requests_queue|
        consumer = AMQP::Consumer.new(channel, requests_queue).consume
        consumer.on_delivery do |metadata, payload|
          puts "[requests] Got a request #metadata.message_id. Sending a reply to #metadata.reply_to..."
          channel.default_exchange.publish(Time.now.to_s,
                                           :routing_key    => metadata.reply_to,
                                           :correlation_id => metadata.message_id,
                                           :mandatory      => true)
          metadata.ack
        end
      end
      rabbit(server_loop)
    end

  end

  class Client
    include Base

    def sync_push(request)
      result = nil
      sync_request = Proc.new do |connection, channel, requests_queue|
        message_id = Kernel.rand(10101010).to_s

        response_queue = channel.queue("", :exclusive => true, :auto_delete => true)
        response_queue.subscribe do |headers, payload|
          if headers.correlation_id == message_id
            result = payload
            AMQP.stop  EM.stop unless @do_not_stop_reactor 
          end
        end

        response_queue.once_declared do
          puts "[request] Sending a request...#request with id #message_id"
          channel.default_exchange.publish(request,
                                           :routing_key => requests_queue.name,
                                           :reply_to    => response_queue.name,
                                           :message_id  => message_id)
        end
      end

      rabbit(sync_request)
      result
    end
  end
end

【讨论】:

以上是关于使用 EventMachine 和 RabbitMQ 的 RPC的主要内容,如果未能解决你的问题,请参考以下文章

EventMachine 和 em-websocket - 从队列中读取并推送到通道

使用 EventMachine 的 Rails 应用程序是不是可以进行长轮询?

EventMachine WebSockets - 订阅 WS 到 EM 频道与保持套接字收集

EventMachine/em-http-request 检测 http 流连接何时停止

安装eventmachine(1.0.3)时出错[重复]

如何在 EventMachine 实现中捕获异常?