Apache Spark 在 DataFrame 中插入多行

Posted

技术标签:

【中文标题】Apache Spark 在 DataFrame 中插入多行【英文标题】:Apache Spark insert multiple rows into the DataFrame 【发布时间】:2021-01-12 13:34:51 【问题描述】:

首先我绑定到Java 1.7Java Spark 1.6

我有很多列和数据,但让我们按照这个简单的例子。 所以假设我有一个简单的表(DataFrame)

+----+-------+
|  id|   name|
+----+-------+
|   1|      A|
+----+-------+
|   2|      B|
+----+-------+
|   3|      C|
+----+-------+

每次在每个单元格上,我都会调用自定义 udf 函数来进行所需的计算。要求之一是每次在每一行之后(或在具有某种值的每一行之后)创建并附加新的 N 行。

所以,就像:

+----+-------+
|  id|   name|
+----+-------+
|   1|      A| --> create 1 new Row (based on the udf calculations)
+----+-------+
|   2|      B| --> create 2 new Rows (based on the udf calculations)
+----+-------+
|   3|      C|
+----+-------+

预期结果是:

+----+-------+
|  id|   name|
+----+-------+
|   1|      A|
+----+-------+
|    |  (new)|
+----+-------+
|   2|      B|
+----+-------+
|    |  (new)|
+----+-------+
|    |  (new)|
+----+-------+
|   3|      C|
+----+-------+

我的误解 - 最好/正确的方法是什么? 我当前面临的问题:通过dataFrame.foreach(new Function1<Row, BoxedUnit>() ...) Serializable error. 就我个人而言,我不确定foreach 是不是最好的方法,但我必须以某种方式迭代当前的数据帧。

此外,如果我做对了,我将始终申请 unionAll 来追加新行。 也许还有其他更好的方法可以通过Spark Sql 或将其转换为RDD 等来做到这一点。

【问题讨论】:

我建议从 UDF 返回一个数组,然后将数组分解为多行 是的,在 UDF 中,我会将计算结果保存到 temp 列中,以便在迭代期间获取它。但是对于我来说,迭代本身仍然不清楚。谢谢。 不需要迭代。只需做类似df.select(col("id"),col("name"),explode(my_udf("id","name"))) 【参考方案1】:

回答我自己的问题(感谢@mck 提供explode() 的想法)

所以,假设最初的 df 是:

DataFrame baseDf = ...

+----+-------+
|  id|   name|
+----+-------+
|   1|      A|
+----+-------+
|   2|      B|
+----+-------+

为 UDF 结果创建新的 'temp' 库并保存到新的单独 df 中:

DataFrame df1 = dataFrame.withColumn("temp")

+----+-------+-----+
|  id|   name| temp|
+----+-------+-----+
|   1|      A|     |
+----+-------+-----+
|   2|      B|     |
+----+-------+-----+

从 UDF 返回一个列表(或地图):

+----+-------+------+
|  id|   name|  temp|
+----+-------+------+
|   1|      A| [C,D]|
+----+-------+------+
|   2|      B| [E,F]|
+----+-------+------+

在 temp 列上应用 explode() 并将其移动到新的数据框:

DataFrame unfolded = df1.select(functions.col("id"), functions.explode(new Column("temp")).as("name"))

+----+-------+
|  id|   name|
+----+-------+
|   1|      C|
+----+-------+
|   1|      D|
+----+-------+
|   2|      E|
+----+-------+
|   2|      F|
+----+-------+

现在,由于unfoldedbaseDf 的结构相同,我们可以应用unionAll,然后根据需要进行排序或过滤:

baseDf = baseDf.unionAll(unfolded).sort("id", "name"):

+----+-------+
|  id|   name|
+----+-------+
|   1|      A|
+----+-------+
|   1|      C|
+----+-------+
|   1|      D|
+----+-------+
|   2|      B|
+----+-------+
|   2|      E|
+----+-------+
|   2|      F|
+----+-------+

添加了新字段。

【讨论】:

以上是关于Apache Spark 在 DataFrame 中插入多行的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Apache Spark ML API 中从“DataFrame”创建一个“Vector”?

值 createGlobalTempView 不是 apache.org.spark.sql.DataFrame 的成员

Apache Spark 在 DataFrame 中插入多行

如何在 Apache Spark 中反转排列 DataFrame

类型不匹配;找到:org.apache.spark.sql.DataFrame 需要:org.apache.spark.rdd.RDD

Spark Rdd DataFrame操作汇总