spark dataframe 怎么去除第一行数据

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark dataframe 怎么去除第一行数据相关的知识,希望对你有一定的参考价值。

参考技术A 然后我们进入spark-shell,控制台的提示说明Spark为我们创建了一个叫sqlContext的上下文,注意,它是DataFrame的起点。
接下来我们希望把本地的JSON文件转化为DataFrame:
scala> val df = sqlContext.jsonFile("/path/to/your/jsonfile")df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
从控制台的提示可以得知,我们成功创建了一个DataFrame的对象,包含age和name两个字段。
而DataFrame自带的玩法就多了:// 输出表结构df.printSchema()// 选择所有年龄大于21岁的人,只保留name字段df.filter(df("age") > 21).select("name").show()// 选择name,并把age字段自增df.select("name", df("age") + 1).show()// 按年龄分组计数df.groupBy("age").count().show()// 左联表(注意是3个等号!)df.join(df2, df("name") === df2("name"), "left").show()本回答被提问者采纳

如何基于第二个 DataFrame (Java) 在 Spark DataFrame 中创建新列?

【中文标题】如何基于第二个 DataFrame (Java) 在 Spark DataFrame 中创建新列?【英文标题】:How to create a new column in a Spark DataFrame based on a second DataFrame (Java)? 【发布时间】:2016-05-26 22:15:46 【问题描述】:

我有两个 Spark DataFrame,其中一个有两个 cols,id 和 Tag。第二个 DataFrame 有一个 id col,但缺少 Tag。第一个 Dataframe 本质上是一个字典,每个 id 出现一次,而在第二个 DataFrame 中,id 可能出现多次。我需要的是在第二个 DataFrame 中创建一个新的 col,它的 Tag 作为每行中 id 的函数(在第二个 DataFrame 中)。我认为这可以通过首先转换为 RDD ..etc 来完成,但我认为必须有一种更优雅的方式使用 DataFrames(在 Java 中)。示例:给定一个 df1 Row-> id: 0, Tag: "A", a df2 Row1-> id: 0, Tag: null, a df2 Row2-> id: 0, Tag: "B",我需要在结果 DataFrame df3 中创建一个 Tag col 等于 df1(id=0) = "A" IF df2 Tag 为空,但保持原始如果不为空则标记 => 导致 df3 Row1-> id: 0, Tag: "A", df3 Row2-> id: 0, Tag: "B"。希望这个例子很清楚。

|   ID  |   No.   |  Tag  | new Tag Col |
|    1  |  10002  |   A   |      A      |
|    2  |  10003  |   B   |      B      | 
|    1  |  10004  | null  |      A      |
|    2  |  10005  | null  |      B      |

【问题讨论】:

简单的LEFT OUTER JOIN 不适合你的任何原因? 我编辑了这个问题并将调查 LOJ.. LOJ 并没有完全解决它,但我认为如果我使用 udf() 跟随它,那么我会得到我需要的东西。谢谢, 如果还不够,您可以尝试使用示例输入和预期输出来更新问题。现在真的很难理解你想要什么。 当 Tag 为 null 时,new Tag = Tag(id)。例如。 Tag(id=1) = A,所以我们将 A 分配给 10004,将 Tag(id=2) = B 分配给 10005。我想我需要一个 udf()。 【参考方案1】:

这里只需要左外连接和coalesce:

import org.apache.spark.sql.functions.coalesce

val df = sc.parallelize(Seq(
  (1, 10002, Some("A")), (2, 10003, Some("B")),
  (1, 10004, None), (2, 10005, None)
)).toDF("id", "no", "tag")

val lookup = sc.parallelize(Seq(
  (1, "A"), (2, "B")
)).toDF("id", "tag")


df.join(lookup, df.col("id").equalTo(lookup.col("id")), "leftouter")
  .withColumn("new_tag", coalesce(df.col("tag"), lookup.col("tag")))

这应该与 Java 版本几乎相同。

【讨论】:

这在 Java 中不起作用。它期待 Seq.. 有什么帮助吗???

以上是关于spark dataframe 怎么去除第一行数据的主要内容,如果未能解决你的问题,请参考以下文章

以编程方式将几列添加到 Spark DataFrame

pandas dataframe 与 spark dataframe 互相转换(数据类型应该怎么转换呢?)

林子雨spark scala版编程小结

将 Spark Dataframe 直接写入 HIVE 需要太多时间

基于两列或多列的 Spark DataFrame 聚合

在 Spark Scala 中将一行从一个数据集添加到另一个数据集