使用 pyspark 将结构数组旋转到列中 - 不爆炸数组
Posted
技术标签:
【中文标题】使用 pyspark 将结构数组旋转到列中 - 不爆炸数组【英文标题】:Pivot array of structs into columns using pyspark - not explode the array 【发布时间】:2020-05-29 04:07:18 【问题描述】:我目前有一个带有 id 和列的数据框,该列是结构数组:
root
|-- id: string (nullable = true)
|-- lists: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _1: string (nullable = true)
| | |-- _2: string (nullable = true)
这是一个包含数据的示例表:
id | list1 | list2
------------------------------------------
1 | [[a, av], [b, bv]]| [[e, ev], [f,fv]]
2 | [[c, cv]] | [[g,gv]]
如何将上面的数据框转换为下面的数据框?我需要“分解”数组并根据结构中的第一个值添加列。
id | a | b | c | d | e | f | g
----------------------------------------
1 | av | bv | null| null| ev | fv | null
2 | null| null| cv | null|null|null|gv
创建数据框的pyspark代码如下:
d1 = spark.createDataFrame([("1", [("a","av"),("b","bv")], [("e", "ev"), ("f", "fv")]), \
("2", [("c", "cv")], [("g", "gv")])], ["id","list1","list2"])
注意:我有 2.2.0 的 spark 版本,所以一些 sql 函数不起作用,例如 concat_map 等。
【问题讨论】:
可以发数组吗? @SachinthaNayanajith 我添加了 pyspark 代码 使用 inline 或 inline_outer 展开结构数组 @murtihash 嘿,你能提供使用 inline/inline_outer 的示例吗?无法弄清楚.. 【参考方案1】:您可以使用 hogher order 函数来执行此操作,而无需像以下那样爆炸数组:
d1.select('id',
f.when(f.size(f.expr('''filter(list1,x->x._1='a')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list1,x->x._1='a'),value->value._2)'''))).alias('a'),\
f.when(f.size(f.expr('''filter(list1,x->x._1='b')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list1,x->x._1='b'),value->value._2)'''))).alias('b'),\
f.when(f.size(f.expr('''filter(list1,x->x._1='c')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list1,x->x._1='c'),value->value._2)'''))).alias('c'),\
f.when(f.size(f.expr('''filter(list1,x->x._1='d')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list1,x->x._1='d'),value->value._2)'''))).alias('d'),\
f.when(f.size(f.expr('''filter(list2,x->x._1='e')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list2,x->x._1='e'),value->value._2)'''))).alias('e'),\
f.when(f.size(f.expr('''filter(list2,x->x._1='f')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list2,x->x._1='f'),value->value._2)'''))).alias('f'),\
f.when(f.size(f.expr('''filter(list2,x->x._1='g')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list2,x->x._1='g'),value->value._2)'''))).alias('g'),\
f.when(f.size(f.expr('''filter(list2,x->x._1='h')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list2,x->x._1='h'),value->value._2)'''))).alias('h')\
).show()
+---+----+----+----+----+----+----+----+----+
| id| a| b| c| d| e| f| g| h|
+---+----+----+----+----+----+----+----+----+
| 1| av| bv|null|null| ev| fv|null|null|
| 2|null|null| cv|null|null|null| gv|null|
+---+----+----+----+----+----+----+----+----+
希望对你有帮助
【讨论】:
如果我没有收到以下任何错误,这个答案会很有帮助: raise ParseException(s.split(': ', 1)[1], stackTrace) pyspark.sql.utils.ParseException: u"\nextraneous 输入 '>' 期待 '(', (第 1 行,位置 15)\n\n== SQL ==\nfilter(list1,x->x._1='a')\n-------- -------^^^\n" 我认为由于某种原因它无法执行 value -> value._2 它在 2.4.4 版本中运行良好【参考方案2】:UPD - 适用于 Spark 2.2.0
您可以在 2.2.0 中使用 udfs 定义类似的函数。它们在性能方面的效率会低得多,并且您需要为每种输出值类型提供一个特殊函数(即,您将无法拥有一个可以从任何映射类型输出任何类型值的 element_at
函数) ,但他们会工作。以下代码适用于 Spark 2.2.0:
from pyspark.sql.functions import udf
from pyspark.sql.types import MapType, ArrayType, StringType
@udf(MapType(StringType(), StringType()))
def map_from_entries(l):
return x:y for x,y in l
@udf(MapType(StringType(), StringType()))
def map_concat(m1, m2):
m1.update(m2)
return m1
@udf(ArrayType(StringType()))
def map_keys(m):
return list(m.keys())
def element_getter(k):
@udf(StringType())
def element_at(m):
return m.get(k)
return element_at
d2 = d1.select('id',
map_concat(map_from_entries('list1'),
map_from_entries('list2')).alias('merged_map'))
map_keys = d2.select(f.explode(map_keys('merged_map')).alias('mk')) \
.agg(f.collect_set('mk').alias('keys')) \
.collect()[0].keys
map_keys = ['a', 'b', 'c', 'd', 'e', 'f', 'g']
selects = [element_getter(k)('merged_map').alias(k) for k in sorted(map_keys)]
d = d2.select('id', *selects)
原始答案(适用于 Spark 2.4.0+)
不清楚您的示例中 d
列的来源(d
从未出现在初始数据框中)。如果应该根据数组中的第一个元素创建列,那么这应该可以工作(假设列表中唯一第一个值的总数足够小):
import pyspark.sql.functions as f
d2 = d1.select('id',
f.map_concat(f.map_from_entries('list1'),
f.map_from_entries('list2')).alias('merged_map'))
map_keys = d2.select(f.explode(f.map_keys('merged_map')).alias('mk')) \
.agg(f.collect_set('mk').alias('keys')) \
.collect()[0].keys
selects = [f.element_at('merged_map', k).alias(k) for k in sorted(map_keys)]
d = d2.select('id', *selects)
输出(d
没有列,因为它从未在初始 DataFrame 中提及):
+---+----+----+----+----+----+----+
| id| a| b| c| e| f| g|
+---+----+----+----+----+----+----+
| 1| av| bv|null| ev| fv|null|
| 2|null|null| cv|null|null| gv|
+---+----+----+----+----+----+----+
如果您确实想到列的列表从一开始就固定(并且它们不是从数组中获取的),那么您可以将变量 map_keys
的定义替换为固定的列列表,例如map_keys=['a', 'b', 'c', 'd', 'e', 'f', 'g']
。在这种情况下,您会得到答案中提到的输出:
+---+----+----+----+----+----+----+----+
| id| a| b| c| d| e| f| g|
+---+----+----+----+----+----+----+----+
| 1| av| bv|null|null| ev| fv|null|
| 2|null|null| cv|null|null|null| gv|
+---+----+----+----+----+----+----+----+
顺便说一句 - 你想做的不是 Spark 中所谓的explode
。 Spark 中的explode
适用于从一个创建多行的情况。例如。如果你想从这样的数据框中获取:
+---+---------+
| id| arr|
+---+---------+
| 1| [a, b]|
| 2|[c, d, e]|
+---+---------+
到这里:
+---+-------+
| id|element|
+---+-------+
| 1| a|
| 1| b|
| 2| c|
| 2| d|
| 2| e|
+---+-------+
【讨论】:
我喜欢你的解决方案,但我只能使用 spark 2.2,它没有你在代码中使用的 sql 函数。 我已经用一个应该在 2.2.0(使用 UDF)上工作的解决方案更新了我的答案。 感谢 Alexander 的快速转身。我将在更大的数据集上运行它并评估性能。以上是关于使用 pyspark 将结构数组旋转到列中 - 不爆炸数组的主要内容,如果未能解决你的问题,请参考以下文章
将数据集参数添加到列中,以便稍后通过 DataPrep 在 BigQuery 中使用它们
Sql Server如何使用date数据类型将日期插入到列中
Google BigQuery SQL:从 JSON(列表和数组)中提取数据到列中