Spark Dataframe 除了方法问题

Posted

技术标签:

【中文标题】Spark Dataframe 除了方法问题【英文标题】:Spark Dataframe except method Issue 【发布时间】:2018-03-09 07:14:41 【问题描述】:

我有一个减去两个数据帧的用例。所以我使用了数据框 except() 方法。

这在较小的数据集上本地运行良好。

但是当我运行 AWS S3 存储桶时,except() 方法并没有像预期的那样减去。分布式环境有什么需要注意的吗?

有人遇到过类似的问题吗?

这是我的示例代码

val values = List(List("One", "2017-07-01T23:59:59.000", "2017-11-04T23:59:58.000", "A", "Yes") 
  , List("Two", "2017-07-01T23:59:59.000", "2017-11-04T23:59:58.000", "X", "No") 
  , List("Three", "2017-07-09T23:59:59.000", "2017-12-05T23:59:58.000", "M", "Yes") 
  , List("Four", "2017-11-01T23:59:59.000", "2017-12-09T23:59:58.000", "A", "No") 
  , List("Five", "2017-07-09T23:59:59.000", "2017-12-05T23:59:58.000", "", "No") 
  ,List("One", "2017-07-01T23:59:59.000", "2017-11-04T23:59:58.000", "", "No")
)
  .map(row => (row(0), row(1), row(2), row(3), row(4)))

val spark = SparkSession.builder().master("local").getOrCreate()

import spark.implicits._

val df = values.toDF("KEY", "ROW_START_DATE", "ROW_END_DATE", "CODE", "Indicator")

val filterCond = (col("ROW_START_DATE") <= "2017-10-31T23:59:59.999" && col("ROW_END_DATE") >= "2017-10-31T23:59:59.999" && col("CODE").isin("M", "A", "R", "G"))


val Filtered = df.filter(filterCond)
val Excluded = df.except(df.filter(filterCond))

预期输出:

df.show(false)
Filtered.show(false)
Excluded.show(false)
+-----+-----------------------+-----------------------+----+---------+
|KEY  |ROW_START_DATE         |ROW_END_DATE           |CODE|Indicator|
+-----+-----------------------+-----------------------+----+---------+
|One  |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000|A   |Yes      |
|Two  |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000|X   |No       |
|Three|2017-07-09T23:59:59.000|2017-12-05T23:59:58.000|M   |Yes      |
|Four |2017-11-01T23:59:59.000|2017-12-09T23:59:58.000|A   |No       |
|Five |2017-07-09T23:59:59.000|2017-12-05T23:59:58.000|    |No       |
|One  |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000|    |No       |
+-----+-----------------------+-----------------------+----+---------+
+-----+-----------------------+-----------------------+----+---------+
|KEY  |ROW_START_DATE         |ROW_END_DATE           |CODE|Indicator|
+-----+-----------------------+-----------------------+----+---------+
|One  |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000|A   |Yes      |
|Three|2017-07-09T23:59:59.000|2017-12-05T23:59:58.000|M   |Yes      |
+-----+-----------------------+-----------------------+----+---------+
+----+-----------------------+-----------------------+----+---------+
|KEY |ROW_START_DATE         |ROW_END_DATE           |CODE|Indicator|
+----+-----------------------+-----------------------+----+---------+
|Four|2017-11-01T23:59:59.000|2017-12-09T23:59:58.000|A   |No       |
|Two |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000|X   |No       |
|Five|2017-07-09T23:59:59.000|2017-12-05T23:59:58.000|    |No       |
|One |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000|    |No       |
+----+-----------------------+-----------------------+----+---------+

但是在 S3 存储桶上运行时会得到类似下面的东西

Filtered.show(false)
+-----+-----------------------+-----------------------+----+---------+
|KEY  |ROW_START_DATE         |ROW_END_DATE           |CODE|Indicator|
+-----+-----------------------+-----------------------+----+---------+
|One  |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000|A   |Yes      |
|Three|2017-07-09T23:59:59.000|2017-12-05T23:59:58.000|M   |Yes      |
+-----+-----------------------+-----------------------+----+---------+

Excluded.show(false)

+----+-----------------------+-----------------------+----+---------+
|KEY |ROW_START_DATE         |ROW_END_DATE           |CODE|Indicator|
+----+-----------------------+-----------------------+----+---------+
|One |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000|A   |Yes      |---> wrong
|Four|2017-11-01T23:59:59.000|2017-12-09T23:59:58.000|A   |No       |
|Two |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000|X   |No       |
|Five|2017-07-09T23:59:59.000|2017-12-05T23:59:58.000|    |No       |
|One |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000|    |No       |
+----+-----------------------+-----------------------+----+---------+

还有其他方法可以减去两个 spark 数据帧吗?

【问题讨论】:

s3 存储桶中的Filtered.show(false) 是什么? Filtered.show(false) 在两种环境中的工作方式相同。更新了 S3 输出 您尝试使用val Excluded = df.except(df.filter(filterCond)),为什么不尝试使用val Excluded = df.except(Filtered) 有什么不同吗? 我还没有找到任何解决方案,所以我改变了我的编程功能。不知道为什么会出现这个问题,可能是因为 spark 的分布式处理特性。 【参考方案1】:

可以根据两个数据帧的唯一性在两个数据帧上使用 leftanti 连接,这将为您提供您期望从 except 操作中获得的输出。

val diffdf = df1.join(df2,Seq("uniquekey"),"leftanti")

【讨论】:

@nikunj-kakadiyakunj 我同意这种使用“反连接”的方法可以帮助我们实现我们的要求。但是知道为什么 except 方法不起作用吗?理想情况下,它应该给出相同的结果。 @boom_clap 从理论上讲,它应该可以工作,但数据值可能有些奇怪,或者可能是由于在没有采取行动的情况下应用了转换,但我没有将其与问题中的测试数据一起检查。 【参考方案2】:

S3 并不完全是一个文件系统,it can surface in spark

    尝试验证写入 s3 的数据是否与使用 file://dest 时获得的数据相同。因为有东西在途中丢失的风险。 然后尝试在写入 s3 和读取之间放置一个 Thread.sleep(10000);这将显示目录不一致是否浮出水面。 如果您使用的是 EMR,请尝试使用他们一致的 EMR 选项 并尝试使用 s3a:// 连接器

如果它不能与 s3a:// 一起工作,请在 issues.apache.org 上提交 SPARK-JIRA,也将 s3a 放入文本中,包括此代码 sn-p(隐含地将其授权给 ASF)。然后我可以将它复制到一个测试中,看看我是否可以看到它,如果可以,当我在 Hadoop 3.1+ 中打开 s3guard 时它是否会消失

【讨论】:

以上是关于Spark Dataframe 除了方法问题的主要内容,如果未能解决你的问题,请参考以下文章

Apache Spark 中的 DataFrame 相等性

获取 Spark Dataframe 中特定单元格的值

单元测试火花数据帧转换链接

如何在 pyspark 中将 DenseMatrix 转换为 spark DataFrame?

Spark DataFrame 提示函数的可能值是啥?

Spark SQL 中 dataFrame 学习总结