在 Rails 线程中访问变量

Posted

技术标签:

【中文标题】在 Rails 线程中访问变量【英文标题】:Accessing a variable within a rails thread 【发布时间】:2016-07-19 16:46:30 【问题描述】:

我正在为基于网络的幻灯片放映构建一个应用程序,其中一个“主”用户可以在幻灯片之间移动,每个人的浏览器都会跟随。为此,我使用 websockets 和 Redis 作为全局通道来发送消息。每个连接的客户端都有存储在数组@clients 中的信息。 然后我有一个单独的线程用于订阅 Redis 通道,其中定义了一个“on.message”块,它应该向@clients 数组中的每个人发送一条消息,但该数组在这个块内是空的(不是空的模块中的任何其他位置)。

几乎遵循这个例子: https://devcenter.heroku.com/articles/ruby-websockets

相关代码,在自定义中间件类中:

require 'faye/websocket'
require 'redis'

class WsCommunication
  KEEPALIVE_TIME = 15 #seconds
  CHANNEL = 'vip-deck'

  def initialize(app)
    @app = app
    @clients = []

    uri = URI.parse(ENV['REDISCLOUD_URL'])
    Thread.new do
      redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
      redis_sub.subscribe(CHANNEL) do |on|
        on.message do |channel, msg|
          puts @clients.count
          ### prints '0,' no clients receive msg
          @clients.each  |ws| ws.send(msg) 
        end
      end
    end
  end

  def call(env)
    if Faye::WebSocket.websocket?(env)
    ws = Faye::WebSocket.new(env, nil, ping: KEEPALIVE_TIME)
  
    ws.on :open do |event|
      @clients << ws
      puts @clients.count
      ### prints actual number of clients
    end

    ws.on :message do |event|
      $redis.publish(CHANNEL, event.data)
    end

    ws.on :close do |event|
      @clients.delete(ws)
      ws = nil
    end

    ws.rack_response
  else
    @app.call(env)
  end
end
end

@clients 数组在新线程中访问时是否为空,因为实例变量未跨线程共享?如果是这样,我如何跨线程共享变量?

我也尝试过使用 $clients(全局变量,应该可以跨线程访问),但无济于事。

【问题讨论】:

@kfrz 这是 ruby​​,不是 python;为什么要在单独的线程中执行此操作?还有一个问题,你为什么要手动做这个?如果你使用 Rails 5.0,你可以使用actioncable,它可以解决你所有的问题。 看看这是否有帮助:***.com/questions/16538732/… @RaVeN Redis 代码阻塞。它永远不会返回控制权。因此,线程对其操作是必要的。 ActionCable 是一个不错的解决方案,如果所有客户端都是 javascript,则可以在这种情况下工作。但是,这个解决方案更通用,应该能够处理任何支持 WebSockets 的客户端。 【参考方案1】:

END UPDATED EDIT: 显示工作代码。除调试代码外,主模块未修改。注意:我确实遇到了我已经注意到的关于需要在终止前取消订阅的问题。

代码看起来正确。我想看看你是如何实例化它的。

在 config/application.rb 中,您可能至少有以下内容:

require 'ws_communication'
config.middleware.use WsCommunication

然后,在你的 JavaScript 客户端中,你应该有这样的东西:

var ws = new WebSocket(uri);

您是否实例化了另一个 WsCommunication 实例?这会将@clients 设置为一个空数组,并可能表现出您的症状。这样的事情是不正确的:

var ws = new WsCommunication;

如果您能向我们展示客户端,如果这篇文章没有帮助,也许还有 config/application.rb 会对我们有所帮助。

顺便说一句,我同意@clients 在任何更新时都应该受到互斥锁保护的评论,如果不是这样的话。它是一个动态结构,在事件驱动的系统中随时可能发生变化。 redis-mutex 是一个不错的选择。 (希望该链接是正确的,因为 Github 目前似乎在所有内容上都抛出了 500 个错误。)

您可能还注意到 $redis.publish 返回接收消息的客户端数量的整数值。

最后,您可能会发现您需要确保在终止之前取消订阅您的频道。我曾经遇到过多次甚至多次发送每条消息的情况,因为之前对同一频道的订阅没有被清理。由于您在线程中订阅频道,因此您需要在同一个线程中取消订阅,否则进程将“挂起”,等待正确的线程神奇地出现。我通过设置“取消订阅”标志然后发送消息来处理这种情况。然后,在 on.message 块中,我测试取消订阅标志并在那里发出取消订阅。

您提供的模块,仅进行了少量调试修改:

require 'faye/websocket'
require 'redis'

