从内部重新排队或评估延迟的工作?
Posted
技术标签:
【中文标题】从内部重新排队或评估延迟的工作?【英文标题】:Requeue or evaluate delayed job from inside? 【发布时间】:2014-05-20 17:26:50 【问题描述】:有没有办法从作业任务本身内部确定正在运行的delayed_job 作业的状态?我有一项与可能非常不稳定的服务交互的作业,对于某类连接失败,我想重新排队该作业,并且仅在连接失败在重试限制时再次发生时引发异常。
用于演示我想要做什么的伪代码:
def do_thing
service.send_stuff(args)
rescue Exception1, Exception2
if job.retries == JOBS_MAX
raise
else
job.requeue
end
end
我不想在任何失败时引发异常,因为通常该工作会在以后重试时正常完成,而这只是在为我制造噪音。不过,我确实想知道它是否永远不会完成。
【问题讨论】:
失败后DJ不会自动重新排队吗? 是的,但我知道导致基于失败的重新排队的唯一方法是引发异常 - 我会收到通知。我只希望在最后一次重试时发生这种情况。 【参考方案1】:为 DJ 定义一个自定义作业,为max_attempts
设置一个数字,为error
回调设置行为。这是未经测试的,但它可能看起来像这样:
class DoThingJob
def max_attempts; @max_attempts ||= 5; end
def error(job, exception)
case exception
when Exception1, Exception2
# will be requeued automatically until max_attempts is reached
# can add extra log message here if desired
else
@max_attempts = job.attempts
# this will cause DJ to fail the job and not try again
end
end
end
注意
我在@pdobb 发布他的答案之前就开始写这篇文章了。无论如何我都会发布它,因为它提供了有关如何处理异常和重新排队逻辑的更多详细信息。
【讨论】:
【参考方案2】:正如您所说,如果延迟作业运行器到达perform
队列的末尾,那么它将被视为成功运行并从队列中删除。所以你只需要阻止它到达终点。没有重新排队——即使有它会是一个具有新属性的新记录。因此,您可能会重新考虑导致作业通知您异常的原因。例如,您可以添加一个通知您的条件...
可能的解决方案
您可以使用Delayed::Worker.max_attempts
获取默认的JOBS_MAX
(如您对其进行伪编码),或者您可以通过定义一个方法来设置您自己的每个作业,例如:max_attempts
。
# Fail permanently after the 10th failure for this job
def max_attempts
10
end
也就是说,在以下情况下,此方法将可用:
您也可以使用callback hooks。如果已定义,延迟作业将通过 error
方法回调到您的有效负载对象。因此,您可以使用error
方法来通知您超出给定尝试次数的实际异常。为此...
在回调中,Delayed::Job 对象本身作为第一个参数返回:
def error(job, exception)
job.attempts # gives you the current attempt number
# If job.attempts is greater than max_attempts then send exception notification
# or whatever you want here...
end
因此,您可以使用回调开始添加关于何时通知自己以及何时不通知的逻辑。我什至可能会建议制作一组基本功能,您可以将这些功能包含在所有有效负载对象中以执行这些操作……但这取决于您和您的设计。
【讨论】:
以上是关于从内部重新排队或评估延迟的工作?的主要内容,如果未能解决你的问题,请参考以下文章
如果出现问题并且我想重试,使用同一个 Executor 重新排队 Runnable 是不是安全或可取?