Pyspark 将列表列转换为嵌套结构列
Posted
技术标签:
【中文标题】Pyspark 将列表列转换为嵌套结构列【英文标题】:Pyspark Convert Column of Lists to Nested Structure Column 【发布时间】:2018-10-20 13:36:30 【问题描述】:我正在尝试将一组丑陋的文本字符串转换为具有代表性的 PySpark 数据框。我被困在将包含字符串列表的列转换为包含行的嵌套结构的列的最后一步。对于列表中的每个字符串,我使用 python 字典理解将其标准化为相同的字段。当我尝试通过列上的udf
进行转换时,它失败了。
我的“记录”列包含类似这样的字符串列表...
['field1, field2, field3, field4', 'field1, field2, field3, field4'..]
幸运的是,字符串结构定义明确,包含字符串和整数,所以我有一个 Python 字典理解,它只是拆分和分配名称。
def extract_fields(row: str) -> dict:
fields = row.split(",")
return 'field1': fields[0], 'field2': fields[1], ...
这在单个字符串上可以很好地转换为行
from pyspark.sql import Row
Row(**extract_fields( sample_string))
所以,我想我可以使用 UDF 将列转换为嵌套结构的列。
nest = sqlfn.udf(lambda x: [Row(**extract_fields(row)) for row in x])
通常我会添加为 UDF 返回的类型,但我不知道如何指示行数组。在我稍后执行之前,我不会收到错误。
所以,现在当我尝试将它应用到我的数据框时,
test = df.select(nest(df.records).alias('expanded')
test.show(5)
我收到此错误:
expected zero arguments for construction of ClassDict (for
pyspark.sql.types._create_row)
我发现的与此错误相关的其他问题似乎表明他们的字典中有类型错误,但就我而言,我的字典是字符串和整数类型。我还尝试了一个只有单个字符串列表的小例子,并得到了相同的答案。
我的预期结果是“扩展”的新列是具有嵌套行结构的列,该列中的单个行如下所示:
Row(expanded = [Row(field1='x11', field2='x12',...), Row(field1='x21',
field2='x22',....) ] )
有什么建议吗?
【问题讨论】:
【参考方案1】:TL;DR pyspark.sql.Row
对象不能从 udf
返回。
已知形状:
如果架构定义良好并且您不会因此而array<struct<...>>
,您应该使用标准tuple
。在这种情况下,可以像这样实现一个基本的解析功能*:
from typing import List, Tuple
def extract_fields(row: str) -> Tuple[str]:
# Here we assume that each element has the expected number of fields
# In practice you should validate the data
return tuple(row.split(","))
并为udf
提供输出架构:
schema = ("array<struct<"
"field1: string, field2: string, field3: string, field4: string"
">>")
@sqlfn.udf(schema)
def extract_multile_fields(rows: List[str]) -> List[Tuple[str]]:
return [extract_fields(row) for row in rows]
result = df.select(extract_multile_fields("x"))
result.show(truncate=False)
+--------------------------------------------------------------------------+
|extract_multile_fields(x) |
+--------------------------------------------------------------------------+
|[[field1, field2, field3, field4], [field1, field2, field3, field4]]|
+--------------------------------------------------------------------------+
如果字段的数量很大,您可能更愿意以编程方式构建架构,而不是使用 DDL 字符串:
from pyspark.sql.types import ArrayType, StringType, StructField, StructType
schema = ArrayType(StructType(
[StructField(f"field$i", StringType()) for i in range(1, 5)]
))
在 Spark 2.4 或更高版本中,您也可以直接使用内置函数:
from pyspark.sql.column import Column
def extract_multile_fields_(col: str) -> Column:
return sqlfn.expr(f"""transform(
-- Here we parameterize input with
transform(`col`, s -> split(s, ',')),
-- Adjust the list of fields and cast if necessary
a -> struct(
a[0] as field1, a[1] as field2, a[2] as field3, a[3] as field4)
)""")
result = df.select(extract_multile_fields_("x").alias("records"))
result.show(truncate=False)
+--------------------------------------------------------------------------+
|records |
+--------------------------------------------------------------------------+
|[[field1, field2, field3, field4], [field1, field2, field3, field4]]|
+--------------------------------------------------------------------------+
未知形状:
如果数据的形状未知,那么array<struct<...>>
不是DataType
的正确选择。在这种情况下,您可以尝试使用array<map<..., ...>>
,但这需要所有值的类型相同:
from typing import Dict
def extract_fields(row: str) -> Dict[str, str]:
return ... # TODO: Implement the logic
@sqlfn.udf("array<map<string, string>>")
def extract_multile_fields(rows: List[str]) -> List[Dict[str, str]]:
return [extract_fields(row) for row in rows]
* 请注意,所有记录的形状必须相同。如果缺少某些字段。您应该使用None
来填补空白。
【讨论】:
以上是关于Pyspark 将列表列转换为嵌套结构列的主要内容,如果未能解决你的问题,请参考以下文章
通过使用pyspark将列转换为行来解析数据框中的Json字符串列表?