运行之间的 Spark DataFrame 行数不一致
Posted
技术标签:
【中文标题】运行之间的 Spark DataFrame 行数不一致【英文标题】:Spark DataFrame row count is inconsistent between runs 【发布时间】:2017-10-22 09:31:38 【问题描述】:当我在 EMR 上运行我的 spark 作业(版本 2.1.1)时,每次运行都会计算数据帧上不同数量的行。我首先从 s3 读取数据到 4 个不同的数据帧,这些计数总是一致的,然后在加入数据帧之后,加入的结果有不同的计数。之后我还过滤了结果,并且每次运行的计数也不同。变化很小,相差 1-5 行,但我仍然想了解。
这是连接的代码:
val impJoinKey = Seq("iid", "globalVisitorKey", "date")
val impressionsJoined: DataFrame = impressionDsNoDuplicates
.join(realUrlDSwithDatenoDuplicates, impJoinKey, "outer")
.join(impressionParamterDSwithDateNoDuplicates, impJoinKey, "left")
.join(chartSiteInstance, impJoinKey, "left")
.withColumn("timestamp", coalesce($"timestampImp", $"timestampReal", $"timestampParam"))
.withColumn("url", coalesce($"realUrl", $"url"))
这是用于过滤器的:
val impressionsJoined: Dataset[ImpressionJoined] = impressionsJoinedFullDay.where($"timestamp".geq(new Timestamp(start.getMillis))).cache()
我也尝试过使用过滤方法而不是 where,但结果相同
有什么想法吗?
谢谢 尼尔
【问题讨论】:
您能否详细说明“每次运行计算数据帧上不同数量的行”?来源是否一致,即 "read data from s3" 在运行之间是否给出相同的结果?您能否确保每个 “4 个不同数据帧” 的行数在运行之间保持一致?在读取数据帧之后和使用它们之前打印计数。 是的,从S3读取的DF中的行数在run之间总是一致的,只有join后的行数不一致 您能否显示所有代码 - 包括您所做的计数?另外,您的过滤器功能中使用的impressionsJoinedFullDay
是什么?这和impressionsJoined
有区别吗??
【参考方案1】:
其中一个数据源是否可能随时间而变化?
因为impressionsJoined
没有被缓存,spark 会在每个动作上从头开始重新评估它,包括从源再次读取数据。
尝试在加入后缓存impressionsJoined
。
【讨论】:
S3 上的数据没有改变,因为我可以看到打印出从 s3 读取的 DF 的计数永远不会改变。而且,DF 已经在过滤器之前缓存了 好吧,从技术上讲,读取的 DF 的相同计数只会告诉您运行之间的行数相同 - 它不能保证数据相同。如果您无法完全控制数据,请复制数据并将其存储在 s3 的其他位置。以上是关于运行之间的 Spark DataFrame 行数不一致的主要内容,如果未能解决你的问题,请参考以下文章