如何用 Python 构建一个简单的分布式系统

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何用 Python 构建一个简单的分布式系统相关的知识,希望对你有一定的参考价值。

分布式爬虫概览
何谓分布式爬虫?
通俗的讲,分布式爬虫就是多台机器多个
spider
对多个
url
的同时处理问题,分布式的方式可以极大提高程序的抓取效率。
构建分布式爬虫通畅需要考虑的问题
(1)如何能保证多台机器同时抓取同一个URL?
(2)如果某个节点挂掉,会不会影响其它节点,任务如何继续?
(3)既然是分布式,如何保证架构的可伸缩性和可扩展性?不同优先级的抓取任务如何进行资源分配和调度?
基于上述问题,我选择使用celery作为分布式任务调度工具,是分布式爬虫中任务和资源调度的核心模块。它会把所有任务都通过消息队列发送给各个分布式节点进行执行,所以可以很好的保证url不会被重复抓取;它在检测到worker挂掉的情况下,会尝试向其他的worker重新发送这个任务信息,这样第二个问题也可以得到解决;celery自带任务路由,我们可以根据实际情况在不同的节点上运行不同的抓取任务(在实战篇我会讲到)。本文主要就是带大家了解一下celery的方方面面(有celery相关经验的同学和大牛可以直接跳过了)
Celery知识储备
celery基础讲解
按celery官网的介绍来说
Celery
是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。它是一个专注于实时处理的任务队列,同时也支持任务调度。
下面几个关于celery的核心知识点
broker:翻译过来叫做中间人。它是一个消息传输的中间件,可以理解为一个邮箱。每当应用程序调用celery的异步任务的时候,会向broker传递消息,而后celery的worker将会取到消息,执行相应程序。这其实就是消费者和生产者之间的桥梁。
backend:
通常程序发送的消息,发完就完了,可能都不知道对方时候接受了。为此,celery实现了一个backend,用于存储这些消息以及celery执行的一些消息和结果。
worker:
Celery类的实例,作用就是执行各种任务。注意在celery3.1.25后windows是不支持celery
worker的!
producer:
发送任务,将其传递给broker
beat:
celery实现的定时任务。可以将其理解为一个producer,因为它也是通过网络调用定时将任务发送给worker执行。注意在windows上celery是不支持定时任务的!
下面是关于celery的架构示意图,结合上面文字的话应该会更好理解
由于celery只是任务队列,而不是真正意义上的消息队列,它自身不具有存储数据的功能,所以broker和backend需要通过第三方工具来存储信息,celery官方推荐的是
RabbitMQ和Redis,另外mongodb等也可以作为broker或者backend,可能不会很稳定,我们这里选择Redis作为broker兼backend。
实际例子
先安装celery
pip
install
celery
我们以官网给出的例子来做说明,并对其进行扩展。首先在项目根目录下,这里我新建一个项目叫做celerystudy,然后切换到该项目目录下,新建文件tasks.py,然后在其中输入下面代码
这里我详细讲一下代码:我们先通过app=Celery()来实例化一个celery对象,在这个过程中,我们指定了它的broker,是redis的db
2,也指定了它的backend,是redis的db3,
broker和backend的连接形式大概是这样
redis://:password@hostname:port/db_number
然后定义了一个add函数,重点是@app.task,它的作用在我看来就是将add()
注册为一个类似服务的东西,本来只能通过本地调用的函数被它装饰后,就可以通过网络来调用。这个tasks.py中的app就是一个worker。它可以有很多任务,比如这里的任务函数add。我们再通过在命令行切换到项目根目录,执行
celery
-A
tasks
worker
-l
info
启动成功后就是下图所示的样子
这里我说一下各个参数的意思,-A指定的是app(即Celery实例)所在的文件模块,我们的app是放在tasks.py中,所以这里是
tasks;worker表示当前以worker的方式运行,难道还有别的方式?对的,比如运行定时任务就不用指定worker这个关键字;
-l
info表示该worker节点的日志等级是info,更多关于启动worker的参数(比如-c、-Q等常用的)请使用
celery
worker
--help
进行查看
将worker启动起来后,我们就可以通过网络来调用add函数了。我们在后面的分布式爬虫构建中也是采用这种方式分发和消费url的。在命令行先切换到项目根目录,然后打开python交互端
from
tasks
import
addrs
=
add.delay(2,
2)
这里的add.delay就是通过网络调用将任务发送给add所在的worker执行,这个时候我们可以在worker的界面看到接收的任务和计算的结果。
这里是异步调用,如果我们需要返回的结果,那么要等rs的ready状态true才行。这里add看不出效果,不过试想一下,如果我们是调用的比较占时间的io任务,那么异步任务就比较有价值了
上面讲的是从Python交互终端中调用add函数,如果我们要从另外一个py文件调用呢?除了通过import然后add.delay()这种方式,我们还可以通过send_task()这种方式,我们在项目根目录另外新建一个py文件叫做
excute_tasks.py,在其中写下如下的代码
from
tasks
import
addif
__name__
==
'__main__':
add.delay(5,
10)
这时候可以在celery的worker界面看到执行的结果
此外,我们还可以通过send_task()来调用,将excute_tasks.py改成这样
这种方式也是可以的。send_task()还可能接收到为注册(即通过@app.task装饰)的任务,这个时候worker会忽略这个消息
定时任务
上面部分讲了怎么启动worker和调用worker的相关函数,这里再讲一下celery的定时任务。
爬虫由于其特殊性,可能需要定时做增量抓取,也可能需要定时做模拟登陆,以防止cookie过期,而celery恰恰就实现了定时任务的功能。在上述基础上,我们将tasks.py文件改成如下内容
然后先通过ctrl+c停掉前一个worker,因为我们代码改了,需要重启worker才会生效。我们再次以celery
-A
tasks
worker
-l
info这个命令开启worker。
这个时候我们只是开启了worker,如果要让worker执行任务,那么还需要通过beat给它定时发送,我们再开一个命令行,切换到项目根目录,通过
这样就表示定时任务已经开始运行了。
眼尖的同学可能看到我这里celery的版本是3.1.25,这是因为celery支持的windows最高版本是3.1.25。由于我的分布式微博爬虫的worker也同时部署在了windows上,所以我选择了使用
3.1.25。如果全是linux系统,建议使用celery4。
此外,还有一点需要注意,在celery4后,定时任务(通过schedule调度的会这样,通过crontab调度的会马上执行)会在当前时间再过定时间隔执行第一次任务,比如我这里设置的是60秒的间隔,那么第一次执行add会在我们通过celery
beat
-A
tasks
-l
info启动定时任务后60秒才执行;celery3.1.25则会马上执行该任务
参考技术A ython使用multiprocessing实现一个最简单的分布式作业调度系统介绍Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把本回答被提问者采纳

