皮斯帕克。生成随机数的变压器总是生成相同的数字

Posted

技术标签:

【中文标题】皮斯帕克。生成随机数的变压器总是生成相同的数字【英文标题】:pyspark. Transformer that generates a random number generates always the same number 【发布时间】:2016-06-21 13:33:03 【问题描述】:

我正在尝试衡量必须将 dataframe 从 scala 复制到 python 并返回到大型管道中的性能影响。为此,我创建了这个相当人工的转换器:

from pyspark.ml.pipeline import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.ml.util import keyword_only
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

import random

class RandomColAdderTransformer(Transformer, HasInputCol, HasOutputCol):

    @keyword_only
    def __init__self(self, inputCol=None, outputCol=None, bogusarg=None):
        super(RandomColAdderTransformer, self).__init__()
        self.bogusarg = None
        self._setDefault(bogusarg=set())
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        cur_col = self.getInputCol()
        def randGet(col): # UDF crashes with no arguments
            a = col*random.random() # Ensure we are reading and copying to python space 
            return a            # It runs only once?

        sparktype = FloatType()
        return dataset.withColumn("randFloat", udf(randGet, sparktype)(cur_col))

这个转换器的目标是确保有一些从 python 生成的数字,它访问dataframe 并进行乘法(在 python 中),然后对于管道的下一个阶段,它必须添加dataframe的一列

但是我有些奇怪。在测试我的代码时,会为所有列生成相同的随机数:

df = sqlContext.createDataFrame([(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
myTestTransformer = RandomColAdderTransformer()
myTestTransformer.setInputCol("x3")
transformedDF = myTestTransformer.transform(df)
transformedDF.show()

+---+---+-----+-----------+
| x1| x2|   x3|  randFloat|
+---+---+-----+-----------+
|  1|  a| 23.0| 0.95878977|
|  3|  B|-23.0|-0.95878977|
+---+---+-----+-----------+

然后transformedDF.show() 的连续调用实际上改变了值!?

transformedDF.show()
+---+---+-----+-----------+
| x1| x2|   x3|  randFloat|
+---+---+-----+-----------+
|  1|  a| 23.0| 0.95878977|
|  3|  B|-23.0|-0.95878977|
+---+---+-----+-----------+


In [3]: transformedDF.show()
+---+---+-----+-----------+
| x1| x2|   x3|  randFloat|
+---+---+-----+-----------+
|  1|  a| 23.0|  2.9191132|
|  3|  B|-23.0|-0.95878977|
+---+---+-----+-----------+


In [4]: transformedDF.show()
+---+---+-----+-----------+
| x1| x2|   x3|  randFloat|
+---+---+-----+-----------+
|  1|  a| 23.0| 0.95878977|
|  3|  B|-23.0|-0.95878977|
+---+---+-----+-----------+


In [5]: transformedDF.show()
+---+---+-----+----------+
| x1| x2|   x3| randFloat|
+---+---+-----+----------+
|  1|  a| 23.0| 16.033003|
|  3|  B|-23.0|-2.9191132|
+---+---+-----+----------+

这是预期的行为吗? .show() 真的会触发计算开始吗? AFAIK 我应该使用单个节点,确定它们会在单个线程中运行,以便共享随机种子吗?我知道存在一个内置的 pyspark rng,但它不适合我的目的,因为它实际上不会从 python 空间生成数据。

【问题讨论】:

您希望在这里得到什么样的答案?解释给定代码发生了什么或完整的工作替代? 解释发生了什么。 AFAIK 代码执行我希望它执行的操作(python 空间中的值乘以 dataframe 中的值,然后将该列附加到 dataframe 以进一步处理它。如果我错了,请纠正我,我我试图了解正在发生的事情并确保它会强制 py4j 将数据复制到 jvm 并返回。 您可以使用身份 (lambda x: x),如果您的唯一目标是移动数据,它也可以正常工作。 @zero323 我不确定这是否真的会做任何事情,因为我认为评估是按需进行的,但感谢您的解释。 如果下游处理需要它,它将独立于版本执行。在 1.x 中,即使根本不使用它也会被计算出来。一般来说,您不仅应该考虑数据移动成本。对 GC 也有重大影响。更不用说 Python UDF 对执行计划特别不利。 【参考方案1】:

好吧,这里的预期是相对的,但不是无法解释的。特别是 RNG 的状态是从父进程继承的。您可以通过在本地模式下运行以下简单的 sn-p 轻松证明这一点:

import random 

def roll_and_get_state(*args):
    random.random()
    return [random.getstate()]

states = sc.parallelize([], 10).mapPartitions(roll_and_get_state).collect()
len(set(states))
## 1

如您所见,每个分区都使用自己的 RNG,但都具有相同的状态。

通常,确保在 Spark 中正确的 Python RNG 行为而不会造成严重的性能损失,尤其是在您需要可重现的结果时,这是相当棘手的。

一种可能的方法是使用加密安全随机数据 (os.urandom) 生成的种子为每个分区实例化单独的 Random 实例。

如果您需要可重现的结果,您可以根据全局状态和分区数据生成 RNG 种子。不幸的是,这些信息在 Python 运行时不容易访问(忽略 mapPartitionsWithIndex 等特殊情况)。

由于分区级别的操作并不总是适用(例如在 UDF 的情况下),您可以通过使用单例模块或 Borg 模式为每个执行程序初始化 RNG 来获得类似的结果。

另见:

Random numbers generation in PySpark Filtering Spark DataFrame on new column

【讨论】:

感谢您的详细解答。 不客气。如果有帮助请不要忘记点赞/接受:) 你能告诉我为什么show() 调用后来有时会产生不同的结果,有时会产生相同的结果吗?数据框不是应该已经转换了吗? 如果数据没有被缓存,那么每次你执行一个动作时一切都是从头开始计算的。另外,据我所知,show 返回任意行集。 在这种情况下,我只有 2 行。我会理解它每次都会重新计算,但为什么有时会得到不同的随机数?

以上是关于皮斯帕克。生成随机数的变压器总是生成相同的数字的主要内容,如果未能解决你的问题,请参考以下文章

为什么这个随机数生成器生成相同的数字?

无法使用 keras.models.load_model() 加载 TF 变压器模型

Pyspark:保存变压器

cadence画单端输入多端输出变压器的电路原理图如何画

Laravel 变压器与资源

变压器模型预测的意外结果