Apache Spark 在 DataFrame 中插入多行
Posted
技术标签:
【中文标题】Apache Spark 在 DataFrame 中插入多行【英文标题】:Apache Spark insert multiple rows into the DataFrame 【发布时间】:2021-01-12 13:34:51 【问题描述】:首先我绑定到Java 1.7
和Java Spark 1.6
我有很多列和数据,但让我们按照这个简单的例子。 所以假设我有一个简单的表(DataFrame)
+----+-------+
| id| name|
+----+-------+
| 1| A|
+----+-------+
| 2| B|
+----+-------+
| 3| C|
+----+-------+
每次在每个单元格上,我都会调用自定义 udf 函数来进行所需的计算。要求之一是每次在每一行之后(或在具有某种值的每一行之后)创建并附加新的 N 行。
所以,就像:
+----+-------+
| id| name|
+----+-------+
| 1| A| --> create 1 new Row (based on the udf calculations)
+----+-------+
| 2| B| --> create 2 new Rows (based on the udf calculations)
+----+-------+
| 3| C|
+----+-------+
预期结果是:
+----+-------+
| id| name|
+----+-------+
| 1| A|
+----+-------+
| | (new)|
+----+-------+
| 2| B|
+----+-------+
| | (new)|
+----+-------+
| | (new)|
+----+-------+
| 3| C|
+----+-------+
我的误解 - 最好/正确的方法是什么?
我当前面临的问题:通过dataFrame.foreach(new Function1<Row, BoxedUnit>() ...)
Serializable error.
就我个人而言,我不确定foreach
是不是最好的方法,但我必须以某种方式迭代当前的数据帧。
此外,如果我做对了,我将始终申请 unionAll
来追加新行。
也许还有其他更好的方法可以通过Spark Sql
或将其转换为RDD
等来做到这一点。
【问题讨论】:
我建议从 UDF 返回一个数组,然后将数组分解为多行 是的,在 UDF 中,我会将计算结果保存到 temp 列中,以便在迭代期间获取它。但是对于我来说,迭代本身仍然不清楚。谢谢。 不需要迭代。只需做类似df.select(col("id"),col("name"),explode(my_udf("id","name")))
【参考方案1】:
回答我自己的问题(感谢@mck 提供explode()
的想法)
所以,假设最初的 df 是:
DataFrame baseDf = ...
+----+-------+
| id| name|
+----+-------+
| 1| A|
+----+-------+
| 2| B|
+----+-------+
为 UDF 结果创建新的 'temp' 库并保存到新的单独 df 中:
DataFrame df1 = dataFrame.withColumn("temp")
+----+-------+-----+
| id| name| temp|
+----+-------+-----+
| 1| A| |
+----+-------+-----+
| 2| B| |
+----+-------+-----+
从 UDF 返回一个列表(或地图):
+----+-------+------+
| id| name| temp|
+----+-------+------+
| 1| A| [C,D]|
+----+-------+------+
| 2| B| [E,F]|
+----+-------+------+
在 temp 列上应用 explode()
并将其移动到新的数据框:
DataFrame unfolded = df1.select(functions.col("id"), functions.explode(new Column("temp")).as("name"))
+----+-------+
| id| name|
+----+-------+
| 1| C|
+----+-------+
| 1| D|
+----+-------+
| 2| E|
+----+-------+
| 2| F|
+----+-------+
现在,由于unfolded
和baseDf
的结构相同,我们可以应用unionAll
,然后根据需要进行排序或过滤:
baseDf = baseDf.unionAll(unfolded).sort("id", "name"):
+----+-------+
| id| name|
+----+-------+
| 1| A|
+----+-------+
| 1| C|
+----+-------+
| 1| D|
+----+-------+
| 2| B|
+----+-------+
| 2| E|
+----+-------+
| 2| F|
+----+-------+
添加了新字段。
【讨论】:
以上是关于Apache Spark 在 DataFrame 中插入多行的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Apache Spark ML API 中从“DataFrame”创建一个“Vector”?
值 createGlobalTempView 不是 apache.org.spark.sql.DataFrame 的成员
Apache Spark 在 DataFrame 中插入多行
如何在 Apache Spark 中反转排列 DataFrame
类型不匹配;找到:org.apache.spark.sql.DataFrame 需要:org.apache.spark.rdd.RDD