运行之间的 Spark DataFrame 行数不一致

Posted

技术标签:

【中文标题】运行之间的 Spark DataFrame 行数不一致【英文标题】:Spark DataFrame row count is inconsistent between runs 【发布时间】:2017-10-22 09:31:38 【问题描述】:

当我在 EMR 上运行我的 spark 作业(版本 2.1.1)时,每次运行都会计算数据帧上不同数量的行。我首先从 s3 读取数据到 4 个不同的数据帧,这些计数总是一致的,然后在加入数据帧之后,加入的结果有不同的计数。之后我还过滤了结果,并且每次运行的计数也不同。变化很小,相差 1-5 行,但我仍然想了解。

这是连接的代码:

val impJoinKey = Seq("iid", "globalVisitorKey", "date")

val impressionsJoined: DataFrame = impressionDsNoDuplicates
  .join(realUrlDSwithDatenoDuplicates, impJoinKey, "outer")
  .join(impressionParamterDSwithDateNoDuplicates, impJoinKey, "left")
  .join(chartSiteInstance, impJoinKey, "left")
  .withColumn("timestamp", coalesce($"timestampImp", $"timestampReal", $"timestampParam"))
  .withColumn("url", coalesce($"realUrl", $"url"))

这是用于过滤器的:

val impressionsJoined: Dataset[ImpressionJoined] = impressionsJoinedFullDay.where($"timestamp".geq(new Timestamp(start.getMillis))).cache()

我也尝试过使用过滤方法而不是 where,但结果相同

有什么想法吗?

谢谢 尼尔

【问题讨论】:

您能否详细说明“每次运行计算数据帧上不同数量的行”?来源是否一致,即 "read data from s3" 在运行之间是否给出相同的结果?您能否确保每个 “4 个不同数据帧” 的行数在运行之间保持一致?在读取数据帧之后和使用它们之前打印计数。 是的,从S3读取的DF中的行数在run之间总是一致的,只有join后的行数不一致 您能否显示所有代码 - 包括您所做的计数?另外,您的过滤器功能中使用的impressionsJoinedFullDay 是什么?这和impressionsJoined 有区别吗?? 【参考方案1】:

其中一个数据源是否可能随时间而变化? 因为impressionsJoined 没有被缓存,spark 会在每个动作上从头开始重新评估它,包括从源再次读取数据。

尝试在加入后缓存impressionsJoined

【讨论】:

S3 上的数据没有改变,因为我可以看到打印出从 s3 读取的 DF 的计数永远不会改变。而且,DF 已经在过滤器之前缓存了 好吧,从技术上讲,读取的 DF 的相同计数只会告诉您运行之间的行数相同 - 它不能保证数据相同。如果您无法完全控制数据,请复制数据并将其存储在 s3 的其他位置。

以上是关于运行之间的 Spark DataFrame 行数不一致的主要内容,如果未能解决你的问题,请参考以下文章

spark dataframe函数编程

「日志」Navicat统计的行数竟然和表实际行数不一致

spark dataFrame 相关知识点

在 spark.SQL DataFrame 和 pandas DataFrame 之间转换 [重复]

在不计算的情况下获取 Spark 数据框中的行数

Spark中的DataFrame,Dataset和RDD之间的区别