[源码分析] 并行分布式任务队列 Celery 之 Timer & Heartbeat

Posted 格格巫 MMQ!!

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[源码分析] 并行分布式任务队列 Celery 之 Timer & Heartbeat相关的知识,希望对你有一定的参考价值。

目录
[源码分析] 并行分布式任务队列 Celery 之 Timer & Heartbeat
0x00 摘要
0x01 Blueprint
0x02 Timer Step
2.1 Transport
2.2 Thread-less VS Thread-based
0x03 Timer in Pool
3.1 gevent 和 eventlet
3.2 BasePool
0x04 kombu.Timer
4.1 异步
4.2 调用
4.2.1 添加 timer function
4.2.2 调用
4.3 实验
4.3.1 示例代码
4.3.2 Hub 的使用
0x05 timer2
0x06 Heart
6.1 Heart in Bootstep
6.2 Heart in Consumer
6.3 worker-online
6.4 worker-offline
6.5 发送心跳
0xEE 个人信息
0xFF 参考
0x00 摘要
Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。

之前我们用了十几篇文章,介绍了 Kombu 和 Celery 的基础功能。从本文开始,我们介绍 Celery 的一些辅助功能(比如负载均衡,容错等等)。其实从某种意义上来说,这些辅助功能更加重要。

本文我们介绍 Timer 和 Heart 这两个组件。大家可以看看底层设计是如何影响上层实现的。

[源码分析] 消息队列 Kombu 之 mailbox

[源码分析] 消息队列 Kombu 之 Hub

[源码分析] 消息队列 Kombu 之 Consumer

[源码分析] 消息队列 Kombu 之 Producer

[源码分析] 消息队列 Kombu 之 启动过程

[源码解析] 消息队列 Kombu 之 基本架构

[源码解析] 并行分布式框架 Celery 之架构 (1)

[源码解析] 并行分布式框架 Celery 之架构 (2)

[源码解析] 并行分布式框架 Celery 之 worker 启动 (1)

[源码解析] 并行分布式框架 Celery 之 worker 启动 (2)

[源码解析] 分布式任务队列 Celery 之启动 Consumer

[源码解析] 并行分布式任务队列 Celery 之 Task是什么

[从源码学设计]celery 之 发送Task & AMQP

[源码解析] 并行分布式任务队列 Celery 之 消费动态流程

[源码解析] 并行分布式任务队列 Celery 之 多进程模型

[源码分析] 分布式任务队列 Celery 多线程模型 之 子进程

[源码分析]并行分布式任务队列 Celery 之 子进程处理消息

0x01 Blueprint
Celery 的 Worker初始化过程中,其内部各个子模块的执行顺序是由一个BluePrint类定义,并且根据各个模块之间的依赖进行排序(实际上把这种依赖关系组织成了一个 DAG)执行。

Celery worker 的 Blueprint 如下,我们可以看到 Timer,Hub 是 Celery Worker 的两个基本组件,提到 hub 是因为后面讲解需要用到。

class Blueprint(bootsteps.Blueprint):
“”“Worker bootstep blueprint.”""

name = 'Worker'
default_steps = {
    'celery.worker.components:Hub', # 这里是 Hub
    'celery.worker.components:Pool',
    'celery.worker.components:Beat',
    'celery.worker.components:Timer', # 这里是 Timer
    'celery.worker.components:StateDB',
    'celery.worker.components:Consumer',
    'celery.worker.autoscale:WorkerComponent',
}

0x02 Timer Step
我们首先来到 Timer Step。

从 Timer 组件 的定义中可以看到,Timer 组件 会根据当前worker是否使用事件循环机制来决定创建什么类型的timer。

如果使用 eventloop,则使用kombu.asynchronous.timer.Timer as _Timer,这里具体等待动作由用户自己完成。
否则使用 Pool 内部的Timer类(就是 timer_cls=‘celery.utils.timer2.Timer’),timer2 自己做了一个线程来做定时等待;
定义如下:

