Redis + ActionController::Live 线程不会死
Posted
技术标签:
【中文标题】Redis + ActionController::Live 线程不会死【英文标题】:Redis + ActionController::Live threads not dying 【发布时间】:2013-09-29 00:15:44 【问题描述】:背景:我们已经在我们现有的 Rails 应用程序之一中内置了聊天功能。我们正在使用新的ActionController::Live
模块并运行 Puma(在生产中使用 nginx),并通过 Redis 订阅消息。我们正在使用EventSource
客户端异步建立连接。
问题摘要: 连接终止时线程永远不会死亡。
例如,如果用户离开、关闭浏览器,甚至转到应用程序中的不同页面,则会生成一个新线程(如预期的那样),但旧线程继续存在。
我目前看到的问题是,当任何这些情况发生时,服务器无法知道浏览器端的连接是否终止,直到有东西试图写入这个损坏的流,这永远不会发生一旦浏览器离开原始页面。
这个问题似乎记录在on github,类似的问题在*** here (pretty well exact same question) 和here (regarding getting number of active threads) 上提出。
根据这些帖子,我能够提出的唯一解决方案是实现一种线程/连接扑克。尝试写入断开的连接会生成一个IOError
,我可以捕获并正确关闭连接,从而使线程死亡。这是该解决方案的控制器代码:
def events
response.headers["Content-Type"] = "text/event-stream"
stream_error = false; # used by flusher thread to determine when to stop
redis = Redis.new
# Subscribe to our events
redis.subscribe("message.create", "message.user_list_update") do |on|
on.message do |event, data| # when message is received, write to stream
response.stream.write("messageType: '#event', data: #data\n\n")
end
# This is the monitor / connection poker thread
# Periodically poke the connection by attempting to write to the stream
flusher_thread = Thread.new do
while !stream_error
$redis.publish "message.create", "flusher_test"
sleep 2.seconds
end
end
end
rescue IOError
logger.info "Stream closed"
stream_error = true;
ensure
logger.info "Events action is quitting redis and closing stream!"
redis.quit
response.stream.close
end
(注意:events
方法似乎在 subscribe
方法调用时被阻止。其他一切(流式传输)都正常工作,所以我认为这是正常的。)
(其他说明:flusher 线程概念作为单个长时间运行的后台进程更有意义,有点像垃圾线程收集器。我上面实现的问题是为每个连接生成一个新线程,即毫无意义。任何试图实现这个概念的人都应该更像一个单一的进程,而不是像我所概述的那样。当我成功地将它重新实现为一个单一的后台进程时,我会更新这篇文章。)
这个解决方案的缺点是我们只是延迟或减少了问题,并没有完全解决它。除了 ajax 等其他请求外,我们每个用户仍然有 2 个线程,从扩展的角度来看,这似乎很糟糕;对于具有许多可能的并发连接的大型系统来说,这似乎是完全无法实现和不切实际的。
我觉得我错过了一些重要的东西;如果没有像我一样实现自定义连接检查器,我觉得有点难以相信 Rails 有一个明显被破坏的特性。
问题:我们如何在不实现诸如“连接扑克”或垃圾线程收集器之类的老套东西的情况下允许连接/线程终止?
如往常一样,如果我遗漏了什么,请告诉我。
更新
只是添加一些额外的信息:Huetsch 在 github 上发布 this comment 指出 SSE 基于 TCP,它通常在连接关闭时发送一个 FIN 数据包,让另一端(在这种情况下为服务器)知道关闭连接是安全的。 Huetsch 指出浏览器没有发送该数据包(可能是EventSource
库中的错误?),或者Rails 没有捕获它或对其进行任何处理(如果是这种情况,肯定是Rails 中的错误)。搜索继续……
另一个更新 使用 Wireshark,我确实可以看到正在发送的 FIN 数据包。诚然,我对协议级别的东西不是很了解或经验丰富,但是据我所知,当我使用来自浏览器的 EventSource 建立 SSE 连接时,我肯定检测到从浏览器发送的 FIN 数据包,如果我没有发送数据包删除该连接(意味着没有 SSE)。尽管我对 TCP 的了解并不十分了解,但这似乎向我表明客户端确实正确终止了连接;也许这表明 Puma 或 Rails 中存在错误。
又一次更新
@JamesBoutcher / boutcheratwest(github) 向我指出了 discussion on the redis website regarding 这个问题,特别是关于 .(p)subscribe
方法永远不会关闭的事实。该网站上的发帖人指出了与我们在这里发现的相同的事情,即当客户端连接关闭时,Rails 环境永远不会收到通知,因此无法执行.(p)unsubscribe
方法。他询问.(p)subscribe
方法的超时,我认为这也可以,但我不确定哪种方法(我上面描述的连接扑克,或他的超时建议)会是更好的解决方案。理想情况下,对于连接扑克解决方案,我想找到一种方法来确定连接是否在另一端关闭而不写入流。正如你所看到的,我必须实现客户端代码来分别处理我的“戳”消息,我认为这很突兀和愚蠢。
【问题讨论】:
另外,我知道我可以使用psubscribe
来匹配message.*
的任何内容;我最初将这两个消息键作为单独的功能,最近才拆分它们。还没有使用psubscribe
,但是嗯。
我有同样的问题,并且已经排除了丢失的 FIN 数据包(我相信)......我让 Apache 以代理模式坐在 Rails 服务器(Puma)前面,并且可以杀死 Apache——Puma 中的线程不会死。
解决方案不能很好地适应多个用户...您只需要一个额外的线程来生成这些 flusher_test 消息,而不是每个用户一个,对吧?
@JamesBoutcher 不,这是正确的,它真的不应该是每个用户一个。我这样做主要是为了测试这个概念,但我在解决方案下方的“其他说明”部分中写道,它实际上应该被实现为单个线程,因此它的行为就像一个垃圾收集器(用于线程)。我还没有以这种方式重构解决方案,但是当我这样做时,我会发布更新。 github 上的 Huetsch 说他是通过 cron 工作完成的,我认为这也可以。我假设他的 cron 定期向 Rails 应用程序发出请求,该应用程序发送 $redis.publish
消息或类似的东西。
@PaulRichter 你有没有想出一个优雅的解决方案?
【参考方案1】:
如果您可以容忍丢失消息的可能性很小,您可以使用subscribe_with_timeout
:
sse = SSE.new(response.stream)
sse.write("hi", event: "hello")
redis = Redis.new(reconnect_attempts: 0)
loop do
begin
redis.subscribe_with_timeout(5 * 60, 'mycoolchannel') do |on|
on.message do |channel, message|
sse.write(message, event: 'message_posted')
end
end
rescue Redis::TimeoutError
sse.write("ping", event: "ping")
end
end
此代码订阅 Redis 频道,等待 5 分钟,然后关闭与 Redis 的连接并再次订阅。
【讨论】:
【参考方案2】:与其向所有客户端发送心跳,不如为每个连接设置一个看门狗可能更容易。 [感谢@NeilJewers]
class Stream::FixedController < StreamController
def events
# Rails reserve a db connection from connection pool for
# each request, lets put it back into connection pool.
ActiveRecord::Base.clear_active_connections!
redis = Redis.new
watchdog = Doberman::WatchDog.new(:timeout => 20.seconds)
watchdog.start
# Redis (p)subscribe is blocking request so we need do some trick
# to prevent it freeze request forever.
redis.psubscribe("messages:*") do |on|
on.pmessage do |pattern, event, data|
begin
# write to stream - even heartbeat - it's sometimes chance to
response.stream.write("event: #event\ndata: #data\n\n")
watchdog.ping
rescue Doberman::WatchDog::Timeout => e
raise ClientDisconnected if response.stream.closed?
watchdog.ping
end
end
end
rescue IOError
rescue ClientDisconnected
ensure
response.stream.close
redis.quit
watchdog.stop
end
end
【讨论】:
【参考方案3】:我目前正在制作一个围绕 ActionController:Live、EventSource 和 Puma 的应用程序,对于那些遇到关闭流等问题的应用程序,而不是拯救 IOError
,在 Rails 4.2 中你需要拯救 ClientDisconnected
.示例:
def stream
#Begin is not required
twitter_client = Twitter::Streaming::Client.new(config_params) do |obj|
# Do something
end
rescue ClientDisconnected
# Do something when disconnected
ensure
# Do something else to ensure the stream is closed
end
我从这个论坛帖子中找到了这个方便的提示(一直在底部):http://railscasts.com/episodes/401-actioncontroller-live?view=comments
【讨论】:
哇,这很有希望。我目前无法对此进行测试,但是您是说每当客户端断开连接时都会触发此救援块,而不需要任何类型的“心跳”线程?我在引发此异常的 Rails 源代码中找到了portion of code,除非我误解它的工作原理,否则它似乎只会在尝试写入时引发该异常,所以我有点不清楚。 一个多月后...我认为您是对的,它仅在尝试写入时才引发异常,但在我的情况下,我每秒向连接的客户端流式传输大约 50 条推文,因此控制器与客户端基本同时断开连接。我最终改用了 Websocket-Rails,经过长时间的战斗,它可以很好地工作。 好吧,很公平。感谢更新。如果/当我回到这个项目时,我会考虑结帐 websocket-rails。【参考方案4】:这是一个超时解决方案,它将退出阻塞 Redis。(p)subscribe 调用并终止未使用的连接进程。
class Stream::FixedController < StreamController
def events
# Rails reserve a db connection from connection pool for
# each request, lets put it back into connection pool.
ActiveRecord::Base.clear_active_connections!
# Last time of any (except heartbeat) activity on stream
# it mean last time of any message was send from server to client
# or time of setting new connection
@last_active = Time.zone.now
# Redis (p)subscribe is blocking request so we need do some trick
# to prevent it freeze request forever.
redis.psubscribe("messages:*", 'heartbeat') do |on|
on.pmessage do |pattern, event, data|
# capture heartbeat from Redis pub/sub
if event == 'heartbeat'
# calculate idle time (in secounds) for this stream connection
idle_time = (Time.zone.now - @last_active).to_i
# Now we need to relase connection with Redis.(p)subscribe
# chanel to allow go of any Exception (like connection closed)
if idle_time > 4.minutes
# unsubscribe from Redis because of idle time was to long
# that's all - fix in (almost)one line :)
redis.punsubscribe
end
else
# save time of this (last) activity
@last_active = Time.zone.now
end
# write to stream - even heartbeat - it's sometimes chance to
# capture dissconection error before idle_time
response.stream.write("event: #event\ndata: #data\n\n")
end
end
# blicking end (no chance to get below this line without unsubscribe)
rescue IOError
Logs::Stream.info "Stream closed"
rescue ClientDisconnected
Logs::Stream.info "ClientDisconnected"
rescue ActionController::Live::ClientDisconnected
Logs::Stream.info "Live::ClientDisconnected"
ensure
Logs::Stream.info "Stream ensure close"
redis.quit
response.stream.close
end
end
你必须使用 reds.(p)unsubscribe 来结束这个阻塞调用。没有例外可以打破这一点。
我的简单应用程序包含有关此修复的信息:https://github.com/piotr-kedziak/redis-subscribe-stream-puma-fix
【讨论】:
【参考方案5】:在@James Boucher 的基础上,我在集群 Puma 中使用了以下内容和 2 个工作人员,因此我在 config/initializers/redis.rb 中只为心跳创建了 1 个线程:
config/puma.rb
on_worker_boot do |index|
puts "worker nb #index.to_s booting"
create_heartbeat if index.to_i==0
end
def create_heartbeat
puts "creating heartbeat"
$redis||=Redis.new
heartbeat = Thread.new do
ActiveRecord::Base.connection_pool.release_connection
begin
while true
hash=event: "heartbeat",data: "heartbeat"
$redis.publish("heartbeat",hash.to_json)
sleep 20.seconds
end
ensure
#no db connection anyway
end
end
end
【讨论】:
【参考方案6】:这是一个可能更简单的解决方案,它不使用心跳。经过大量研究和实验,这是我与 sinatra + sinatra sse gem 一起使用的代码(应该很容易适应 Rails 4):
class EventServer < Sinatra::Base
include Sinatra::SSE
set :connections, []
.
.
.
get '/channel/:channel' do
.
.
.
sse_stream do |out|
settings.connections << out
out.callback
puts 'Client disconnected from sse';
settings.connections.delete(out);
redis.subscribe(channel) do |on|
on.subscribe do |channel, subscriptions|
puts "Subscribed to redis ##channel\n"
end
on.message do |channel, message|
puts "Message from redis ##channel: #message\n"
message = JSON.parse(message)
.
.
.
if settings.connections.include?(out)
out.push(message)
else
puts 'closing orphaned redis connection'
redis.unsubscribe
end
end
end
end
end
redis 连接阻塞 on.message 并且只接受 (p)subscribe/(p)unsubscribe 命令。取消订阅后,redis 连接将不再被阻止,并且可以由初始 sse 请求实例化的 Web 服务器对象释放。当您在 redis 上收到消息并且与浏览器的 sse 连接不再存在于集合数组中时,它会自动清除。
【讨论】:
【参考方案7】:我刚刚做的一个解决方案(从 @teeg 借了很多东西)似乎可以正常工作(还没有测试过失败)
config/initializers/redis.rb
$redis = Redis.new(:host => "xxxx.com", :port => 6379)
heartbeat_thread = Thread.new do
while true
$redis.publish("heartbeat","thump")
sleep 30.seconds
end
end
at_exit do
# not sure this is needed, but just in case
heartbeat_thread.kill
$redis.quit
end
然后在我的控制器中:
def events
response.headers["Content-Type"] = "text/event-stream"
redis = Redis.new(:host => "xxxxxxx.com", :port => 6379)
logger.info "New stream starting, connecting to redis"
redis.subscribe(['parse.new','heartbeat']) do |on|
on.message do |event, data|
if event == 'parse.new'
response.stream.write("event: parse\ndata: #data\n\n")
elsif event == 'heartbeat'
response.stream.write("event: heartbeat\ndata: heartbeat\n\n")
end
end
end
rescue IOError
logger.info "Stream closed"
ensure
logger.info "Stopping stream thread"
redis.quit
response.stream.close
end
【讨论】:
如果客户端在“parse.new”事件之前得到“heartbeat”事件怎么办? 为什么要以固定的时间间隔向数千个客户端发送心跳和 ping?抱歉,任何没有心跳的解决方案都会更好。以上是关于Redis + ActionController::Live 线程不会死的主要内容,如果未能解决你的问题,请参考以下文章
ActionController::InvalidAuthenticityToken (ActionController::InvalidAuthenticityToken):将 JSON 参数发布到
设计中的 ActionController::UrlGenerationError::Registrations#create
ActionController::UrlGenerationError,没有路由匹配
Rails 6 - 常量 ActionController::InvalidAuthenticityToken