class WsCommunication
  KEEPALIVE_TIME = 15 #seconds
  CHANNEL = 'vip-deck'

  def initialize(app)
    @app = app
    @clients = []
    uri = URI.parse(ENV['REDISCLOUD_URL'])
    $redis = Redis.new(host: uri.host, port: uri.port, password: uri.password)
    Thread.new do
      redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
      redis_sub.subscribe(CHANNEL) do |on|
        on.message do |channel, msg|
          puts "Message event. Clients receiving:#@clients.count;"
          @clients.each  |ws| ws.send(msg) 
        end
      end
    end
  end

  def call(env)
    if Faye::WebSocket.websocket?(env)
      ws = Faye::WebSocket.new(env, nil, ping: KEEPALIVE_TIME)

      ws.on :open do |event|
        @clients << ws
        puts "Open event. Clients open:#@clients.count;"
      end

      ws.on :message do |event|
        receivers = $redis.publish(CHANNEL, event.data)
        puts "Message published:#event.data; Receivers:#receivers;"
      end

      ws.on :close do |event|
        @clients.delete(ws)
        puts "Close event. Clients open:#@clients.count;"
        ws = nil
      end

      ws.rack_response
    else
      @app.call(env)
    end
  end
end

我提供的测试订阅者代码:

# encoding: UTF-8
puts "Starting client-subscriber.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'websocket-client-simple'

puts "websocket-client-simple v#WebSocket::Client::Simple::VERSION"

url = ARGV.shift || 'ws://localhost:3000'

EM.run do

  ws = WebSocket::Client::Simple.connect url

  ws.on :message do |msg|
    puts msg
  end

  ws.on :open do
    puts "-- Subscriber open (#ws.url)"
  end

  ws.on :close do |e|
    puts "-- Subscriber close (#e.inspect)"
    exit 1
  end

  ws.on :error do |e|
    puts "-- Subscriber error (#e.inspect)"
  end

end

我提供的测试发布者代码。 Publisher 和 Subscriber 可以轻松组合,因为这些只是测试:

# encoding: UTF-8
puts "Starting client-publisher.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'json'
require 'websocket-client-simple'

puts "websocket-client-simple v#WebSocket::Client::Simple::VERSION"

url = ARGV.shift || 'ws://localhost:3000'

EM.run do
  count ||= 0
  timer = EventMachine.add_periodic_timer(5+rand(5)) do
    count += 1
    send("MESSAGE": "COUNT:#count;")
  end

  @ws = WebSocket::Client::Simple.connect url

  @ws.on :message do |msg|
    puts msg
  end

  @ws.on :open do
    puts "-- Publisher open"
  end

  @ws.on :close do |e|
    puts "-- Publisher close (#e.inspect)"
    exit 1
  end

  @ws.on :error do |e|
    puts "-- Publisher error (#e.inspect)"
    @ws.close
  end

  def self.send message
    payload = message.is_a?(Hash) ? message : payload: message
    @ws.send(payload.to_json)
  end
end

在机架中间件层运行所有这些的示例 config.ru:

require './controllers/main'
require './middlewares/ws_communication'
use WsCommunication
run Main.new

这是主要的。我将它从我的运行版本中剥离出来,所以如果你使用它可能需要调整:

%w(rubygems bundler sinatra/base json erb).each  |m| require m 
ENV['RACK_ENV'] ||= 'development'
Bundler.require
$: << File.expand_path('../', __FILE__)
$: << File.expand_path('../lib', __FILE__)

Dir["./lib/*.rb", "./lib/**/*.rb"].each  |file| require file 
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']

  class Main < Sinatra::Base

    env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
    get "/" do
      erb :"index.html"
    end

    get "/assets/js/application.js" do
      content_type :js
      @scheme = env == "production" ? "wss://" : "ws://"
      erb :"application.js"
    end
  end

【讨论】:

嘿,感谢您提供所有这些信息!我会将其标记为答案,因为即使我的项目的需求发生了变化,并且我能够以不同的方式做我需要的事情,这也是非常有用的。【参考方案2】:

@client 应该在所有线程之间共享,您确定客户端不是从数组中意外删除的吗?尝试将“客户端已删除”放在 ws.on :close 块中并对其进行测试。 您也可以尝试使用以这种方式使用 @client 变量的互斥锁: http://ruby-doc.org/core-2.2.0/Mutex.html

【讨论】:

以上是关于在 Rails 线程中访问变量的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Rails ERB 文件的渲染部分 url 中访问 java 脚本变量?

Rails - 从视图中的索引操作访问实例变量属性

在 React on Rails 中访问 NODE_ENV 环境变量

如何访问 Rails escape_javascript 行中的 JavaScript 变量?

Rails Mailer - 无法从 Mailer 视图访问实例变量

如何在 Rails 中定义自定义配置变量