如何用Python搭建一个简单的推荐系统?

推荐系统的相关知识我们已在前文中提到,在这篇文章中,我们会介绍如何用Python来搭建一个简单的推荐系统。

本文使用的数据集是MovieLens数据集,该数据集由明尼苏达大学的Grouplens研究小组整理。它包含1,10和2亿个评级。 Movielens还有一个网站,我们可以注册,撰写评论并获得电影推荐。接下来我们就开始实战演练。

在这篇文章中,我们会使用Movielens构建一个基于item的简易的推荐系统。在开始前,第一件事就是导入pandas和numPy。

import pandas as pd import numpy as np import warnings warnings.filterwarnings(‘ignore‘)

接下来,我们使用pandas read_csv()加载数据集。数据集由制表符分隔,所以我们将\ t传递给sep参数。然后,使用names参数传入列名。

df = pd.read_csv(‘u.data‘, sep=‘\t‘, names=[‘user_id‘,‘item_id‘,‘rating‘,‘titmestamp‘])

接下来查看表头,检查一下正在处理的数据。

df.head()

如果我们能够看到电影的标题而不仅仅是ID,那再好不过了。之后加载电影标题并把它与此数据集合并。

movie_titles = pd.read_csv(‘Movie_Titles‘) movie_titles.head()

由于item_id列相同,我们可以在此列上合并这些数据集。

df = pd.merge(df, movie_titles, on=‘item_id‘) df.head()

