将 Pyspark Dataframe 列从数组转换为新列
Posted
技术标签:
【中文标题】将 Pyspark Dataframe 列从数组转换为新列【英文标题】:Convert Pyspark Dataframe column from array to new columns 【发布时间】:2017-12-18 18:04:23 【问题描述】:我有一个具有这种结构的 Pyspark 数据框:
root
|-- Id: string (nullable = true)
|-- Q: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- pr: string (nullable = true)
| | |-- qt: double (nullable = true)
类似于:
+----+--------------------- ... --+
| Id | Q |
+----+---------------------- ... -+
| 001| [ [pr1,1.9], [pr3,2.0]...] |
| 002| [ [pr2,1.0], [pr9,3.9]...] |
| 003| [ [pr2,9.0], ... ] |
...
我希望将 Q 数组转换为列(名称 pr 值 qt)。 另外我想通过合并(添加)相同的列来避免重复的列。
+----+-----+-----+------+ ... ----+
| Id | pr1 | pr2 | pr3 | ... prn |
+----+-----+-----+------+ ... ----+
| 001| 1.9 | 0.0 | 2.0 | ... |
| 002| 0.0 | 1.0 | 0 | ... |
| 003| 0.0 | 9.0 | ... | ... |
...
我怎样才能执行这种转换? 提前谢谢你!! 朱利安。
【问题讨论】:
嗨,如果答案有效或者您还有其他问题,请告诉我,谢谢 是的 ags29,谢谢!!! 【参考方案1】:您可以结合使用explode
和pivot
:
import pyspark.sql.functions as F
# explode to get "long" format
df=df.withColumn('exploded', F.explode('Q'))
# get the name and the name in separate columns
df=df.withColumn('name', F.col('exploded').getItem(0))
df=df.withColumn('value', F.col('exploded').getItem(1))
# now pivot
df.groupby('Id').pivot('name').agg(F.max('value')).na.fill(0)
【讨论】:
【参考方案2】:非常有趣的问题。我就是这样处理的。
test.csv
001,pr1:0.9,pr3:1.2,pr2:2.0
002,pr3:5.2,pr4:0.99
派斯帕克
file = sc.textFile("file:///test2.csv")
//get it in (key,value)
//[(u'001', u'pr1:0.9')...]
//rdd1 = file.map(lambda r: r.replace(",","\t",1)).map(lambda r: r.split("\t")).map(lambda r: (r[0],r[1])).flatMapValues(lambda r: r.split(','))
rdd1 = file.map(lambda r: r.split(",")[0]).map(lambda r: (r[0],r[1])).flatMapValues(lambda r: r.split(','))
//create a DF with 3 columns
//[(u'001', u'pr1', u'0.9')...)]
+---+---+----+
| _1| _2| _3|
+---+---+----+
|001|pr1| 0.9|
|001|pr3| 1.2|
|001|pr2| 2.0|
|002|pr3| 5.2|
|002|pr4|0.99|
+---+---+----+
rdd2 = rdd1.map(lambda r: (r[0],r[1].split(":"))).map(lambda r: (r[0],r[1][0],r[1][1]))
df = rdd2.toDF()
//Perform the magic
df.groupBy("_1").pivot("_2").agg(expr("coalesce(first(_3),0)"))
+---+---+---+---+----+
| _1|pr1|pr2|pr3| pr4|
+---+---+---+---+----+
|001|0.9|2.0|1.2| 0|
|002| 0| 0|5.2|0.99|
+---+---+---+---+----+
【讨论】:
谢谢巴拉,这是一个很好的解决方案。可能比 ags29 建议的时间长一点。以上是关于将 Pyspark Dataframe 列从数组转换为新列的主要内容,如果未能解决你的问题,请参考以下文章
将 numpy 数组转换为 pyspark 中的 DataFrame 以导出为 csv
PySpark DataFrame在使用explode之前将字符串的列更改为数组
使用 pyspark 将 Spark 数据框中的列转换为数组 [重复]