使用 pyspark 将结构数组旋转到列中 - 不爆炸数组

Posted

技术标签:

【中文标题】使用 pyspark 将结构数组旋转到列中 - 不爆炸数组【英文标题】:Pivot array of structs into columns using pyspark - not explode the array 【发布时间】:2020-05-29 04:07:18 【问题描述】:

我目前有一个带有 id 和列的数据框,该列是结构数组

 root
 |-- id: string (nullable = true)
 |-- lists: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: string (nullable = true)
 |    |    |-- _2: string (nullable = true)

这是一个包含数据的示例表:

 id | list1             | list2
 ------------------------------------------
 1  | [[a, av], [b, bv]]| [[e, ev], [f,fv]]
 2  | [[c, cv]]         | [[g,gv]]

如何将上面的数据框转换为下面的数据框?我需要“分解”数组并根据结构中的第一个值添加列。

 id | a   | b   | c   | d   | e  | f  | g  
 ----------------------------------------
 1  | av  | bv  | null| null| ev | fv | null
 2  | null| null| cv  | null|null|null|gv

创建数据框的pyspark代码如下:

d1 = spark.createDataFrame([("1", [("a","av"),("b","bv")], [("e", "ev"), ("f", "fv")]), \
                                    ("2", [("c", "cv")],  [("g", "gv")])], ["id","list1","list2"])

注意:我有 2.2.0 的 spark 版本,所以一些 sql 函数不起作用,例如 concat_map 等。

【问题讨论】:

可以发数组吗? @SachinthaNayanajith 我添加了 pyspark 代码 使用 inline 或 inline_outer 展开结构数组 @murtihash 嘿,你能提供使用 inline/inline_outer 的示例吗?无法弄清楚.. 【参考方案1】:

您可以使用 hogher order 函数来执行此操作,而无需像以下那样爆炸数组:

d1.select('id',
          f.when(f.size(f.expr('''filter(list1,x->x._1='a')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list1,x->x._1='a'),value->value._2)'''))).alias('a'),\
          f.when(f.size(f.expr('''filter(list1,x->x._1='b')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list1,x->x._1='b'),value->value._2)'''))).alias('b'),\
          f.when(f.size(f.expr('''filter(list1,x->x._1='c')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list1,x->x._1='c'),value->value._2)'''))).alias('c'),\
          f.when(f.size(f.expr('''filter(list1,x->x._1='d')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list1,x->x._1='d'),value->value._2)'''))).alias('d'),\
          f.when(f.size(f.expr('''filter(list2,x->x._1='e')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list2,x->x._1='e'),value->value._2)'''))).alias('e'),\
          f.when(f.size(f.expr('''filter(list2,x->x._1='f')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list2,x->x._1='f'),value->value._2)'''))).alias('f'),\
          f.when(f.size(f.expr('''filter(list2,x->x._1='g')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list2,x->x._1='g'),value->value._2)'''))).alias('g'),\
          f.when(f.size(f.expr('''filter(list2,x->x._1='h')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list2,x->x._1='h'),value->value._2)'''))).alias('h')\
          ).show()


+---+----+----+----+----+----+----+----+----+
| id|   a|   b|   c|   d|   e|   f|   g|   h|
+---+----+----+----+----+----+----+----+----+
|  1|  av|  bv|null|null|  ev|  fv|null|null|
|  2|null|null|  cv|null|null|null|  gv|null|
+---+----+----+----+----+----+----+----+----+

希望对你有帮助

【讨论】:

如果我没有收到以下任何错误,这个答案会很有帮助: raise ParseException(s.split(': ', 1)[1], stackTrace) pyspark.sql.utils.ParseException: u"\nextraneous 输入 '>' 期待 '(', (第 1 行,位置 15)\n\n== SQL ==\nfilter(list1,x->x._1='a')\n-------- -------^^^\n" 我认为由于某种原因它无法执行 value -> value._2 它在 2.4.4 版本中运行良好【参考方案2】:

UPD - 适用于 Spark 2.2.0

您可以在 2.2.0 中使用 udfs 定义类似的函数。它们在性能方面的效率会低得多,并且您需要为每种输出值类型提供一个特殊函数(即,您将无法拥有一个可以从任何映射类型输出任何类型值的 element_at 函数) ,但他们会工作。以下代码适用于 Spark 2.2.0:

