PySpark 连接两个 RDD 导致一个空 RDD

Posted

技术标签:

【中文标题】PySpark 连接两个 RDD 导致一个空 RDD【英文标题】:PySpark join two RDD results in an empty RDD 【发布时间】:2016-08-26 09:10:41 【问题描述】:

我是一名 Spark 新手,试图在我的数据集上编辑和应用此电影推荐教程 (https://databricks-training.s3.amazonaws.com/movie-recommendation-with-mllib.html)。但它一直抛出此错误:

ValueError: Can not reduce() empty RDD

这是计算模型的均方根误差的函数:

def computeRmse(model, data, n):
    """
    Compute RMSE (Root Mean Squared Error).
    """
    predictions = model.predictAll(data.map(lambda x: (x[0], x[1])))

    print predictions.count()
    print predictions.first()
    print "predictions above"

    print data.count()
    print data.first()
    print "validation data above"


    predictionsAndRatings = predictions.map(lambda x: ((x[0], x[1]), x[2])) \
#LINE56
   .join(data.map(lambda line: line.split(‘,’) ).map(lambda x: ((x[0], x[1]), x[2]))) \
  .values()    

   print predictionsAndRatings.count()
   print "predictions And Ratings above" 
#LINE63
return sqrt(predictionsAndRatings.map(lambda x: (x[0] - x[1]) **    2).reduce(add) / float(n))

model = ALS.train(training, rank, numIter, lambda)。 data 是验证数据集。 训练和验证集最初来自 rating.txt 文件,格式为:userID,productID,rating,ratingopID

这些是输出的一部分:

879
...
Rating(user=0, product=656, rating=4.122132631144641)
predictions above
...
1164
...
(u'640085', u'1590', u'5')
validation data above    
...
16/08/26 12:47:18 INFO DAGScheduler: Registering RDD 259 (join at     /path/myapp/MyappALS.py:56)
16/08/26 12:47:18 INFO DAGScheduler: Got job 20 (count at         /path/myapp/MyappALS.py:59) with 12 output partitions 
16/08/26 12:47:18 INFO DAGScheduler: Final stage: ResultStage 238 (count at /path/myapp/MyappALS.py:59)
16/08/26 12:47:18 INFO DAGScheduler: Parents of final stage:     List(ShuffleMapStage 237)
16/08/26 12:47:18 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 237)
16/08/26 12:47:18 INFO DAGScheduler: Submitting ShuffleMapStage 237     (PairwiseRDD[259] at join at /path/myapp/MyappALS.py:56), which has no     missing parents    
....

0
predictions And Ratings above

...
Traceback (most recent call last):
File "/path/myapp/MyappALS.py", line 130, in <module>
validationRmse = computeRmse(model, validation, numValidation)
File "/path/myapp/MyappALS.py", line 63, in computeRmse
return sqrt(predictionsAndRatings.map(lambda x: (x[0] - x[1]) ** 2).reduce(add) / float(n))
File "/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 805, in reduce
ValueError: Can not reduce() empty RDD

所以从 count() 我确定初始 RDD 不是空的。 比INFO log Registering RDD 259 (join at /path/myapp/MyappALS.py:56) 是否意味着启动了join作业?

我错过了什么吗? 谢谢。

【问题讨论】:

你确定你的加入不会产生空集吗?加入将Return an RDD containing all pairs of elements with matching keys in self and other。并且示例显示它只会加入那些有key的人出现在两个集合中。 我设法解决了这个问题,并且函数 computeRMSE 确实给出了输出,但是在主程序中,我正在运行一个循环,在每次迭代中更改模型的参数,但它会因输出而崩溃第一次迭代后的内存错误! 【参考方案1】:

当我将 int() 添加到时,该错误消失了:

predictionsAndRatings = predictions.map(lambda x: ((x[0], x[1]), x[2])) \ .join(data.map(lambda x: ((int(x[0]), int(x[1])), int(x[2])))) \ .values()

我们认为这是因为 pediction 是从给出元组的 predictAll 方法输出的,但其他数据是由算法手动解析的

【讨论】:

以上是关于PySpark 连接两个 RDD 导致一个空 RDD的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 PySpark 对两个 RDD 进行完全外连接?

如何在 PySpark 中将两个 rdd 合并为一个

在两个 Spark RDD(在 PySpark 中)上进行半连接的正确方法是啥?

pyspark RDD 将一行扩展为多行

Pyspark 将 rdd 转换为具有空值的数据帧

PySpark - RDD 中对象的时间重叠