在pyspark中展平Maptype列

Posted

技术标签:

【中文标题】在pyspark中展平Maptype列【英文标题】:Flattening Maptype column in pyspark 【发布时间】:2018-10-11 14:21:14 【问题描述】:

我有一个带有 MapType 列的 pyspark DataFrame,并希望通过键名将其分解为所有列

root
 |-- a: map (nullable = true)
 |    |-- key: string
 |    |-- value: long (valueContainsNull = true)

我想做sp_test.select('a.*') 但出现错误:

AnalysisException: '只能用星号扩展结构数据类型。属性:ArrayBuffer(a);'

如果我们知道所有的键,这可以通过这样做来实现

sp_test.select(['a.%s'%item for item in ['a','b']]).show()

但我想删除关键依赖

如果我们有一个 StrucType 列,这可以通过 display(nested_df.select('*', 'nested_array.*')) 轻松实现

root
 |-- _corrupt_record: string (nullable = true)
 |-- field1: long (nullable = true)
 |-- field2: long (nullable = true)
 |-- nested_array: struct (nullable = true)
 |    |-- nested_field1: long (nullable = true)
 |    |-- nested_field2: long (nullable = true)

我有一些疑问:

    可以将MapType 转换为StructType吗? 我们可以直接从MapType查询子键吗?

【问题讨论】:

***.com/questions/48331272/… 这不一样但是,你可以尝试在这里寻找一些选项。 【参考方案1】:

TL;DR: 除非您提前知道可能的密钥,否则没有简单的方法可以完成您的要求。

让我用一个例子来解释为什么以及你的选择是什么。

首先,创建如下DataFrame:

data = [('a': 1, 'b': 2,), ('c':3,), ('a': 4, 'c': 5,)]
df = spark.createDataFrame(data, ["a"])
df.show()
#+-------------------+
#|                  a|
#+-------------------+
#|Map(a -> 1, b -> 2)|
#|        Map(c -> 3)|
#|Map(a -> 4, c -> 5)|
#+-------------------+

具有以下架构:

df.printSchema()
#root
# |-- a: map (nullable = true)
# |    |-- key: string
# |    |-- value: long (valueContainsNull = true)
    MapType可以转换成StructType吗?

简单的答案是否定的(至少效率不高),除非您提前知道密钥。

MapTypeStructType 之间的区别在于映射的键值对是逐行独立的。对于结构列中的StructType 列,情况并非如此,所有行都具有相同的结构字段。

因此,spark 无法轻松推断要从地图中创建哪些列。 (请记住,火花在每一行上并行运行)。另一方面,将结构分解成列很简单,因为所有列都是提前知道的。

因此,如果您知道密钥,则可以通过以下方式创建结构类型:

import pyspark.sql.functions as f

df_new = df.select(
    f.struct(*[f.col("a").getItem(c).alias(c) for c in ["a", "b", "c"]]).alias("a")
)
df_new.show()
#+-------------+
#|            a|
#+-------------+
#|   [1,2,null]|
#|[null,null,3]|
#|   [4,null,5]|
#+-------------+

而新的架构是:

df_new.printSchema()
#root
# |-- a: struct (nullable = false)
# |    |-- a: long (nullable = true)
# |    |-- b: long (nullable = true)
# |    |-- c: long (nullable = true)
    我们可以直接从 MapType 查询子键吗?

是的,(如上所示)您可以使用getItem() 从列表中的索引处获取项目,或者通过键从地图中获取项目。


如果您不知道密钥,您唯一的选择是将 explode 映射成行,groupbypivot

df.withColumn("id", f.monotonically_increasing_id())\
    .select("id", f.explode("a"))\
    .groupby("id")\
    .pivot("key")\
    .agg(f.first("value"))\
    .drop("id")\
    .show()
#+----+----+----+
#|   a|   b|   c|
#+----+----+----+
#|null|null|   3|
#|   1|   2|null|
#|   4|null|   5|
#+----+----+----+

在这种情况下,我们需要先创建一个id 列,以便进行分组。

这里的pivot 可能很昂贵,具体取决于您的数据大小。

【讨论】:

如果地图是嵌套的呢?

以上是关于在pyspark中展平Maptype列的主要内容,如果未能解决你的问题,请参考以下文章

pyspark:从现有列创建 MapType 列

用 Pandas 或 Pyspark 用两列表示的关系展平“树”

Spark DataFrame ArrayType 或 MapType 用于检查列中的值

Spark Python Pyspark 如何使用字典数组和嵌入式字典来展平列(sparknlp 注释器输出)

带有点“。”的数据框的 pyspark 访问列

MapType 列值上的 PySpark 杠杆函数