PySpark.sql.filter 没有按应有的方式执行

Posted

技术标签:

【中文标题】PySpark.sql.filter 没有按应有的方式执行【英文标题】:PySpark.sql.filter not performing as it should 【发布时间】:2018-04-24 07:16:08 【问题描述】:

我在执行以下代码时遇到了问题:

from pyspark.sql import functions as F
from pyspark.sql import Row, HiveContext

hc = HiveContext()
rows1 = [Row(id1 = '2', id2 = '1', id3 = 'a'),
         Row(id1 = '3', id2 = '2', id3 = 'a'),
         Row(id1 = '4', id2 = '3', id3 = 'b')]
df1 = hc.createDataFrame(rows1)
df2 = df1.filter(F.col("id3")=="a")
df3 = df1.join(df2, df1.id2 == df2.id1, "inner")

当我运行上面的代码时,df3 是一个空的 DataFrame。然而: 如果我将代码更改为下面,它会给出正确的结果(2 行的 DataFrame):

from pyspark.sql import functions as F
from pyspark.sql import Row, HiveContext

hc = HiveContext()
rows1 = [Row(id1 = '2', id2 = '1', id3 = 'a'),
         Row(id1 = '3', id2 = '2', id3 = 'a'),
         Row(id1 = '4', id2 = '3', id3 = 'b')]
df1 = hc.createDataFrame(rows1)
rows2 = [Row(id1 = '2', id2 = '1', id3 = 'a'),
         Row(id1 = '3', id2 = '2', id3 = 'a'),
         Row(id1 = '4', id2 = '3', id3 = 'b')]
df1_temp = hc.createDataFrame(rows2)
df2 = df1_temp.filter(F.col("id3")=="a")
df3 = df1.join(df2, df1.id2 == df2.id1, "inner")

所以我的问题是:为什么我必须在这里创建一个临时数据框? 另外,如果我无法在我的项目部分中获取 HiveContext,我如何在现有数据框之上制作一个重复的数据框?

【问题讨论】:

期望的输出是什么? 什么版本的火花? 我猜你应该使用“alias()”。 赏金将奖励给提供相关 JIRA 票证参考的人。 这是我正在使用的 Spark 1.6 【参考方案1】:

我相信您在这里遇到的问题是一个更普遍的问题的实例,其中某些类型的 DataFrame 自连接(包括 DataFrame 与其自身过滤副本的连接)可能导致生成模棱两可或不正确查询计划。

有几个与此相关的 Spark JIRA;以下是一些值得注意的:

SPARK-15063: "filtering and joining back doesn't work" 似乎与您报告的特定实例类型最接近。 SPARK-17154: "Wrong result can be returned or AnalysisException can be thrown after self-join or similar operations" 对根本原因进行了很好的讨论。

还有其他 JIRA 票据处理这些问题的不同表现/方面。这些票证可以通过以下从上面列出的票证开始的 JIRA“相关”链接链来发现。

这种歧义只会在通过 DataFrame 实例引用列时出现(通过下标,如 df["mycol"],或通过字段访问,如 df.mycol)。可以通过给 DataFrame 设置别名并通过别名引用列来避免这种歧义。例如,以下内容可以正常工作:

>>> from pyspark.sql import functions as F
>>> df1 = hc.createDataFrame(rows1).alias("df1")
>>> df2 = df1.filter(F.col("id3")=="a").alias("df2")
>>> df3 = df1.join(df2, F.col("df1.id2") == F.col("df2.id1"), "inner")
>>> df3.show()
+---+---+---+---+---+---+
|id1|id2|id3|id1|id2|id3|
+---+---+---+---+---+---+
|  4|  3|  b|  3|  2|  a|
|  3|  2|  a|  2|  1|  a|
+---+---+---+---+---+---+

【讨论】:

感谢您的回答。我怀疑可能是这种情况,但我仍然觉得它令人担忧。我可以接受相同的列连接可以变得微不足道(即使它不是我们所期望的),但是对于不同的表达式,它是令人不安的。但是提交者的回答超出了我的预期,所以再次感谢您,我将在今天晚些时候奖励赏金。【参考方案2】:

我在 Spark 2.0 中看到此数据集的行为相同,但并非总是针对相同的操作。稍微不同的数据框可以正常工作。

df1 = spark.createDataFrame(
    [(1, 2, 'a'), (2, 2, 'a'), (3, 4, 'b')], ['id1', 'id2', 'id3']
    )
df1.show()

+---+---+---+
|id1|id2|id3|
+---+---+---+
|  1|  2|  a|
|  2|  2|  a|
|  3|  4|  b|
+---+---+---+

df2 = df1.filter(df1.id3 == 'a')
df2.show()

+---+---+---+
|id1|id2|id3|
+---+---+---+
|  1|  2|  a|
|  2|  2|  a|
+---+---+---+


df3 = df1.join(df2, df1.id2 == df2.id1, 'inner')
df3.show()

+---+---+---+---+---+---+
|id1|id2|id3|id1|id2|id3|
+---+---+---+---+---+---+
|  2|  2|  a|  1|  2|  a|
|  2|  2|  a|  2|  2|  a|
+---+---+---+---+---+---+

一定有错误?不过,我还没有尝试过更高版本的 spark。您可能希望将此报告为错误。

【讨论】:

我猜你应该使用“alias()”。

以上是关于PySpark.sql.filter 没有按应有的方式执行的主要内容,如果未能解决你的问题,请参考以下文章

HTML 引导页脚未按应有的方式响应。它没有粘在底部,也没有像我编程的那样显示 100% 宽度

TableView 未按应有的方式显示

理解 Java 中的引用; BST 的 addNode() 函数未按应有的方式运行 [重复]

WaveFront .obj 加载器未按应有的方式显示对象(VBO 和 VAO)

jQuery 菜单没有关闭

复选框起作用,但没有动画