from kombu.asynchronous.timer import Timer as _Timer

class Timer(bootsteps.Step):
“”“Timer bootstep.”""

def create(self, w):
    if w.use_eventloop:                        # 检查传入的Worker是否使用了use_eventloop
        # does not use dedicated timer thread.
        w.timer = _Timer(max_interval=10.0)    # 直接使用kombu的timer做定时器
    else:
        if not w.timer_cls:                     # 如果配置文件中没有配置timer_clas
            # Default Timer is set by the pool, as for example, the
            # eventlet pool needs a custom timer implementation.
            w.timer_cls = w.pool_cls.Timer      # 使用缓冲池中的Timer
        w.timer = self.instantiate(w.timer_cls,
                                   max_interval=w.timer_precision,
                                   on_error=self.on_timer_error,
                                   on_tick=self.on_timer_tick)  # 导入对应的类并实例化

起初看代码时候很奇怪,为什么要再单独定义一个 timer2?

原因推断是(因为对 Celery 的版本发展历史不清楚,所以此处不甚确定,希望有同学可以指正):依据 底层 Transport 的设计来对 Timer 做具体实现调整。

2.1 Transport
大家知道,Celery 是依赖于 Kombu,而在 Kombu 体系中,用 transport 对所有的 broker 进行了抽象,为不同的 broker 提供了一致的解决方案。通过Kombu,开发者可以根据实际需求灵活的选择或更换broker。

我们再回顾下具体 Kombu 的概念:

Connection 是 AMQP 对 连接的封装;
Channel 是 AMQP 对 MQ 操作的封装;
那么两者的关系就是对 MQ 的操作(Channel)必然离不开连接(Connection),但是 Kombu 并不直接让 Channel 使用 Connection 来发送 / 接受请求,而是引入了一个新的抽象 Transport。Transport 负责具体的 MQ 的操作,也就是说 Channel 的操作都会落到 Transport 上执行;

Transport 代表真实的 MQ 连接,也是真正连接到 MQ( redis / rabbitmq )的实例。就是存储和发送消息的实体,用来区分底层消息队列是用 amqp、Redis 还是其它实现的。

具体 Kombu 逻辑如下图,Transport 在左下角处 :

在这里插入图片描述

2.2 Thread-less VS Thread-based
对于 Transport,某些 rate-limit implementation(比如 RabbitMQ / Redis ) 为了减少开销,采用了event-loop(底层使用了 Epoll),是 thread-less and lock-free。

而其他旧类型的 Transport 就是 Thread based,比如 Mongo。因此,

对于 Thread-less Transport

Kombu 就采用了 kombu.asynchronous.timer.Timer as _Timer,具体等待操作是在 event-loop 中实现,就是 调用者 自己会做等待。

具体比如在 Redis Transport 之中,就有 register_with_event_loop 函数用来在 loop(就是 event-loop)中注册自己,具体如下:

def register_with_event_loop(self, connection, loop):
cycle = self.cycle
cycle.on_poll_init(loop.poller)
cycle_poll_start = cycle.on_poll_start
add_reader = loop.add_reader
on_readable = self.on_readable

def on_poll_start():
    cycle_poll_start()
    [add_reader(fd, on_readable, fd) for fd in cycle.fds]
loop.on_tick.add(on_poll_start)
loop.call_repeatedly(10, cycle.maybe_restore_messages)
loop.call_repeatedly(
    health_check_interval,
    cycle.maybe_check_subclient_health
)

对于 thread-based Transport,

则采用了 celery.utils.timer2.Timer,timer2 自己继承了线程类,使用自己这个线程来做定时等待;
比如在 Mongodb transport 之中,就没有任何关于 event loop 的操作。
即,选用 timer 的哪种实现,看是否需要等待来决定,就是谁来完成 “等待” 这个动作。