数据集中的每一列分部代表:

?user_id - 评级电影的用户的ID。

?item_id- 电影的ID。

?rating - 用户为电影提供的评级,介于1和5之间。

?timestamp - 电影评级的时间。

?title - 电影标题。

使用describe或info命令,就可以获得数据集的简要描述。如果想要真正了解正在使用的数据集的话,这一点非常重要。

df.describe()

可以看出,数据集共有100003条记录,电影的平均评分介于3.52-5之间。

现在我们再创建一个dataframe,其中包含每部电影的平均评分和评分数量。之后,这些评分将用来计算电影之间的相关性。相关性是一种统计指标,表示两个或多个变量一起波动的程度。相关系数越高,电影越为相似。

以下例子将使用Pearson相关系数 (Pearson correlation coefficient),该数字介于-1和1之间,1表示正线性相关,-1表示负相关, 0表示没有线性相关。也就是说,具有零相关性的电影完全不相似。

我们会使用pandas groupby 功能来创建dataframe。按照标题对数据集进行分组,并计算其平均值获得每部电影的平均评分。

ratings = pd.DataFrame(df.groupby(‘title‘)[‘rating‘].mean()) ratings.head()

接下来我们创建number_of_ratings列,这样就能看到每部电影的评分数量。完成这步操作后,就可以看到电影的平均评分与电影获得的评分数量之间的关系。五星级电影很有可能只被一个人评价,而这种五星电影在统计上是不正确的。

因此,在构建推荐系统时,我们需要设置阈值。我们可以使用pandas groupby功能来创建新列,然后按标题栏分组,使用计数函数计算每部电影的评分。之后,便可以使用head()函数查看新的dataframe。

rating [‘number_of_ratings‘] = df.groupby(‘title‘)[‘rating‘].count() ratings.head()

接下来我们使用pandas绘制功能来绘制直方图,显示评级的分布:

import matplotlib.pyplot as plt %matplotlib inline ratings[‘rating‘].hist(bins=50)

可以看到,大多数电影的评分都在2.5-4之间。通过类似的方法还可以将number_of_ratings列可视化。

ratings[‘number_of_ratings‘].hist(bins=60)

从上面的直方图中可以清楚地看出,多数电影的评分都很低,评分最高的电影是一些非常有名的电影。

现在让我们再来看一下电影评级与评分数量之间的关系。我们可以使用seaborn绘制散点图,然后使用jointplot()函数执行此操作。

import seaborn as sns sns.jointplot(x=‘rating‘, y=‘number_of_ratings‘, data=ratings)

从图中我们可以看出,电影平均评分与评分数量之间呈正相关关系,电影获得的评分数量越多,其平均评分越高。

创建基于item的简易推荐系统

接下来我们会快速创建一个基于item的简单的推荐系统。

首先,我们需要将数据集转换为矩阵,电影标题为列,user_id为索引,评级为值。完成这一步,我们将得到一个dataframe,其中列是电影标题,行是用户ID。每列代表所有用户对电影的所有评级。评级为NAN表示用户未对这部电影评分。

我们可以用该矩阵来计算单个电影的评级与矩阵中其余电影的相关性,该矩阵可以通过pandas pivot_table实现。

movie_matrix = df.pivot_table(index =‘user_id‘,columns =‘title‘,values =‘rating‘) movie_matrix.head()

接下来让我们找到评分数量最多的电影,并选择其中的两部电影。然后使用pandas sort_values并将升序设置为false,以便显示评分最多的电影。然后使用head()函数来查看评分数目最多的前十部电影。

ratings.sort_values(‘number_of_ratings‘, ascending=False).head(10)

假设一个用户曾看过Air Force One(1997)和Contact(1997),我们想根据这两条观看记录向该用户推荐其他类似的电影,那么这一点可以通过计算这两部电影的评级与数据集中其他电影的评级之间的相关性来实现。第一步是创建一个dataframe,其中包含来自movie_matrix的这些电影的评级。

AFO_user_rating = movie_matrix[‘Air Force One (1997)‘] contact_user_rating = movie_matrix[‘Contact (1997)‘]

