如何为第一个数据帧中匹配的特定列值的所有值获取第二个数据帧的数据?

Posted

技术标签:

【中文标题】如何为第一个数据帧中匹配的特定列值的所有值获取第二个数据帧的数据?【英文标题】:How to get data of second data frame for all values of particular columns values matched in first dataframe? 【发布时间】:2019-04-05 11:30:54 【问题描述】:

有如下两个数据框

first_df
 |-- company_id: string (nullable = true)
 |-- max_dd: date (nullable = true)
 |-- min_dd: date (nullable = true)
 |-- mean: double (nullable = true)
 |-- count: long (nullable = false)

second_df 
 |-- company_id: string (nullable = true)
 |-- max_dd: date (nullable = true)
 |-- mean: double (nullable = true)
 |-- count: long (nullable = false)

我在 second_df 中有一些公司数据。我需要从 second_df 中获取 first_df 中列出的那些公司 ID 的数据。

什么样的 spark api 对我有用? 我该怎么做?

谢谢。

问题扩展:

如果没有存储记录,则 first_df 将为空。因此 first_df("mean") & first_df("count") 将为空,导致 "acc_new_mean" 为空。在那种情况下,我需要将 "new_mean" 设置为 second_df("mean") ,该怎么做? 我试过这样但它不工作 任何线索如何在这里处理 .withColumn("new_mean", ... ) ???

val acc_new_mean = (second_df("mean") + first_df("mean")) / (second_df("count") + first_df("count"))
    val acc_new_count  =  second_df("count") + first_df("count")


    val new_df = second_df.join(first_df.withColumnRenamed("company_id", "right_company_id").as("a"), 
                                 (  $"a.right_company_id"  === second_df("company_id") && ( second_df("min_dd")  > $"a.max_dd" ) ) 
                            , "leftOuter")
                            .withColumn("new_mean", if(acc_new_mean == null) lit(second_df("mean")) else  acc_new_mean )

【问题讨论】:

@summerbulb 有什么帮助/建议吗? @dytyniak 有什么帮助/建议吗? @jezrael 有什么帮助/建议吗? 你能添加一些例子吗?看起来两个数据框都是相同的,这取决于你想要什么数据。您可以将这两个数据框加入或组合到联合中。 请提供数据示例以及您尝试过的内容。 【参考方案1】:

方法 1:

如果您发现难以使用数据框的连接 API 连接 2 个数据框,如果您熟悉 sql,则可以使用 sql。为此,您可以将 2 个数据帧注册为 spark 内存中的表,并在此之上写入 sql。

second_df.registerTempTable("table_second_df")
first_df.registerTempTable("table_first_df")

val new_df = spark.sql("select distinct s.* from table_second_df s join table_first_df f on s.company_id=f.company_id")
new_df.show()

按照您的要求,我已经添加了逻辑。

考虑您的first_df 如下所示:

+----------+----------+----------+----+-----+
|company_id|    max_dd|    min_dd|mean|count|
+----------+----------+----------+----+-----+
|         A|2019-04-05|2019-04-01|  10|  100|
|         A|2019-04-06|2019-04-02|  20|  200|
|         B|2019-04-08|2019-04-01|  30|  300|
|         B|2019-04-09|2019-04-02|  40|  400|
+----------+----------+----------+----+-----+

考虑您的second_df 如下所示:

+----------+----------+----+-----+
|company_id|    max_dd|mean|count|
+----------+----------+----+-----+
|         A|2019-04-03|  10|  100|
|         A|2019-04-02|  20|  200|
+----------+----------+----+-----+

由于第二个表中有公司 id A,我从second_df 中获取了最新的max_dd 记录。对于公司 id B,它不在second_df 我从first_df 获取了最新的max_dd 记录。

请在下面找到代码。