翻了翻 Celery 2.4.7 的代码,发现在这个版本,确实只有 Thread-based timer,其代码涵盖了 目前的 timer 2 和 kombu.asynchronous.timer.Timer 大部分功能。应该是从 3.0.2 之后,把部分代码分离到了 kombu.asynchronous.timer.Timer ,实现了 Thread-less 和 Thread-based 两个不同的实现。

具体可以参见下面源码中的注释:

  • RabbitMQ/Redis: thread-less and lock-free rate-limit implementation.

    This means that rate limits pose minimal overhead when used with
    RabbitMQ/Redis or future transports using the event-loop,
    and that the rate-limit implementation is now thread-less and lock-free.

    The thread-based transports will still use the old implementation for
    now, but the plan is to use the timer also for other
    broker transports in Celery 3.1.
    0x03 Timer in Pool
    注意,上面的是 Timer Step,是一个启动的阶段,其目的是生成 Timer 组件 给 其他组件使用,并不是 Timer 功能类。

我们其次来看看 Timer 功能类 在 线程池 Pool 中的使用,就对应了前面 Blueprint step 之中的两种不同 cases。

分别也对应了两种应用场景(或者说是线程池实现):

gevent 和 eventlet 使用 kombu.asynchronous.timer.Timer。
BasePool(以及其他类型线程池)使用了 timer2.Timer。
初步来分析,gevent 和 eventlet 都是用协程来模拟线程,所以本身具有Event loop,因此使用 kombu.asynchronous.timer.Timer 也算顺理成章。

3.1 gevent 和 eventlet
对于 gevent,eventlet 这种情况,使用了 class Timer(_timer.Timer) 作为 Timer 功能类。

从代码中可以看到,class Timer 扩展了 kombu.asynchronous.timer.Timer。

from kombu.asynchronous import timer as _timer

class Timer(_timer.Timer):

def __init__(self, *args, **kwargs):
    from gevent import Greenlet, GreenletExit

    class _Greenlet(Greenlet):
        cancel = Greenlet.kill

    self._Greenlet = _Greenlet
    self._GreenletExit = GreenletExit
    super().__init__(*args, **kwargs)
    self._queue = set()

def _enter(self, eta, priority, entry, **kwargs):
    secs = max(eta - monotonic(), 0)
    g = self._Greenlet.spawn_later(secs, entry)
    self._queue.add(g)
    g.link(self._entry_exit)
    g.entry = entry
    g.eta = eta
    g.priority = priority
    g.canceled = False
    return g

def _entry_exit(self, g):
    try:
        g.kill()
    finally:
        self._queue.discard(g)

def clear(self):
    queue = self._queue
    while queue:
        try:
            queue.pop().kill()
        except KeyError:
            pass

@property
def queue(self):
    return self._queue

3.2 BasePool
而 BasePool 采用了 timer2 . Timer 作为 Timer 功能类。

from celery.utils import timer2

class BasePool:
“”“Task pool.”""

Timer = timer2.Timer

下面我们具体看看 Timer 功能类 如何实现。

0x04 kombu.Timer
4.1 异步
kombu.asynchronous.timer.Timer 实现了异步Timer。

由其注释可以,kombu.asynchronous.timer.Timer 在调用者每次得到下一次entry时,会给出tuple of (wait_seconds, entry),调用者应该进行等待相应时间。

即,kombu.Timer是调用者等待,普通timer是timer自己启动线程等待。

“”“Iterate over schedule.
This iterator yields a tuple of (wait_seconds, entry),
where if entry is :const:None the caller should wait
for wait_seconds until it polls the schedule again.
“””
定义如下:

class Timer:
“”“Async timer implementation.”""

Entry = Entry

on_error = None

def __init__(self, max_interval=None, on_error=None, **kwargs):
    self.max_interval = float(max_interval or DEFAULT_MAX_INTERVAL)
    self.on_error = on_error or self.on_error
    self._queue = []

4.2 调用
4.2.1 添加 timer function
用户通过 call_repeatedly 来添加 timer function。

