如何从 DataFrame 中获取最后一行?

Posted

技术标签:

【中文标题】如何从 DataFrame 中获取最后一行?【英文标题】:How to get the last row from DataFrame? 【发布时间】:2017-07-31 02:42:50 【问题描述】:

我有一个DataFrame,DataFrame有两列'value'和'timestamp','timestmp'是有序的,我想得到DataFrame的最后一行,怎么办?

这是我的输入:

+-----+---------+
|value|timestamp|
+-----+---------+
|    1|        1|
|    4|        2|
|    3|        3|
|    2|        4|
|    5|        5|
|    7|        6|
|    3|        7|
|    5|        8|
|    4|        9|
|   18|       10|
+-----+---------+

这是我的代码:

    val arr = Array((1,1),(4,2),(3,3),(2,4),(5,5),(7,6),(3,7),(5,8),(4,9),(18,10))
    var df=m_sparkCtx.parallelize(arr).toDF("value","timestamp")

这是我的预期结果:

+-----+---------+
|value|timestamp|
+-----+---------+
|   18|       10|
+-----+---------+

【问题讨论】:

df.where($"timestamp" === max($"timestamp") 工作吗? 它不起作用 Exchange rangepartitioning(ts#7 ASC NULLS FIRST, 200) 【参考方案1】:

试试这个,它对我有用。

df.orderBy($"value".desc).show(1)

【讨论】:

【参考方案2】:

我会使用简单的查询 - 按降序排列您的餐桌 - 从这个订单中取第一个值

df.createOrReplaceTempView("table_df")
query_latest_rec = """SELECT * FROM table_df ORDER BY value DESC limit 1"""
latest_rec = self.sqlContext.sql(query_latest_rec)
latest_rec.show()

【讨论】:

我正在使用这个解决方案,这是显而易见的。【参考方案3】:

我只想reduce:

df.reduce  (x, y) => 
  if (x.getAs[Int]("timestamp") > y.getAs[Int]("timestamp")) x else y 

【讨论】:

【参考方案4】:

最有效的方法是reduce 你的DataFrame。这为您提供了一个单行,您可以将其转换回 DataFrame,但由于它仅包含 1 条记录,这没有多大意义。

sparkContext.parallelize(
  Seq(
  df.reduce 
    (a, b) => if (a.getAs[Int]("timestamp") > b.getAs[Int]("timestamp")) a else b 
    match case Row(value:Int,timestamp:Int) => (value,timestamp)
  )
)
.toDF("value","timestamp")
.show


+-----+---------+
|value|timestamp|
+-----+---------+
|   18|       10|
+-----+---------+

这个解决方案虽然更短,但效率较低(因为它需要改组):

df
.where($"timestamp" === df.groupBy().agg(max($"timestamp")).map(_.getInt(0)).collect.head)

【讨论】:

【参考方案5】:

如果您的时间戳列是唯一的并且是按递增顺序排列的,那么有以下方法可以获取最后一行

println(df.sort($"timestamp", $"timestamp".desc).first())

// Output [1,1]

df.sort($"timestamp", $"timestamp".desc).take(1).foreach(println)

// Output [1,1]

df.where($"timestamp" === df.count()).show

输出:

+-----+---------+
|value|timestamp|
+-----+---------+
|   18|       10|
+-----+---------+

如果不使用索引创建新列并选择最后一个索引,如下所示

val df1 = spark.sqlContext.createDataFrame(
    df.rdd.zipWithIndex.map 
  case (row, index) => Row.fromSeq(row.toSeq :+ index)
,
StructType(df.schema.fields :+ StructField("index", LongType, false)))

df1.where($"timestamp" === df.count()).drop("index").show

输出:

+-----+---------+
|value|timestamp|
+-----+---------+
|   18|       10|
+-----+---------+

【讨论】:

排序功能低效,我不想使用排序功能 你可以使用 df.where($"timestamp" === df.count())【参考方案6】:

Java:

Dataset<Row> sortDF = inputDF.orderBy(org.apache.spark.sql.functions.col(config.getIncrementingColumn()).desc());
Row row = sortDF.first()

【讨论】:

【参考方案7】:

你也可以使用这个功能desc:Column desc(String columnName)

df.orderBy(desc("value")).show(1)

给出相同的结果
df.orderBy($"value".desc).show(1)

【讨论】:

以上是关于如何从 DataFrame 中获取最后一行?的主要内容,如果未能解决你的问题,请参考以下文章

从最后一行熊猫数据框中获取第一个单元格[重复]

当行尾有多余空格时,将 .dat 文件转换为 DataFrame

更改 DataFrame 的最后一行中的元素

如何从SQL Server中的重复行中获取最后一行?

如何从 DataFrame 中保存一行

我在 Pandas 的 Dataframe 中添加了一行 3 次。但是,只添加了最后一行