PySpark - ALS 输出中的 RDD 到 DataFrame
Posted
技术标签:
【中文标题】PySpark - ALS 输出中的 RDD 到 DataFrame【英文标题】:PySpark - RDD to DataFrame in ALS output 【发布时间】:2016-03-28 17:37:31 【问题描述】:我正在使用 Spark 的推荐系统。
训练一个模型后,我做了下面的代码来获得推荐 model.recommendProductsForUsers(2)
[(10000, (Rating(user=10000, product=14780773, rating=7.35695469892999e-05),
Rating(user=10000, product=17229476, rating=5.648606256948921e-05))),
(0, (Rating(user=0, product=16750010, rating=0.04405213492474741),
Rating(user=0, product=17416511, rating=0.019491942665715176))),
(20000, (Rating(user=20000, product=17433348, rating=0.017938298063142653),
Rating(user=20000, product=17333969, rating=0.01505112418739887)))]
在这种情况下,Rec
是 RDD
,见下文。
>>> type(Rec)
<class 'pyspark.rdd.RDD'>
我怎样才能把这些信息放在像这样的数据框中
User | Product | Rating
1000 | 14780773 | 7.3e-05
1000 | 17229675 | 5.6e-05
(...) (...) (...)
2000 | 17333969 | 0.015
感谢您的宝贵时间
【问题讨论】:
必要的函数是covered in the PySpark docs。查找createDataFrame
。
【参考方案1】:
为了验证,我使用以下 pyspark 代码重现了您的 RDD
:
from pyspark.mllib.recommendation import Rating
Rec = sc.parallelize([(10000, (Rating(user=10000, product=14780773, rating=7.35695469892999e-05),
Rating(user=10000, product=17229476, rating=5.648606256948921e-05))),
(0, (Rating(user=0, product=16750010, rating=0.04405213492474741),
Rating(user=0, product=17416511, rating=0.019491942665715176))),
(20000, (Rating(user=20000, product=17433348, rating=0.017938298063142653),
Rating(user=20000, product=17333969, rating=0.01505112418739887)))])
这个 RDD 由键值对组成,每个值都包含一个带有 Rating 元组的记录。您需要映射 RDD 以仅保留记录,然后将结果分解为每个推荐具有单独的元组。 flatMap(f)
函数将这两个步骤压缩成这样:
flatRec = Rec.flatMap(lambda p: p[1])
这会产生如下形式的 RDD:
[Rating(user=10000, product=14780773, rating=7.35695469892999e-05),
Rating(user=10000, product=17229476, rating=5.648606256948921e-05),
Rating(user=0, product=16750010, rating=0.04405213492474741),
Rating(user=0, product=17416511, rating=0.019491942665715176),
Rating(user=20000, product=17433348, rating=0.017938298063142653),
Rating(user=20000, product=17333969, rating=0.01505112418739887)]
现在只需使用createDataFrame
函数将其转换为DataFrame。每个 Rating tuple 都会被转换成一个 DataFrame Row,并且由于这些项目被标记,你不需要指定一个 schema。
recDF = sqlContext.createDataFrame(flatRec).show()
这将输出以下内容:
+-----+--------+--------------------+
| user| product| rating|
+-----+--------+--------------------+
|10000|14780773| 7.35695469892999E-5|
|10000|17229476|5.648606256948921E-5|
| 0|16750010| 0.04405213492474741|
| 0|17416511|0.019491942665715176|
|20000|17433348|0.017938298063142653|
|20000|17333969| 0.01505112418739887|
+-----+--------+--------------------+
【讨论】:
以上是关于PySpark - ALS 输出中的 RDD 到 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章
pyspark中的RDD到DataFrame(来自rdd的第一个元素的列)