def call_repeatedly(self, secs, fun, args=(), kwargs=None, priority=0):
kwargs = {} if not kwargs else kwargs
tref = self.Entry(fun, args, kwargs)

@wraps(fun)
def _reschedules(*args, **kwargs):
    last, now = tref._last_run, monotonic()
    lsince = (now - tref._last_run) if last else secs
    try:
        if lsince and lsince >= secs:
            tref._last_run = now
            return fun(*args, **kwargs) # 调用用户方法
    finally:
        if not tref.canceled:
            last = tref._last_run
            next = secs - (now - last) if last else secs
            self.enter_after(next, tref, priority)

tref.fun = _reschedules
tref._last_run = None
return self.enter_after(secs, tref, priority)

4.2.2 调用
Timer通过apply_entry进行调用。

def apply_entry(self, entry):
try:
entry()
except Exception as exc:
if not self.handle_error(exc):
logger.error(‘Error in timer: %r’, exc, exc_info=True)
在获取下一次entry时,会返回等待时间。

def iter(self, min=min, nowfun=monotonic,
pop=heapq.heappop, push=heapq.heappush):
“”"Iterate over schedule.

This iterator yields a tuple of ``(wait_seconds, entry)``,
where if entry is :const:`None` the caller should wait
for ``wait_seconds`` until it polls the schedule again.
"""
max_interval = self.max_interval
queue = self._queue

while 1:
    if queue:
        eventA = queue[0]
        now, eta = nowfun(), eventA[0]

        if now < eta:
            yield min(eta - now, max_interval), None
        else:
            eventB = pop(queue)

            if eventB is eventA:
                entry = eventA[2]
                if not entry.canceled:
                    yield None, entry
                continue
            else:
                push(queue, eventB)
    else:
        yield None, None

4.3 实验
我们做实验看看 timer 功能类 的 使用。

4.3.1 示例代码
下面代码来自https://github.com/liuliqiang/blog_codes/tree/master/python/celery/kombu,特此感谢。

def main(arguments):
hub = Hub()
exchange = Exchange(‘asynt’)
queue = Queue(‘asynt’, exchange, ‘asynt’)

def send_message(conn):
    producer = Producer(conn)
    producer.publish('hello world', exchange=exchange, routing_key='asynt')
    print('message sent')

def on_message(message):
    print('received: {0!r}'.format(message.body))
    message.ack()
    # hub.stop()  # <-- exit after one message

conn = Connection('redis://localhost:6379')
conn.register_with_event_loop(hub)

def p_message():
    print('redis://localhost:6379')

with Consumer(conn, [queue], on_message=on_message):
    send_message(conn)
    hub.timer.call_repeatedly(
        3, p_message
    )
    hub.run_forever()

if name == ‘main’:
sys.exit(main(sys.argv[1:]))
这里,Hub 就是 timer 的客户。

得到Stack如下,可以看到 hub 使用 timer 做了消息循环,于是我们需要看看 hub:

p_message
_reschedules, timer.py:127
call, timer.py:65
fire_timers, hub.py:142
create_loop, hub.py:300
run_once, hub.py:193
run_forever, hub.py:185
main, testUb.py:46
, testUb.py:50
启动时候的逻辑如下,hub 通过 hub.timer.call_repeatedly 设置了需要调用的用户函数 fun,在 Timer 内部,fun 被包装设置为 _reschedules。

Hub
 +
 |                                         +----------------------------------+
 |                                         |  kombu.asynchronous.timer.Timer  |
 |                                         |                                  |
 |                call_repeatedly(fun)     |                                  |
 |                                         |                                  |
 +---------------------------------------------->  _reschedules [@wraps(fun)] |
 |                                         |                                  |
 |                                         |                                  |
 |                                         |                                  |
 |                                         +----------------------------------+
 |
 |
 v

4.3.2 Hub 的使用
以下代码是Hub类,在这里,Hub 就是 timer 的用户。

可以看到,hub 建立了message_loop。在 loop 中,hub 会:

使用 fire_timers 进行 timer 处理,会设置下一次 timer。
得到 poll_timeout 后,会进行处理或者 sleep。
下面是简化版代码。

def create_loop():

while 1:

    poll_timeout = fire_timers(propagate=propagate) if scheduled else 1

    if readers or writers:

        events = poll(poll_timeout)

        for fd, event in events or ():

            if event & READ:
                try:
                    cb, cbargs = readers[fd]
                try:
                    cb(*cbargs)
                except Empty:
                    pass

    else:
        # no sockets yet, startup is probably not done.
        sleep(min(poll_timeout, 0.1))
    yield

我们再看看 fire_timers,这就是调用用户方法。

def fire_timers(self, min_delay=1, max_delay=10, max_timers=10,
propagate=()):
timer = self.timer
delay = None

if timer and timer._queue:
    for i in range(max_timers):
        delay, entry = next(self.scheduler)
        if entry is None:
            break

        entry()# 调用用户方法
        
return min(delay or min_delay, max_delay)

使用Entry调用用户方法

class Entry:
“”“Schedule Entry.”""

def __call__(self):
    return self.fun(*self.args, **self.kwargs)# 调用用户方法

具体逻辑如下:

±-------------------------+
| |
| Hub |
| + |
| | | ±---------------------------------+
| | | | kombu.asynchronous.timer.Timer |
| | | | |
| | | call_repeatedly(fun) | |
| | | | |
| ±---------------------------------------> _reschedules [@wraps(fun)] |
| | | | |
| | | | |
| | | | |
| | | ±---------------------------------+
| create_loop |
| + | ^
| | | |
| | | |
| v | |
| | |
| ±–> message_loop | |
| | + | |
| | | | |
| | v | iter(self.timer) |
| | fire_timers ±-------------------------------------+
| | + |
| | | |
| | v |
| | poll |
| | + |
| | | |
| | v |
| | sleep |
| | + |
| | | |
| ±----------+ |
±-------------------------+
0x05 timer2
在celery/utils/timer2.py中定义了Timer类实例,可以看出其继承了threading.Thread,但是居然也用kombu.asynchronous.timer。

在源码注释中有:This is only used for transports not supporting AsyncIO。

其实,就是 timer2 自己做了一个线程来做定时sleep等待,然后调用用户方法而已。

from kombu.asynchronous.timer import Entry
from kombu.asynchronous.timer import Timer as Schedule
from kombu.asynchronous.timer import logger, to_timestamp

class Timer(threading.Thread): # 扩展了 线程
“”"Timer thread.

Note:
    This is only used for transports not supporting AsyncIO.
"""

