火花可重复/确定性结果

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了火花可重复/确定性结果相关的知识,希望对你有一定的参考价值。

我正在运行下面的Spark代码(基本上是作为MVE创建的),它执行以下操作:

  1. 阅读实木复合地板和限额
  2. 分区依据
  3. 加入
  4. 过滤器

我正在努力理解为什么我在joined数据框中得到不同数量的行,即每次运行应用程序后,在第3阶段之后的数据框中。为什么会这样?

我认为不应该发生的原因是limit是确定性的,因此每次相同的行应位于分区数据帧中,尽管顺序不同。在联接中,我将联接完成分区的字段。我期望分区中具有成对的每种组合,但是我认为每次都应等于相同的数量。

 def main(args: Array[String]) 

    val maxRows = args(0)

    val spark = SparkSession.builder.getOrCreate()
    val windowSpec = Window.partitionBy("epoch_1min").orderBy("epoch")

    val data = spark.read.parquet("srcfile.parquet").limit(maxRows.toInt)
    val partitionDf = data.withColumn("row", row_number().over(windowSpec))
    partitionDf.persist(StorageLevel.MEMORY_ONLY)
    logger.debug(s"$partitionDf.count() rows in partitioned data")

    val dfOrig = partitionDf.withColumnRenamed("epoch_1min", "epoch_1min_orig").withColumnRenamed("row", "row_orig")
    val dfDest = partitionDf.withColumnRenamed("epoch_1min", "epoch_1min_dest").withColumnRenamed("row", "row_dest")

    val joined = dfOrig.join(dfDest, dfOrig("epoch_1min_orig") === dfDest("epoch_1min_dest"), "inner")
    logger.debug(s"Rows in joined dataframe $joined.count()")

    val filtered = joined.filter(col("row_orig") < col("row_dest"))
    logger.debug(s"Rows in filtered dataframe $filtered.count()")

  
答案
  1. 如果启动新的应用程序,可能会发生基础数据更改。
  2. [否则,就像在RDBMS上使用ANSI SQL一样使用Spark SQL,当不使用ORDER BY时,不能保证数据的顺序。因此,您不能以不同的Executor评估假定处理将是相同的(无排序/排序)。

以上是关于火花可重复/确定性结果的主要内容,如果未能解决你的问题,请参考以下文章

可重复使用的Pytorch结果和随机种子

精心收集的 48 个 JavaScript 代码片段,仅需 30 秒就可理解!(转载)

在火花对 RDD 中按值排序

如何在火花中使用地图而不实现可序列化?

在火花中压缩 2 列 [重复]

根据日期范围过滤火花数据框[重复]