Dataframe可以显示user_id和这两部电影的评分。

AFO_user_rating.head() contact_user_rating.head()

使用pandas corwith功能计算两个dataframe之间的相关性。有了这一步,就能够获得每部电影的评级与Air Force One电影的评级之间的相关性。

similar_to_air_force_one = movie_matrix.corrwith(AFO_user_rating)

可以看到,Air Force One电影和Till There Was You(1997)之间的相关性是0.867。这表明这两部电影之间有很强的相似性。

similar_to_air_force_one.head()

还可以计算Contact(1997)的评级与其他电影评级之间的相关性,步骤同上:

similar_to_contact = movie_matrix.corrwith(contact_user_rating)

可以从中发现,Contact(1997)和Till There Was You(1997)之间存在非常强的相关性(0.904)。

similar_to_contact.head()

前边已经提到,并非所有用户都对所有电影进行了评分,因此,该矩阵中有很多缺失值。为了让结果看起来更有吸引力,删除这些空值并将相关结果转换为dataframe。

corr_contact = pd.DataFrame(similar_to_contact, columns=[‘Correlation‘]) corr_contact.dropna(inplace=True) corr_contact.head()corr_AFO = pd.DataFrame(similar_to_air_force_one, columns=[‘correlation‘]) corr_AFO.dropna(inplace=True) corr_AFO.head()

上面这两个dataframe分别展示了与Contact(1997)和Air Force One(1997)电影最相似的电影。然而,问题出现了,有些电影的实际质量非常低,但可能因为一两位用户给他们5星评级而被推荐。

这个问题可以通过设置评级数量的阈值来解决。从早期的直方图中看到,评级数量从100开始急剧下降。因此可以将此设置为阈值,但是也可以考虑其他合适的值。为此,我们需要将两个dataframe与rating datframe中的number_of_ratings列一起加入。

corr_AFO = corr_AFO.join(ratings[‘number_of_ratings‘]) corr_contact = corr_contact.join(ratings[‘number_of_ratings‘])corr_AFO.head()corr_contact.head()

现在,我们就能得到与Air Force One(1997)最相似的电影,并把这些电影限制在至少有100条评论的电影中,然后可以按相关列对它们进行排序并查看前10个。

corr_AFO [corr_AFO [‘number_of_ratings‘]> 100] .sort_values(by =‘correlation‘,ascending = False).head(10)

我们注意到Air Force One(1997)与自身相关性最高,这并不奇怪。下一部与Air Force One(1997)最相似的电影是Hunt for Red October,相关系数为0.554。

显然,通过更改评论数量的阈值,我们可以按之前的方式得到不同的结果。限制评级数量可以让我们获得更好的结果。

现在重复上边的步骤,可以看到与Contact(1997)电影最相关的电影:

corr_contact [corr_contact [‘number_of_ratings‘]> 100] .sort_values(by =‘Correlation‘,ascending = False).head(10)

Contact(1997)最相似的电影是Philadelphia(1993),相关系数为0.446,有137个评级。所以,如果有人喜欢Contact(1997),我们可以向他们推荐上述电影。

以上是构建推荐系统的一种非常简单的方法,但并不符合行业标准。后续的话我们可以通过构建基于存储器的协同过滤系统来改进该系统。在这种情况下,将数据划分为训练集和测试集,使用诸如余弦相似性来计算电影之间的相似性;或者构建基于模型的协作过滤系统,然后使用Root Mean Squared Error(RMSE)等技术评估模型。

Github: mwitiderrick/simple-recommender-

文章来源:How to build a Simple Recommender System in Python

(以上内容由第四范式先荐整理发布)

以上是关于如何用 Python 构建一个简单的分布式系统的主要内容,如果未能解决你的问题,请参考以下文章

如何用python写爬虫 知乎

1.凤凰架构:构建可靠的大型分布式系统 --- 服务架构演进史

1.凤凰架构:构建可靠的大型分布式系统 --- 服务架构演进史

如何用Python搭建一个简单的推荐系统?

如何用消息系统避免分布式事务?

一文详解如何用MySQL/Redis/ZooKeeper实现分布式锁