皮斯帕克。生成随机数的变压器总是生成相同的数字
Posted
技术标签:
【中文标题】皮斯帕克。生成随机数的变压器总是生成相同的数字【英文标题】:pyspark. Transformer that generates a random number generates always the same number 【发布时间】:2016-06-21 13:33:03 【问题描述】:我正在尝试衡量必须将 dataframe
从 scala 复制到 python 并返回到大型管道中的性能影响。为此,我创建了这个相当人工的转换器:
from pyspark.ml.pipeline import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.ml.util import keyword_only
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
import random
class RandomColAdderTransformer(Transformer, HasInputCol, HasOutputCol):
@keyword_only
def __init__self(self, inputCol=None, outputCol=None, bogusarg=None):
super(RandomColAdderTransformer, self).__init__()
self.bogusarg = None
self._setDefault(bogusarg=set())
kwargs = self.__init__._input_kwargs
self.setParams(**kwargs)
@keyword_only
def setParams(self, inputCol=None, outputCol=None):
kwargs = self.setParams._input_kwargs
return self._set(**kwargs)
def _transform(self, dataset):
cur_col = self.getInputCol()
def randGet(col): # UDF crashes with no arguments
a = col*random.random() # Ensure we are reading and copying to python space
return a # It runs only once?
sparktype = FloatType()
return dataset.withColumn("randFloat", udf(randGet, sparktype)(cur_col))
这个转换器的目标是确保有一些从 python 生成的数字,它访问dataframe
并进行乘法(在 python 中),然后对于管道的下一个阶段,它必须添加dataframe
的一列
但是我有些奇怪。在测试我的代码时,会为所有列生成相同的随机数:
df = sqlContext.createDataFrame([(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
myTestTransformer = RandomColAdderTransformer()
myTestTransformer.setInputCol("x3")
transformedDF = myTestTransformer.transform(df)
transformedDF.show()
+---+---+-----+-----------+
| x1| x2| x3| randFloat|
+---+---+-----+-----------+
| 1| a| 23.0| 0.95878977|
| 3| B|-23.0|-0.95878977|
+---+---+-----+-----------+
然后transformedDF.show()
的连续调用实际上改变了值!?
transformedDF.show()
+---+---+-----+-----------+
| x1| x2| x3| randFloat|
+---+---+-----+-----------+
| 1| a| 23.0| 0.95878977|
| 3| B|-23.0|-0.95878977|
+---+---+-----+-----------+
In [3]: transformedDF.show()
+---+---+-----+-----------+
| x1| x2| x3| randFloat|
+---+---+-----+-----------+
| 1| a| 23.0| 2.9191132|
| 3| B|-23.0|-0.95878977|
+---+---+-----+-----------+
In [4]: transformedDF.show()
+---+---+-----+-----------+
| x1| x2| x3| randFloat|
+---+---+-----+-----------+
| 1| a| 23.0| 0.95878977|
| 3| B|-23.0|-0.95878977|
+---+---+-----+-----------+
In [5]: transformedDF.show()
+---+---+-----+----------+
| x1| x2| x3| randFloat|
+---+---+-----+----------+
| 1| a| 23.0| 16.033003|
| 3| B|-23.0|-2.9191132|
+---+---+-----+----------+
这是预期的行为吗? .show()
真的会触发计算开始吗? AFAIK 我应该使用单个节点,确定它们会在单个线程中运行,以便共享随机种子吗?我知道存在一个内置的 pyspark rng
,但它不适合我的目的,因为它实际上不会从 python 空间生成数据。
【问题讨论】:
您希望在这里得到什么样的答案?解释给定代码发生了什么或完整的工作替代? 解释发生了什么。 AFAIK 代码执行我希望它执行的操作(python 空间中的值乘以dataframe
中的值,然后将该列附加到 dataframe
以进一步处理它。如果我错了,请纠正我,我我试图了解正在发生的事情并确保它会强制 py4j 将数据复制到 jvm 并返回。
您可以使用身份 (lambda x: x
),如果您的唯一目标是移动数据,它也可以正常工作。
@zero323 我不确定这是否真的会做任何事情,因为我认为评估是按需进行的,但感谢您的解释。
如果下游处理需要它,它将独立于版本执行。在 1.x 中,即使根本不使用它也会被计算出来。一般来说,您不仅应该考虑数据移动成本。对 GC 也有重大影响。更不用说 Python UDF 对执行计划特别不利。
【参考方案1】:
好吧,这里的预期是相对的,但不是无法解释的。特别是 RNG 的状态是从父进程继承的。您可以通过在本地模式下运行以下简单的 sn-p 轻松证明这一点:
import random
def roll_and_get_state(*args):
random.random()
return [random.getstate()]
states = sc.parallelize([], 10).mapPartitions(roll_and_get_state).collect()
len(set(states))
## 1
如您所见,每个分区都使用自己的 RNG,但都具有相同的状态。
通常,确保在 Spark 中正确的 Python RNG 行为而不会造成严重的性能损失,尤其是在您需要可重现的结果时,这是相当棘手的。
一种可能的方法是使用加密安全随机数据 (os.urandom
) 生成的种子为每个分区实例化单独的 Random
实例。
如果您需要可重现的结果,您可以根据全局状态和分区数据生成 RNG 种子。不幸的是,这些信息在 Python 运行时不容易访问(忽略 mapPartitionsWithIndex
等特殊情况)。
由于分区级别的操作并不总是适用(例如在 UDF 的情况下),您可以通过使用单例模块或 Borg 模式为每个执行程序初始化 RNG 来获得类似的结果。
另见:
Random numbers generation in PySpark Filtering Spark DataFrame on new column【讨论】:
感谢您的详细解答。 不客气。如果有帮助请不要忘记点赞/接受:) 你能告诉我为什么show()
调用后来有时会产生不同的结果,有时会产生相同的结果吗?数据框不是应该已经转换了吗?
如果数据没有被缓存,那么每次你执行一个动作时一切都是从头开始计算的。另外,据我所知,show 返回任意行集。
在这种情况下,我只有 2 行。我会理解它每次都会重新计算,但为什么有时会得到不同的随机数?以上是关于皮斯帕克。生成随机数的变压器总是生成相同的数字的主要内容,如果未能解决你的问题,请参考以下文章