first_df.registerTempTable("table_first_df")
second_df.registerTempTable("table_second_df")
val new_df = spark.sql("select company_id,max_dd,min_dd,mean,count from (select distinct s.company_id,s.max_dd,null as min_dd,s.mean,s.count,row_number() over (partition by s.company_id order by s.max_dd desc) rno from table_second_df s join table_first_df f on s.company_id=f.company_id) where rno=1 union select company_id,max_dd,min_dd,mean,count from (select distinct f.*,row_number() over (partition by f.company_id order by f.max_dd desc) rno from table_first_df f left join table_second_df s  on s.company_id=f.company_id where s.company_id is null) where rno=1")
new_df.show()

结果如下:

方法 2:

您可以使用dataframe's API 的join,而不是像我在Approach 1 中提到的那样创建临时表。这与Approach 1 中的逻辑相同,但在这里我使用dataframe's API 来完成此操作。请不要忘记导入org.apache.spark.sql.expressions.Window,因为我在下面的代码中使用了Window.patitionBy

val new_df = second_df.as('s).join(first_df.as('f),$"s.company_id" === $"f.company_id","inner").drop($"min_dd").withColumn("min_dd",lit("")).select($"s.company_id", $"s.max_dd",$"min_dd", $"s.mean", $"s.count").dropDuplicates.withColumn("Rno", row_number().over(Window.partitionBy($"s.company_id").orderBy($"s.max_dd".desc))).filter($"Rno" === 1).drop($"Rno").union(first_df.as('f).join(second_df.as('s),$"s.company_id" === $"f.company_id","left_anti").select($"f.company_id", $"f.max_dd",$"f.min_dd", $"f.mean", $"f.count").dropDuplicates.withColumn("Rno", row_number().over(Window.partitionBy($"f.company_id").orderBy($"f.max_dd".desc))).filter($"Rno" === 1).drop($"Rno"))
new_df.show()

结果如下:

如果您有任何问题,请告诉我。

【讨论】:

感谢 Sarath ,但如果没有找到匹配项,我只需要从 first_df 中获取值......如果从 new_df 中找到,该怎么做?在火花 API 中 second_df 来自 Cassandra ,其中每个 company_id 可能有多个整数,我只需要选择存在的最新条目。如果没有条目,那么我需要获取 first_df 列值 谢谢,我可以知道这个“left_anti”是什么吗?在 first_df.as('f).join(second_df.as('s),$"s.company_id" === $"f.company_id","left_anti") 中? Left_anti 从 first_df 中选择那些公司 ID 不在 second_df 中的数据。如果解决方案有效,请将答案标记为已接受@Shyam 给出错误 org.apache.spark.sql.AnalysisException: cannot resolve 'f.min_dd' given input columns: [f.company_id, f.max_dd, f.count, f.mean];; 【参考方案2】:
 val acc_new_mean = //new mean literaal
 val acc_new_count  =   //new count literaal


          val resultDf = computed_df.join(accumulated_results_df.as("a"), 
                             (  $"company_id"  === computed_df("company_id")  ) 
                        , "leftOuter")
                        .withColumn("new_mean", when( acc_new_mean.isNull,lit(computed_df("mean")) ).otherwise(acc_new_mean) )
                        .withColumn("new_count", when( acc_new_count.isNull,lit(computed_df("count")) ).otherwise(acc_new_count) )
                         .select(
                            computed_df("company_id"),
                            computed_df("max_dd"),
                            col("new_mean").as("mean"),
                            col("new_count").as("count")
                          )

【讨论】:

以上是关于如何为第一个数据帧中匹配的特定列值的所有值获取第二个数据帧的数据?的主要内容,如果未能解决你的问题,请参考以下文章

SQL / Hive 选择具有特定列值的第一行

获取具有特定数量的重复值的行

对列值进行排序以匹配另一个表列中值的顺序

附加到数据帧中特定值的 for 循环中的向量

在数据库中搜索列中具有特定值的所有表

在数据库中搜索列中具有特定值的所有表