Spark Hive:通过另一个 DataFrame 的列的值过滤一个 DataFrame 的行

Posted

技术标签:

【中文标题】Spark Hive:通过另一个 DataFrame 的列的值过滤一个 DataFrame 的行【英文标题】:Spark Hive: Filter rows of one DataFrame by the values of another DataFrame's column 【发布时间】:2017-03-15 13:37:50 【问题描述】:

我有以下两个DataFrames

DataFrame "dfPromotion":
date        | store
===================
2017-01-01  | 1    
2017-01-02  | 1


DataFrame "dfOther":
date        | store
===================
2017-01-01  | 1    
2017-01-03  | 1    

稍后我需要union 上面的DataFrames。但在我必须删除所有具有date 值的dfOther 行之前,它也包含在dfPromotion 中。

以下filtering 步骤的结果应如下所示:

DataFrame "dfPromotion" (this stays always the same, must not be changed in this step!)
date        | store
===================
2017-01-01  | 1    
2017-01-02  | 1


DataFrame "dfOther" (first row is removed as dfPromotion contains the date 2017-01-01 in the "date" column)
date        | store
===================
2017-01-03  | 1 

有没有办法在 Java 中做到这一点?到目前为止,我只找到了DataFrame.except 方法,但这会检查 DataFrames 的所有列。我需要仅通过date 列过滤第二个DataFrame,因为稍后可以添加其他列,其中可能包含不同的值...

调用dfOther.filter(dfOther.col("date").isin(dfPromotion.col("date"))) 会抛出以下异常:

Exception in thread "main" org.apache.spark.sql.AnalysisException: resolved attribute(s) date#64 missing from date#0,store#13 in operator !Filter date#0 IN (date#64);

【问题讨论】:

【参考方案1】:

你可以使用减法,

dfOther.select("date").except(dfPromotion.select("date")).join(dfOther,'date').show()

【讨论】:

抱歉,Spark API 中没有 DataFrame.subtract 方法:spark.apache.org/docs/1.6.3/api/java/org/apache/spark/sql/… 对不起。我在 pyspark 中使用了它,并在 Java rdd 中使用了减法。不知道它已为数据框删除。无论如何,我们有 except 函数,你可以在上面的代码行中使用它而不是减法。它应该主要工作。 except 函数的要点是,我必须仅通过 date 列过滤 dfOther DataFrame。所以我不能使用这种方法,否则这将是最简单的方法。 我相信我们可以,dfOther.select("date"),它返回一个只有日期列的数据框,同样也返回一个 dfPromotion.select("date")。因此,除了两个具有日期列的数据框之间将返回我们需要的结果。试一试,如果我错过了什么,请告诉我。 这就是你的做法,dfOther.select("date").except(dfPromotion.select("date")).join(dfOther,'date').show()跨度> 【参考方案2】:

既然你提到了 Spark Hive,你可以试试下面的 spark sql 方法吗?

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);
val dfpromotion = sqlContext.sql("select * from dfpromotion");

dfpromotion.show
+----------+-----+
|        dt|store|
+----------+-----+
|2017-01-01|    1|
|2017-01-02|    1|
+----------+-----+

val dfother = sqlContext.sql("select * from dfother");

dfother.show
+----------+-----+
|        dt|store|
+----------+-----+
|2017-01-01|    1|
|2017-01-03|    1|
+----------+-----+


val dfdiff = sqlContext.sql("select o.dt, o.store from dfpromotion p right         outer join dfother o on p.dt = o.dt where p.dt is null");
val dfunion = dfpromotion.union(dfdiff);


scala> dfunion.show
+----------+-----+
|        dt|store|
+----------+-----+
|2017-01-01|    1|
|2017-01-02|    1|
|2017-01-03|    1|

【讨论】:

这是为我做的,非常感谢。我现在的最终代码:dfPromotion.join(dfOther, dfPromotion.col("date").equalTo(dfOther.col("date")), "right_outer").where(dfPromotion.col("date").isNull()).select(dfOther.col("date"), dfOther.col("store"));

以上是关于Spark Hive:通过另一个 DataFrame 的列的值过滤一个 DataFrame 的行的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL - 无法将所有记录写入配置单元表

通过 sparkSQL 进行 Hive 分桶

Spark - 结构值的 Hive 集成 - NULL 输出

黑猴子的家:Spark on hive 与 hive on spark 的区别

对 SparkSQL 中数组列的每个元素执行 UDF(需要另一个 spark 作业)

spark读取数据写入hive数据表