理解 pyspark 中的惰性求值行为

Posted

技术标签:

【中文标题】理解 pyspark 中的惰性求值行为【英文标题】:Understanding Lazy evaluation behavior in pyspark 【发布时间】:2019-11-25 18:08:36 【问题描述】:

我有一个类似这样的 spark 数据框。

df.show()

| Id |
|----|
| 1  |
| 2  |
| 3  |

现在我想添加几列分配给它们的随机整数。我正在使用以下 udf 。 (我知道我们不需要为此使用 udf)。这是我的代码。

random_udf = udf(lambda: random.randint(0, 1000), IntegerType())

df = df.withColumn("test_int", random_udf())
df.show()
| Id | test_int |
|----|----------|
| 1  | 51       |
| 2  | 111      |
| 3  | 552      |

现在,如果我添加另一列并显示它。 'test_int' 列中的值正在改变。

df = df.withColumn("test_int1", random_udf())
df.show()
| Id | test_int | test_int1 |
|----|----------|-----------|
| 1  | 429      | 429       |
| 2  | 307      | 307       |
| 3  | 69       | 69        |

我意识到可能是 spark 在第二个显示语句中再次评估数据帧,并将持久语句添加到我的代码中。现在我的代码看起来像这样。

df = df.withColumn("test_int", random_udf()).persist()
df.rdd.count()  ## To kick off the evaluation
df.show()
| Id | test_int |
|----|----------|
| 1  | 459      |
| 2  | 552      |
| 3  | 89       |

df = df.withColumn("test_int1", random_udf())
df.show()
| Id | test_int | test_int1 |
|----|----------|-----------|
| 1  | 459      | 459       |
| 2  | 552      | 552       |
| 3  | 89       | 89        |

无论我做什么,两列似乎都具有相同的价值。我正在寻找这种行为的解释。我正在使用 Azure databricks notebook (Pyspark 2.4.4)。

【问题讨论】:

你试过用random_udf.asNondeterministic()改变你的UDF吗? 【参考方案1】:

这里有两点:

    您需要了解计算机并不是真正随机数字。这里发生的是为您的random_udf() 设置了seed - 一旦设置了这个种子,“随机”将一次又一次地重复,因为你要求它做同样的事情。在数据科学中,这非常重要,因为它允许确定性并允许您的实验可重复。有关详细信息,请参阅 numpy.random.seed (https://docs.scipy.org/doc/numpy-1.15.0/reference/generated/numpy.random.seed.html) 和 random.seed

    你不应该真的使用udf 来处理这样的事情。有一个非常好的(和并行的)pyspark.sql.functions.rand,它允许你设置一个seed。见这里:https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.rand

【讨论】:

以上是关于理解 pyspark 中的惰性求值行为的主要内容,如果未能解决你的问题,请参考以下文章

惰性求值——lodash源码解读

Python中的惰性求值

从属属性中的 MATLAB 惰性求值

Python:any() / all() 中的惰性函数求值

Swift 之惰性求值

奇怪的 jags.parallel 错误/避免函数调用中的惰性求值