当多个工人完成一个未知大小的工作时如何得到通知?

Posted

技术标签:

【中文标题】当多个工人完成一个未知大小的工作时如何得到通知?【英文标题】:How to get notified when multiple workers complete a job with an unknown size? 【发布时间】:2014-11-15 11:56:30 【问题描述】:

我正在尝试使用django-rq 在 Python 中创建一个网站爬虫。到目前为止,我的工人看起来像这样:

    从队列中获取下一页。 在数据库中创建页面记录。设置status=1。 下载页面内容和进程。可能需要一分钟左右。 对于页面中的每个链接
      检查链接是否已在数据库中注册。 如果没有,创建一个新的页面记录。设置status=0 并将链接添加到队列中。
    for循环结束后,检查status=0的页数是否为0,如果是,则工作完成。

status=1 表示页面已处理。 status=0 表示页面尚未处理。

现在,这个算法在单个工人身上运行得很好。但是,当有更多工人时不会这样做,因为有时会提前触发工作程序的结束。

实现这个worker的正确方法是什么?

【问题讨论】:

【参考方案1】:

1。调整您当前的系统

如果您推迟将页面的状态设置为 1,直到 完成处理,那么您的工作人员不应提前宣布“工作完成”。 您的第 2 步仅适用于您开始抓取的第一个页面。

所以你的系统会是这样的:

start job:
1. Create a page record in the database. Set status=0. Add page to queue.

worker:
1. Get the next page from the queue.
2. Download the page contents and process. Might take up to a minute or so.
3. For each link in the page
    1. Check if the link is already registered in the database.
    2. If not, create a new page record. Set status=0 and add the link to the queue.
4. After the for loop ends, set status=1 for this page.
5. Check whether the count of pages with status=0 is 0. If yes, the job is done.

存在的问题是,如果在前一个网络爬取作业完成之前开始后续的网络爬取作业,您只会在最后一个作业结束时获得“作业完成”。 您也许可以在您的数据库页面记录中添加一个工作 ID,并将“工作完成”重新定义为 count(status=0 and job-id=x) = 0

2。利用 RQ 的作业类

来自RQ docs:

当作业入队时,queue.enqueue() 方法返回一个 Job 实例。 ...它有一个方便的结果访问器属性,当作业尚未完成时将返回 None ,或者在作业完成时返回非 None 值(当然,假设作业首先有返回值) .

您可以对两种不同类型的作业进行排队,一种是“获取网页”,另一种用于管理抓取过程。

管理作业将启动并跟踪所有“获取网页”作业。它会知道“工作完成”的时间,因为它的所有子工作都已完成。

不一定需要向数据库写入任何内容来管理抓取过程。

您需要运行 2 个以上的 worker,以便 crawlfetch 可以同时工作,可能在不同的队列中。

def something_web_facing():
    ...
    queue.enqueue(crawl, 'http://url.com/start_point.html')
    ...

def crawl(start_url):
    fetch_jobs = []
    seen_urls = set()

    seen_urls.add(start_url)
    fetch_jobs.append( queue.enqueue(fetch, start_url) )

    while len(fetch_jobs) > 0:

        # loop over a copy of fetch_jobs
        for job in list(fetch_jobs):

            # has this job completed yet?
            if job.result:

                # a fetch job returns a list of the next urls to crawl
                for url in job.result:

                    # fetch this url if we haven't seen it before
                    if url not in seen_urls:
                        seen_urls.add(url)
                        fetch_jobs.append( queue.enqueue(fetch, url) )

                fetch_jobs.remove(job)

        time.sleep(1)

    return "Job done!"

def fetch(url):
    """Get web page from url, return a list of links to follow next"""

    html_page = download_web_page(url)
    links_to_follow = find_links_to_follow(html_page)
    return links_to_follow

3。使用别人的网络爬虫代码

Scrapy

您可以排队使用 scrapy 的作业。 Run scrapy from a script

【讨论】:

【参考方案2】:

只是一个想法,但你不能有一个额外的表来保存工人的状态吗?

即,有 10 名工人和以下状态: 0-8 - “保持” 9 - “工作”

从这个状态开始,9 可能会将更多页面添加到 0-8 可以处理的队列中(并将它们在表中的状态更改为“工作”)。

另外需要注意的是,worker 确定其状态的顺序需要精确:

    从队列中获取下一页。 在数据库中创建页面记录。设置状态=1。 下载页面内容和进程。可能需要一分钟左右。 对于页面中的每个链接
      检查链接是否已在数据库中注册。 2.如果没有,创建一个新的页面记录。设置 status=0 并将链接添加到队列中。
    检查队列中是否有任何页面。 如果没有,进入等待状态。

如果工作人员处于等待状态,其操作顺序可能类似于:

    醒来 如果队列中有作业,则进入“工作”状态 如果不是,请检查是否所有工作人员都处于“暂停”状态。 如果是,退出。

任何处于“等待”状态的工人都应定期检查是否有任何处于“工作”状态的工人。如果是这样,请检查队列然后休眠。如果没有,结束。

另一种方法是让一个指挥和控制工作人员监控其他工作人员。如果他们都处于“暂停”状态,则触发他们都观看结束的标志。

【讨论】:

我认为这是要走的路。知道如何防止竞争条件吗?例如,如果两个处于保持状态的工作人员同时检查队列并且其中一个决定“结束”而另一个刚刚开始处理页面会发生什么?您是否认为,如果保证工作人员在将找到的每个页面添加到队列之前不会更改其状态,就不会发生这种竞争条件? 我已经编辑了我的答案以包括每个工人的行动顺序。希望这可以消除任何混淆。 让工作人员通过存储过程与数据库对话并锁定表以进行更新/写入过程 - 这是一种方式。其他方法是使您的代码交易基于。不知道如何在 python 中做到这一点。

以上是关于当多个工人完成一个未知大小的工作时如何得到通知?的主要内容,如果未能解决你的问题,请参考以下文章

当活动之间的转换完成时如何得到通知?

当 TextEditor 失去焦点时如何得到通知?

当 RxJava 发生不同事件时如何得到通知?

将相同的队列项分配给多个工作人员

JUC源码解析CountDownLatch

Odoo 中的“工人”是啥?