Spark Dataframe - 将特定行推到数据框中的最后一个

Posted

技术标签:

【中文标题】Spark Dataframe - 将特定行推到数据框中的最后一个【英文标题】:Spark Dataframe - Push a particular Row to the last in a Dataframe 【发布时间】:2018-08-27 03:20:26 【问题描述】:

一直在尝试将 Spark Dataframe 中的特定行推送到 Dataframe 的末尾。 这是我迄今为止尝试过的。

输入数据框:

+-------------+-------+------------+
|expected_date|count  |Downstream  |
+-------------+-------+------------+
|2018-08-26   |1      |abc         |
|2018-08-26   |6      |Grand Total |
|2018-08-26   |3      |xyy         |
|2018-08-26   |2      |xxx         |
+-------------+-------+------------+

代码:

    df.withColumn("Downstream_Hierarchy", when(col("Downstream") === "Grand Total", 2)
    .otherwise(1))
    .orderBy(col("Downstream_Hierarchy").asc)
    .drop("Downstream_Hierarchy")

输出数据框:

+-------------+-------+------------+
|expected_date|count  |Downstream  |
+-------------+-------+------------+
|2018-08-26   |1      |abc         |
|2018-08-26   |3      |xyy         |
|2018-08-26   |2      |xxx         |
|2018-08-26   |6      |Grand Total |
+-------------+-------+------------+

有没有更简单的方法来做到这一点?

【问题讨论】:

你的最终目标是什么? @Assaf Mendelson :我需要找到一些用例的总数并在邮件中发布。总计必须在最后一行,这样才能使数据更有意义。我也编辑了这个问题 - 请注意。 看起来比实际上的答案简单。 如果您通过电子邮件发送,我认为结果相对较小。这意味着您以某种方式将其翻译为“电子邮件”(例如,通过收集)。为什么不在那里对最后一行进行排序(即在数据框之外)? @AssafMendelson:错过了一点。实际上我不直接发送电子邮件。最终我将它写入HDFS。从那里,下游倾向于收集 part-00000 文件中的数据并通过电子邮件发送。无论我对你的第一个最终目标问题发表的评论都是高水平的。 【参考方案1】:

通过您的 cmets,由于 HDFS 需要最终结果,您可以将其作为 csv 写入 HDFS 两次

第一次将数据帧写入没有“总计”行的 hdfs。 第二次单独写入“总计”行,保存模式为“追加”。

【讨论】:

有道理!会试试的。【参考方案2】:

除所需行外的数据框:

val df1 = df.filter(col("Downstream") =!= "Grand Total" )

具有所需行的数据框:

val df2 = df.filter(col("Downstream") === "Grand Total" )

所需的数据帧:

val df_final = df1.union(df2)

可能不是最好的解决方案,但它避免昂贵的 OrderBy 操作。

【讨论】:

这并不能保证这将是最后一行。这将取决于分区的顺序 @AssafMendelson 没错!我对我当前的代码也有同样的疑问。尽管如此,在我将其写入 HDFS 之前,我已经完成了排序 - 我执行了 df.repartition(1)。话虽如此,它将改组数据帧不同分区中的数据,从而导致“总计”行移动到不同的位置。如果是,那么我需要寻找一种替代方法来将我的重新分区移动到不同的位置。 您可以尝试在执行过滤器之前执行 coalesce(1),假设所有聚合都预先完成,这应该给您一个单独的分区,但是,我不确定这是否仍能保证顺序。 我认为应该可以,请参考:***.com/a/29978189/7094520 分区只是堆叠,没有任何洗牌动作。【参考方案3】:

您可以尝试以下简单的步骤。

val lastRowDf = df.filter("Downstream='Grand Total'")
val remainDf = df.filter("Downstream !='Grand Total'")

remainDf.unionAll(lastRowDf).show

【讨论】:

以上是关于Spark Dataframe - 将特定行推到数据框中的最后一个的主要内容,如果未能解决你的问题,请参考以下文章

Spark Dataframe - 为特定 KEY 组的 VALUE 更改写入新记录

基于Spark Scala中的条件转置Dataframe中的特定列和行

如何根据另一列的值从 Spark DataFrame 中选择特定列?

在 Spark Dataframe (Pyspark) 中提取与特定条件匹配的第一个“行集”

Spark——DataFrame与RDD互操作方式

如何保证 Spark Dataframe 中的重新分区