PySpark.sql.filter 没有按应有的方式执行
Posted
技术标签:
【中文标题】PySpark.sql.filter 没有按应有的方式执行【英文标题】:PySpark.sql.filter not performing as it should 【发布时间】:2018-04-24 07:16:08 【问题描述】:我在执行以下代码时遇到了问题:
from pyspark.sql import functions as F
from pyspark.sql import Row, HiveContext
hc = HiveContext()
rows1 = [Row(id1 = '2', id2 = '1', id3 = 'a'),
Row(id1 = '3', id2 = '2', id3 = 'a'),
Row(id1 = '4', id2 = '3', id3 = 'b')]
df1 = hc.createDataFrame(rows1)
df2 = df1.filter(F.col("id3")=="a")
df3 = df1.join(df2, df1.id2 == df2.id1, "inner")
当我运行上面的代码时,df3 是一个空的 DataFrame。然而: 如果我将代码更改为下面,它会给出正确的结果(2 行的 DataFrame):
from pyspark.sql import functions as F
from pyspark.sql import Row, HiveContext
hc = HiveContext()
rows1 = [Row(id1 = '2', id2 = '1', id3 = 'a'),
Row(id1 = '3', id2 = '2', id3 = 'a'),
Row(id1 = '4', id2 = '3', id3 = 'b')]
df1 = hc.createDataFrame(rows1)
rows2 = [Row(id1 = '2', id2 = '1', id3 = 'a'),
Row(id1 = '3', id2 = '2', id3 = 'a'),
Row(id1 = '4', id2 = '3', id3 = 'b')]
df1_temp = hc.createDataFrame(rows2)
df2 = df1_temp.filter(F.col("id3")=="a")
df3 = df1.join(df2, df1.id2 == df2.id1, "inner")
所以我的问题是:为什么我必须在这里创建一个临时数据框? 另外,如果我无法在我的项目部分中获取 HiveContext,我如何在现有数据框之上制作一个重复的数据框?
【问题讨论】:
期望的输出是什么? 什么版本的火花? 我猜你应该使用“alias()”。 赏金将奖励给提供相关 JIRA 票证参考的人。 这是我正在使用的 Spark 1.6 【参考方案1】:我相信您在这里遇到的问题是一个更普遍的问题的实例,其中某些类型的 DataFrame 自连接(包括 DataFrame 与其自身过滤副本的连接)可能导致生成模棱两可或不正确查询计划。
有几个与此相关的 Spark JIRA;以下是一些值得注意的:
SPARK-15063: "filtering and joining back doesn't work" 似乎与您报告的特定实例类型最接近。 SPARK-17154: "Wrong result can be returned or AnalysisException can be thrown after self-join or similar operations" 对根本原因进行了很好的讨论。还有其他 JIRA 票据处理这些问题的不同表现/方面。这些票证可以通过以下从上面列出的票证开始的 JIRA“相关”链接链来发现。
这种歧义只会在通过 DataFrame 实例引用列时出现(通过下标,如 df["mycol"]
,或通过字段访问,如 df.mycol
)。可以通过给 DataFrame 设置别名并通过别名引用列来避免这种歧义。例如,以下内容可以正常工作:
>>> from pyspark.sql import functions as F
>>> df1 = hc.createDataFrame(rows1).alias("df1")
>>> df2 = df1.filter(F.col("id3")=="a").alias("df2")
>>> df3 = df1.join(df2, F.col("df1.id2") == F.col("df2.id1"), "inner")
>>> df3.show()
+---+---+---+---+---+---+
|id1|id2|id3|id1|id2|id3|
+---+---+---+---+---+---+
| 4| 3| b| 3| 2| a|
| 3| 2| a| 2| 1| a|
+---+---+---+---+---+---+
【讨论】:
感谢您的回答。我怀疑可能是这种情况,但我仍然觉得它令人担忧。我可以接受相同的列连接可以变得微不足道(即使它不是我们所期望的),但是对于不同的表达式,它是令人不安的。但是提交者的回答超出了我的预期,所以再次感谢您,我将在今天晚些时候奖励赏金。【参考方案2】:我在 Spark 2.0 中看到此数据集的行为相同,但并非总是针对相同的操作。稍微不同的数据框可以正常工作。
df1 = spark.createDataFrame(
[(1, 2, 'a'), (2, 2, 'a'), (3, 4, 'b')], ['id1', 'id2', 'id3']
)
df1.show()
+---+---+---+
|id1|id2|id3|
+---+---+---+
| 1| 2| a|
| 2| 2| a|
| 3| 4| b|
+---+---+---+
df2 = df1.filter(df1.id3 == 'a')
df2.show()
+---+---+---+
|id1|id2|id3|
+---+---+---+
| 1| 2| a|
| 2| 2| a|
+---+---+---+
df3 = df1.join(df2, df1.id2 == df2.id1, 'inner')
df3.show()
+---+---+---+---+---+---+
|id1|id2|id3|id1|id2|id3|
+---+---+---+---+---+---+
| 2| 2| a| 1| 2| a|
| 2| 2| a| 2| 2| a|
+---+---+---+---+---+---+
一定有错误?不过,我还没有尝试过更高版本的 spark。您可能希望将此报告为错误。
【讨论】:
我猜你应该使用“alias()”。以上是关于PySpark.sql.filter 没有按应有的方式执行的主要内容,如果未能解决你的问题,请参考以下文章
HTML 引导页脚未按应有的方式响应。它没有粘在底部,也没有像我编程的那样显示 100% 宽度
理解 Java 中的引用; BST 的 addNode() 函数未按应有的方式运行 [重复]