推荐算法协同过滤算法代码(pyspark | ALS)
Posted MachineCYL
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了推荐算法协同过滤算法代码(pyspark | ALS)相关的知识,希望对你有一定的参考价值。
【推荐算法】协同过滤算法介绍_MachineCYL的博客-CSDN博客
上文介绍了协同过滤算法的原理,接下来我介绍一下协同过滤算法的代码实现。
下面我就开始介绍用pyspark中的ALS(交替最小二乘矩阵分解)来实现协同过滤代码。
一、ALS的简单介绍
ALS算法是2008年以来,用的比较多的协同过滤算法。它已经集成到Spark的Mllib库中,使用起来比较方便。从协同过滤的分类来说,ALS算法属于User-Item CF,也叫做混合CF。它同时考虑了User(用户)和Item(商品)两个方面。
用户和商品的关系,可以抽象为如下的三元组:<User,Item,Rating>。其中,Rating是用户对商品的评分,表征用户对该商品的喜好程度。
假设我们有一批用户数据,其中包含m个User和n个Item,则我们定义Rating矩阵 Rm×n。在实际使用中,由于n和m的数量都十分巨大,因此R矩阵的规模很容易就会突破1亿项。这时候,传统的矩阵分解方法对于这么大的数据量已经是很难处理了。另一方面,一个用户也不可能给所有商品评分,因此,R矩阵注定是个稀疏矩阵。而使用ALS可以达到数据降维的目的,大大减少计算量。
二、前期准备
- 使用pyspark前要先安装spark的环境。我的spark版本是2.4.3,pyspark版本也是2.4.3。
- 如果需要安装spark环境,可以参考:
- pyspark安装指令如下(加清华源,下载快多了):
pip install pyspark==2.4.3 -i https://pypi.tuna.tsinghua.edu.cn/simple
-
数据格式(如果需要获取数据,可以见下方链接):
模型训练用到的数据格式如下:主要用到userId、movieId和rating字段。
三、详细代码
定义CollaborativeFiltering类,主要封装模型训练、模型保存、用户推荐、商品推荐等代码。
class CollaborativeFiltering(object):
def __init__(self, spark_session):
self.spark_session = spark_session
self.model = None
def train(self, train_set, user_col, item_col, rating_col, epoch=10):
"""
Build the recommendation model using ALS on the training data
Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
"""
als = ALS(regParam=0.01, maxIter=epoch, userCol=user_col, itemCol=item_col, ratingCol=rating_col,
coldStartStrategy='drop')
self.model = als.fit(train_set)
def eval(self, test_set, label_col='ratingFloat', metric='rmse'):
""" Evaluate the model on the test data """
predictions = self.model.transform(test_set)
# self.model.itemFactors.show(10, truncate=False)
# self.model.userFactors.show(10, truncate=False)
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol=label_col, metricName=metric)
loss = evaluator.evaluate(predictions)
return loss
def save(self, model_dir="./model_param"):
self.model.write().overwrite().save(model_dir)
def load(self, model_dir="./model_param"):
self.model = ALSModel.load(model_dir)
def recommend_for_all_users(self, num_items=10):
user_recs = self.model.recommendForAllUsers(numItems=num_items)
return user_recs
def recommend_for_all_items(self, num_users=10):
item_recs = self.model.recommendForAllItems(numUsers=num_users)
return item_recs
def recommend_for_user_subset(self, dataset, num_items=10):
user_recs = self.model.recommendForUserSubset(dataset=dataset, numItems=num_items)
return user_recs
def recommend_for_item_subset(self, dataset, num_users=10):
item_recs = self.model.recommendForItemSubset(dataset=dataset, numUsers=num_users)
return item_recs
新建代码文件train_cf.py,加入示例代码如下:
def parse_argvs():
parser = argparse.ArgumentParser(description='[collaborativeFiltering]')
parser.add_argument("--data_path", type=str, default='./data/ratings.csv')
parser.add_argument("--model_path", type=str, default='./model_param')
parser.add_argument("--epoch", type=int, default=10)
parser.add_argument("--train_flag", type=bool, default=False)
args = parser.parse_args()
print('[input params] '.format(args))
return parser, args
if __name__ == '__main__':
parser, args = parse_argvs()
data_path = args.data_path
model_path = args.model_path
train_flag = args.train_flag
epoch = args.epoch
conf = SparkConf().setAppName('collaborativeFiltering').setMaster("local[*]")
spark_session = SparkSession.builder.config(conf=conf).getOrCreate()
# read data
data_path = os.path.abspath(data_path)
data_path = "file://" + data_path
print("[spark] read file path: ".format(data_path))
ratingSamples = spark_session.read.format('csv').option('header', 'true').load(data_path) \\
.withColumn("userIdInt", F.col("userId").cast(IntegerType())) \\
.withColumn("movieIdInt", F.col("movieId").cast(IntegerType())) \\
.withColumn("ratingFloat", F.col("rating").cast(FloatType()))
training, test = ratingSamples.randomSplit((0.8, 0.2), seed=2022)
# collaborative filtering start
cf = CollaborativeFiltering(spark_session=spark_session)
if train_flag is True:
cf.train(train_set=training,
user_col='userIdInt',
item_col='movieIdInt',
rating_col='ratingFloat',
epoch=epoch)
cf.save(model_dir=model_path)
else:
cf.load(model_dir=model_path)
loss = cf.eval(test_set=test, label_col='ratingFloat', metric='rmse')
print("[Root-mean-square error] ".format(loss))
# Generate top 10 movie recommendations for each user
user_recs = cf.recommend_for_all_users(num_items=10)
user_recs.show(10, False)
# Generate top 10 user recommendations for each movie
movie_recs = cf.recommend_for_all_items(num_users=10)
movie_recs.show(10, False)
# Generate top 10 movie recommendations for a specified set of users
user_data = ratingSamples.select("userIdInt").distinct().limit(10)
user_sub_recs = cf.recommend_for_user_subset(dataset=user_data, num_items=10)
user_sub_recs.show(10, False)
# Generate top 10 user recommendations for a specified set of movies
movie_data = ratingSamples.select("movieIdInt").distinct().limit(10)
movie_sub_recs = cf.recommend_for_item_subset(dataset=movie_data, num_users=10)
movie_sub_recs.show(10, False)
spark_session.stop()
运行指令示例:
# 进行ALS模型训练和预测
python train_cf.py --data_path "./data/ratings.csv" --train_flag True
# 不训练,直接进行ALS模型预测
python train_cf.py --data_path "./data/ratings.csv"
代码执行结果如下(部分):
需要获取训练数据和代码可以访问我的github,如果觉得有帮助,请star收藏,谢谢~
参考链接
以上是关于推荐算法协同过滤算法代码(pyspark | ALS)的主要内容,如果未能解决你的问题,请参考以下文章
Python+Django+Mysql志愿者活动推荐系统 基于用户项目内容的协同过滤推荐算法 SimpleWebActivityCFRSPython python实现协同过滤推荐算法实现源代码下载
Python+Django+Mysql个性化二手车推荐系统 汽车推荐系统 基于用户项目内容的协同过滤推荐算法 WebCarCFRSPython python实现协同过滤推荐算法实现源代码下载