如何为现有 DataFrame 创建新行?在 PySpark 或 Scala 中
Posted
技术标签:
【中文标题】如何为现有 DataFrame 创建新行?在 PySpark 或 Scala 中【英文标题】:How can I create new rows to the existing DataFrame? in PySpark or Scala 【发布时间】:2021-05-25 03:04:33 【问题描述】:例如,现在我有这个 DataFrame。
+--------+------+
| id|number|
+--------+------+
|19891201| 1|
|19891201| 4|
+--------+------+
但我希望这个 DataFrame 是这样的。
+--------+------+
| id|number|
+--------+------+
|19891201| 1|
|19891201| 2|
|19891201| 3|
|19891201| 4|
+--------+------+
我想创建新行,其数字范围为“数字”列中的 min() 和 max() 值。
在此示例中,我希望在“数字”列中的值为 2 和 3 的行。
【问题讨论】:
火花版本? 【参考方案1】:使用来自 spark 2.4+
版本的 sequence(start, stop, step)
函数。
scala> df
.groupBy($"id")
.agg(
min($"number").as("start"),
max($"number").as("end")
)
.selectExpr(
"id",
"explode_outer(sequence(start,end,1)) as number"
)
.show(false)
输出
+--------+------+
|id |number|
+--------+------+
|19891201|1 |
|19891201|2 |
|19891201|3 |
|19891201|4 |
+--------+------+
【讨论】:
【参考方案2】:试试这个代码
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import ArrayType, FloatType, StringType, IntegerType
from pyspark.sql.functions import min, max , udf, explode
schema = StructType([StructField("id", IntegerType(), True),StructField("number", IntegerType(), True)])
my_list = [(19891201, 1), (19891201,4)]
rdd = sc.parallelize(my_list)
df = sqlContext.createDataFrame(rdd, schema)
df.show()
df2 = df.groupby("id").agg(min("number").alias("min"),max("number").alias("max"))
def my_udf(min, max):
return list(range(min,max+1))
label_udf = udf(my_udf, ArrayType(IntegerType()))
df3 = df2.withColumn("l", label_udf(df2.min, df2.max)
df4 = df3.withColumn("ll", explode("l"))
df5 = df4.select("id", "ll")
df5.show()
【讨论】:
以上是关于如何为现有 DataFrame 创建新行?在 PySpark 或 Scala 中的主要内容,如果未能解决你的问题,请参考以下文章
如何为在输入时执行搜索的现有代码创建搜索按钮? php/javascript/mysql