PySpark 评估

Posted

技术标签:

【中文标题】PySpark 评估【英文标题】:PySpark Evaluation 【发布时间】:2016-06-28 18:46:24 【问题描述】:

我正在尝试以下代码,它为 RDD 中的每一行添加一个数字,并使用 PySpark 返回一个 RDD 列表。

from pyspark.context import SparkContext
file  = "file:///home/sree/code/scrap/sample.txt"
sc = SparkContext('local', 'TestApp')
data = sc.textFile(file) 
splits = [data.map(lambda p :  int(p) + i) for i in range(4)]
print splits[0].collect()
print splits[1].collect()
print splits[2].collect()

输入文件(sample.txt)中的内容是:

1
2
3

我期待这样的输出(将 rdd 中的数字分别与 0、1、2 相加):

[1,2,3]
[2,3,4]
[3,4,5]

而实际输出是:

[4, 5, 6]
[4, 5, 6]
[4, 5, 6]

这意味着理解只使用变量 i 的值 3,而与 range(4) 无关。

为什么会发生这种行为?

【问题讨论】:

【参考方案1】:

这是因为 Python 后期绑定而发生的,而不是 (Py)Spark 特定的。 i 将在使用 lambda p : int(p) + i 时查找,而不是在定义时查找。通常它意味着当它被调用但在这个特定的上下文中它是当它被序列化以发送给工作人员时。

例如,您可以这样做:

def f(i):
    def _f(x):
        try:
            return int(x) + i
        except:
            pass
    return _f

data = sc.parallelize(["1", "2", "3"])
splits = [data.map(f(i)) for i in range(4)]
[rdd.collect() for rdd in splits]
## [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]

【讨论】:

我曾尝试将“p”传递给一个简单的外部函数,并传递给一个通过 lambda 调用的内部函数(如答案中的函数),用于反复试验。注意到正确的行为,当我这样做时:pastebin.com/z7E7wGKx 感谢您回复发生这种情况的原因。 值得注意的是,这发生在几乎任何带有闭包/lambdas 的语言中,甚至是 C#【参考方案2】:

这是因为 lambda 是通过引用来引用 i 的!它与火花无关。 See this

你可以试试这个:

a =[(lambda y: (lambda x: y + int(x)))(i) for i in range(4)]
splits = [data.map(a[x]) for x in range(4)]

或一行

splits = [
    data.map([(lambda y: (lambda x: y + int(x)))(i) for i in range(4)][x])
    for x in range(4)
]

【讨论】:

如果你想使用lambdas,有一个避免嵌套的简单技巧:[lambda x, i=i: i + int(x) for i in range(4)]

以上是关于PySpark 评估的主要内容,如果未能解决你的问题,请参考以下文章

为 pyspark 数据帧的每一行评估多个 if elif 条件

pyspark_mllib_classifier—(SVM)

Pyspark - df.cache().count() 永远运行

通过 pyspark.ml.tuning.TrainValidationSplit 调整后如何获得最佳参数?

Pyspark:将UDF的结果迭代地写回数据框不会产生预期的结果

pyspark GBTRegressor 特征重要度 及排序