Entry = Entry
Schedule = Schedule

running = False
on_tick = None

_timer_count = count(1)

在run方法中,会定期sleep。

def run(self):
try:
self.running = True
self.scheduler = iter(self.schedule)

    while not self._is_shutdown.isSet():
        delay = self._next_entry()
        if delay:
            if self.on_tick:
                self.on_tick(delay)
            if sleep is None:  # pragma: no cover
                break
            sleep(delay)
    try:
        self._is_stopped.set()
    except TypeError:  # pragma: no cover
        # we lost the race at interpreter shutdown,
        # so gc collected built-in modules.
        pass
except Exception as exc:
    sys.stderr.flush()
    os._exit(1)

在_next_entry方法中,调用用户方法,这是通过kombu.asynchronous.timer完成的。

def _next_entry(self):
with self.not_empty:
delay, entry = next(self.scheduler)
if entry is None:
if delay is None:
self.not_empty.wait(1.0)
return delay
return self.schedule.apply_entry(entry)
next = next = _next_entry # for 2to3
0x06 Heart
Timer 类主要是做一些定时调度方面的工作。

Heart 组件 就是使用 Timer组件 进行定期调度,发送心跳 Event,告诉其他 Worker 这个 Worker 还活着。

同时,当本worker 启动,停止时候,也发送 worker-online,worker-offline 这两种消息。