from pyspark.sql.functions import udf
from pyspark.sql.types import MapType, ArrayType, StringType

@udf(MapType(StringType(), StringType()))
def map_from_entries(l):
    return x:y for x,y in l

@udf(MapType(StringType(), StringType()))
def map_concat(m1, m2):
    m1.update(m2)
    return m1

@udf(ArrayType(StringType()))
def map_keys(m):
    return list(m.keys())

def element_getter(k):
    @udf(StringType())
    def element_at(m):
        return m.get(k)
    return element_at

d2 = d1.select('id',
               map_concat(map_from_entries('list1'),
                          map_from_entries('list2')).alias('merged_map'))
map_keys = d2.select(f.explode(map_keys('merged_map')).alias('mk')) \
             .agg(f.collect_set('mk').alias('keys')) \
             .collect()[0].keys
map_keys = ['a', 'b', 'c', 'd', 'e', 'f', 'g']
selects = [element_getter(k)('merged_map').alias(k) for k in sorted(map_keys)]
d = d2.select('id', *selects) 

原始答案(适用于 Spark 2.4.0+)

不清楚您的示例中 d 列的来源(d 从未出现在初始数据框中)。如果应该根据数组中的第一个元素创建列,那么这应该可以工作(假设列表中唯一第一个值的总数足够小):

import pyspark.sql.functions as f
d2 = d1.select('id',
               f.map_concat(f.map_from_entries('list1'),
                            f.map_from_entries('list2')).alias('merged_map'))
map_keys = d2.select(f.explode(f.map_keys('merged_map')).alias('mk')) \
             .agg(f.collect_set('mk').alias('keys')) \
             .collect()[0].keys
selects = [f.element_at('merged_map', k).alias(k) for k in sorted(map_keys)]
d = d2.select('id', *selects)

输出(d 没有列,因为它从未在初始 DataFrame 中提及):

+---+----+----+----+----+----+----+
| id|   a|   b|   c|   e|   f|   g|
+---+----+----+----+----+----+----+
|  1|  av|  bv|null|  ev|  fv|null|
|  2|null|null|  cv|null|null|  gv|
+---+----+----+----+----+----+----+

如果您确实想到列的列表从一开始就固定(并且它们不是从数组中获取的),那么您可以将变量 map_keys 的定义替换为固定的列列表,例如map_keys=['a', 'b', 'c', 'd', 'e', 'f', 'g']。在这种情况下,您会得到答案中提到的输出:

+---+----+----+----+----+----+----+----+
| id|   a|   b|   c|   d|   e|   f|   g|
+---+----+----+----+----+----+----+----+
|  1|  av|  bv|null|null|  ev|  fv|null|
|  2|null|null|  cv|null|null|null|  gv|
+---+----+----+----+----+----+----+----+

顺便说一句 - 你想做的不是 Spark 中所谓的explode。 Spark 中的explode 适用于从一个创建多行的情况。例如。如果你想从这样的数据框中获取:

+---+---------+
| id|      arr|
+---+---------+
|  1|   [a, b]|
|  2|[c, d, e]|
+---+---------+

到这里:

+---+-------+
| id|element|
+---+-------+
|  1|      a|
|  1|      b|
|  2|      c|
|  2|      d|
|  2|      e|
+---+-------+

【讨论】:

我喜欢你的解决方案,但我只能使用 spark 2.2,它没有你在代码中使用的 sql 函数。 我已经用一个应该在 2.2.0(使用 UDF)上工作的解决方案更新了我的答案。 感谢 Alexander 的快速转身。我将在更大的数据集上运行它并评估性能。

以上是关于使用 pyspark 将结构数组旋转到列中 - 不爆炸数组的主要内容,如果未能解决你的问题,请参考以下文章

将数据集参数添加到列中,以便稍后通过 DataPrep 在 BigQuery 中使用它们

使用 VBA 将用户定义的函数输入到列中

Sql Server如何使用date数据类型将日期插入到列中

Google BigQuery SQL:从 JSON(列表和数组)中提取数据到列中

PySpark Dataframe:将一个单词附加到列的每个值

怎么解决 ? (将列表添加到列数据框pyspark)