Django、Celery、Redis、RabbitMQ:Fanout-On-Writes 的链式任务

Posted

技术标签:

【中文标题】Django、Celery、Redis、RabbitMQ:Fanout-On-Writes 的链式任务【英文标题】:Django, Celery, Redis, RabbitMQ: Chained Tasks for Fanout-On-Writes 【发布时间】:2014-01-27 05:29:57 【问题描述】:

我一直在观看 Rick Branson 的 PyCon 视频:Messaging at Scale at Instagram。您可能想观看视频以回答这个问题。 Rick Branson 使用 Celery、Redis 和 RabbitMQ。为了让您跟上进度,每个用户都有一个用于其 homefeed 的 redis 列表。每个列表都包含他们关注的人发布的照片​​的媒体 ID。

例如,贾斯汀比伯拥有 150 万粉丝。当他发布照片时,需要将该照片的 ID 插入到他的每个关注者的每个单独的 redis 列表中。这称为 Fanout-On-Write 方法。然而,这种方法存在一些可靠性问题。它可以工作,但对于像 Justin Bieber 或 Lady Gaga 这样拥有数百万追随者的人来说,在网络请求中执行此操作(您有 0-500 毫秒来完成请求)可能会出现问题。届时,请求将超时。

因此 Rick Branson 决定使用 Celery,这是一个基于分布式消息传递的异步任务队列/作业队列。任何繁重的工作,例如将媒体 ID 插入关注者列表,都可以在 Web 请求之外异步完成。请求将完成,celery 将继续将 ID 插入所有列表。

这种方法可以创造奇迹。但同样,您不想将 Justin 的所有追随者都分批交付给 Celery,因为这会占用 celery 工人。为什么不让多个工人同时工作,这样它就可以更快地完成呢?卓见!你想把这个块分成更小的块,并让不同的工人在每批上工作。 Rick Branson 处理了一批 10,000 名追随者,他使用称为光标的东西不断为贾斯汀·比伯的所有追随者插入媒体 ID,直到完成。在视频中,他在 3:56 中谈到了这一点

我想知道是否有人可以对此进行更多解释并举例说明如何做到这一点。我目前正在尝试尝试相同的设置。我使用 Andy McCurdy 的 redis-py python 客户端库与我的 redis 服务器进行通信。对于我服务上的每个用户,我都会创建一个 redis 关注者列表。

因此,ID 为 343 的用户将在以下键处拥有一个列表:

followers:343

我还为每个用户创建了一个 homefeed 列表。每个用户都有自己的列表。 因此,ID 为 1990 的用户将在以下键处拥有一个列表:

homefeed:1990

在“followers:343”redis 列表中,包含了所有关注用户 343 的人的 ID。用户 343 有 20,007 个关注者。下面,我检索列表中从索引 0 开始一直到结尾 -1 的所有 ID,只是为了向您展示它的样子。

>>> r_server.lrange("followers:343", 0, -1)
['8', '7', '5', '3', '65', '342', '42', etc...] ---> for the sake of example, assume this list has another 20,000 IDs.

您看到的是关注用户 343 的所有用户 ID 的列表。

这是我的 proj/mydjangoapp/tasks.py,其中包含我的 insert_into_homefeed 函数:

from __future__ import absolute_import
from celery import shared_task
import redis
pool = redis.ConnectionPool(host='XX.XXX.XXX.X', port=6379, db=0, password='XXXXX')

@shared_task
def insert_into_homefeed(photo_id, user_id):
    # Grab the list of all follower IDs from Redis for user_id.
    r_server = redis.Redis(connection_pool=pool)

    followers_list = r_server.lrange("followers:%s" % (user_id), 0, -1)

    # Now for each follower_id in followers_list, find their homefeed key 
    # in Redis and insert the photo_id into that homefeed list.

    for follower_id in followers_list:
        homefeed_list = r_server.lpush("homefeed:%s" % (follower_id), photo_id)
    return "Fan Out Completed for %s" % (user_id)

在此任务中,当从 Django 视图调用时,它将获取关注用户 343 的人的所有 ID,然后将照片 ID 插入到他们的所有 homefeed 列表中。

这是我在 proj/mydjangoapp/views.py 中的上传视图。我基本上调用了 celery 的 delay 方法并传递了必要的变量,以便请求快速结束:

# Import the Celery Task Here
from mydjangoapp.tasks import insert_into_homefeed


@csrf_exempt
def Upload(request):
    if request.method == 'POST':
        data  = json.loads(request.body)
        newPhoto = Photo.objects.create(user_id = data['user_id'], description= data['description'], photo_url = data['photo_url'])
        newPhoto_ID = newPhoto.pk
        insert_into_homefeed.delay(newPhoto_ID, data['user_id'])
        return HttpResponse("Request Completed")

我怎样才能做到这一点,它会被 10,000 分批?

【问题讨论】:

【参考方案1】:

视频中描述的方法是任务“链接”。

要让您的任务方法以链的形式启动并运行,您需要添加一个额外的参数来表示关注者列表中的索引。该任务不是处理完整的关注者列表,而是仅处理固定的批量大小,从它被传递的索引参数开始。完成后,任务应该创建一个新任务并传递新索引。

INSERT_INTO_HOMEFEED_BATCH = 10000

@shared_task
def insert_into_homefeed(photo_id, user_id, index=0):
    # Grab the list of all follower IDs from Redis for user_id.
    r_server = redis.Redis(connection_pool=pool)

    range_limit = index + INSERT_INTO_HOMEFEED_BATCH - 1 # adjust for zero-index

    followers_list_batch = r_server.lrange("followers:%s" % (user_id), index, range_limit)

    if not followers_list_batch:
        return # zero followers or no more batches

    # Now for each follower_id in followers_list_batch, find their homefeed key 
    # in Redis and insert the photo_id into that homefeed list.
    for follower_id in followers_list:
        homefeed_list = r_server.lpush("homefeed:%s" % (follower_id), photo_id)

    insert_into_homefeed.delay(photo_id, user_id, range_limit + 1)

这很好用,因为 Redis lists are ordered 和 lrange 命令 doesn't return an error on out-of-range inputs。

【讨论】:

感谢您的快速回复! :) 不错的方法!但这不是无限循环吗?即使在我遍历整个列表之后,该任务不会一遍又一遍地被调用吗? 啊!我刚刚看到了 if Followers_list_batch: 你明白了。这可能是一个很好的迹象,我应该使用明确的 return 语句。

以上是关于Django、Celery、Redis、RabbitMQ:Fanout-On-Writes 的链式任务的主要内容,如果未能解决你的问题,请参考以下文章

django redis celery 和 celery beats 的正确设置

Django+Celery+Redis 使用

Django使用Celery加redis执行异步任务

django+celery+redis环境配置

celery+django+redis使用介绍

在使用 django_celery_beat 设置的 Django 视图中使用 Celery 定期任务输出,并使用 Redis 设置缓存