6.1 Heart in Bootstep
位置在:celery/worker/consumer/heart.py。

其作用就是启动 heart 功能类。

class Heart(bootsteps.StartStopStep):
“”"Bootstep sending event heartbeats.

This service sends a ``worker-heartbeat`` message every n seconds.

Note:
    Not to be confused with AMQP protocol level heartbeats.
"""

requires = (Events,)

def __init__(self, c,
             without_heartbeat=False, heartbeat_interval=None, **kwargs):
    self.enabled = not without_heartbeat
    self.heartbeat_interval = heartbeat_interval
    c.heart = None
    super().__init__(c, **kwargs)

def start(self, c):
    c.heart = heartbeat.Heart(
        c.timer, c.event_dispatcher, self.heartbeat_interval,
    )
    c.heart.start()

def stop(self, c):
    c.heart = c.heart and c.heart.stop()
shutdown = stop

6.2 Heart in Consumer
位置在:celery/worker/heartbeat.py。可以看到就是从启动之后,使用 call_repeatedly 定期发送心跳。

class Heart:
“”"Timer sending heartbeats at regular intervals.

Arguments:
    timer (kombu.asynchronous.timer.Timer): Timer to use.
    eventer (celery.events.EventDispatcher): Event dispatcher
        to use.
    interval (float): Time in seconds between sending
        heartbeats.  Default is 2 seconds.
"""

def __init__(self, timer, eventer, interval=None):
    self.timer = timer
    self.eventer = eventer

def _send(self, event, retry=True):
    return self.eventer.send(event, freq=self.interval, ...)

def start(self):
    if self.eventer.enabled:
        self.tref = self.timer.call_repeatedly(
            self.interval, self._send, ('worker-heartbeat',),
        )

此时变量为:

self = {Heart} <celery.worker.heartbeat.Heart object at 0x000001D377636408>
eventer = {EventDispatcher} <celery.events.dispatcher.EventDispatcher object at 0x000001D37765B308>
interval = {float} 2.0
timer = {Timer: 0} <Timer(Timer-1, stopped daemon)>
tref = {NoneType} None
_send_sent_signal = {NoneType} None
6.3 worker-online
当启动时候,发送 worker-online 消息。

def start(self):
    if self.eventer.enabled:
        self._send('worker-online')
        self.tref = self.timer.call_repeatedly(
            self.interval, self._send, ('worker-heartbeat',),
        )

6.4 worker-offline
当停止时候,发送 worker-offline 消息。

def stop(self):
    if self.tref is not None:
        self.timer.cancel(self.tref)
        self.tref = None
    if self.eventer.enabled:
        self._send('worker-offline', retry=False)

6.5 发送心跳
Heart组件会调用 eventer 来群发心跳:

eventer 是 celery.events.dispatcher.EventDispatcher;
心跳是 ‘worker-heartbeat’ 这个 Event;
所以我们下文就要分析 celery.events.dispatcher.EventDispatcher。

def _send(self, event, retry=True):
    if self._send_sent_signal is not None:
        self._send_sent_signal(sender=self)
    return self.eventer.send(event, freq=self.interval,
                             active=len(active_requests),
                             processed=all_total_count[0],
                             loadavg=load_average(),
                             retry=retry,
                             **SOFTWARE_INFO)

以上是关于[源码分析] 并行分布式任务队列 Celery 之 Timer & Heartbeat的主要内容,如果未能解决你的问题,请参考以下文章

[源码分析] 分布式任务队列 Celery 之 发送Task & AMQP

[源码解析] 并行分布式框架 Celery 之 Lamport 逻辑时钟 & Mingle

Python 并行分布式框架 Celery

Python 神器 Celery 源码解析:不同启动模式的分析

python之celery队列模块

Celery 分布式任务队列快速入门