如何在pyspark中按字母顺序对嵌套结构的列进行排序?
Posted
技术标签:
【中文标题】如何在pyspark中按字母顺序对嵌套结构的列进行排序?【英文标题】:How to sort columns of nested structs alphabetically in pyspark? 【发布时间】:2019-09-06 11:54:25 【问题描述】:我有以下架构的数据。我希望所有的列都应该按字母顺序排序。我想要它在 pyspark 数据框中。
root
|-- _id: string (nullable = true)
|-- first_name: string (nullable = true)
|-- last_name: string (nullable = true)
|-- address: struct (nullable = true)
| |-- pin: integer (nullable = true)
| |-- city: string (nullable = true)
| |-- street: string (nullable = true)
以下代码仅对外部列进行排序,而不对嵌套列进行排序。
>>> cols = df.columns
>>> df2=df[sorted(cols)]
>>> df2.printSchema()
这段代码之后的架构看起来像
root
|-- _id: string (nullable = true)
|-- address: struct (nullable = true)
| |-- pin: integer (nullable = true)
| |-- city: string (nullable = true)
| |-- street: string (nullable = true)
|-- first_name: string (nullable = true)
|-- last_name: string (nullable = true)
(因为id有下划线,所以先出现)
我想要的架构如下。 (连地址里面的列也要排序)
root
|-- _id: string (nullable = true)
|-- address: struct (nullable = true)
| |-- city: string (nullable = true)
| |-- pin: integer (nullable = true)
| |-- street: string (nullable = true)
|-- first_name: string (nullable = true)
|-- last_name: string (nullable = true)
提前致谢。
【问题讨论】:
你能提供一个样本数据来检查吗? @samkart 我们这里只需要列名。我们不会对数据做任何事情,所以我认为它不需要样本数据。 【参考方案1】:这是一个适用于任意深度嵌套的StructType
s 的解决方案,它不依赖于任何列名的硬编码。
为了演示,我创建了以下稍微复杂的架构,其中address
列中有第二级嵌套。假设您的 DataFrame schema
如下:
df.printSchema()
#root
# |-- _id: string (nullable = true)
# |-- first_name: string (nullable = true)
# |-- last_name: string (nullable = true)
# |-- address: struct (nullable = true)
# | |-- pin: integer (nullable = true)
# | |-- city: string (nullable = true)
# | |-- zip: struct (nullable = true)
# | | |-- last4: integer (nullable = true)
# | | |-- first5: integer (nullable = true)
# | |-- street: string (nullable = true)
注意 address.zip
字段,其中包含 2 个乱序子字段。
您可以定义一个函数,该函数将递归地遍历您的 schema
并对字段进行排序以构建 Spark-SQL 选择表达式:
from pyspark.sql.types import StructType, StructField
def schemaToSelectExpr(schema, baseField=""):
select_cols = []
for structField in sorted(schema, key=lambda x: x.name):
if structField.dataType.typeName() == 'struct':
subFields = []
for fld in sorted(structField.jsonValue()['type']['fields'],
key=lambda x: x['name']):
newStruct = StructType([StructField.fromJson(fld)])
newBaseField = structField.name
if baseField:
newBaseField = baseField + "." + newBaseField
subFields.extend(schemaToSelectExpr(newStruct, baseField=newBaseField))
select_cols.append(
"struct(" + ",".join(subFields) + ") AS ".format(structField.name)
)
else:
if baseField:
select_cols.append(baseField + "." + structField.name)
else:
select_cols.append(structField.name)
return select_cols
在这个 DataFrame 的架构上运行它会产生(为了便于阅读,我将长“地址”字符串分成两行):
print(schemaToSelectExpr(df.schema))
#['_id',
#'struct(address.city,address.pin,address.street,
# struct(address.zip.first5,address.zip.last4) AS zip) AS address',
# 'first_name',
# 'last_name']
现在使用selectExpr
对列进行排序:
df = df.selectExpr(schemaToSelectExpr(df.schema))
df.printSchema()
#root
# |-- _id: string (nullable = true)
# |-- address: struct (nullable = false)
# | |-- city: string (nullable = true)
# | |-- pin: integer (nullable = true)
# | |-- street: string (nullable = true)
# | |-- zip: struct (nullable = false)
# | | |-- first5: integer (nullable = true)
# | | |-- last4: integer (nullable = true)
# |-- first_name: string (nullable = true)
# |-- last_name: string (nullable = true)
【讨论】:
这似乎改变了struct
s 的nullable
属性——我会看看我能不能解决这个问题。【参考方案2】:
你可以先用漂亮的colname.*
结构类型合成器来扁平化你的DF。对于嵌套展平,您可以使用:How to flatten a struct in a Spark dataframe?。
然后,您可以在展平的数据框上创建一个新的 StructCol,其中输入 cols 已排序:
from pyspark.sql import functions as F
address_cols = df.select(F.col("address.*")).columns
df = df.withColumn(address, F.struct(*sorted([F.col(c) for c in address_cols])))
df2 = df[sorted(df.columns)]
【讨论】:
这是一个好的开始,但是您的代码由于多种原因而中断。 在这一行抛出错误 df = df.withColumn(address, F.struct(*sorted([F.col(c) for c in address_cols]))) 即使地址的引号也不是工作以上是关于如何在pyspark中按字母顺序对嵌套结构的列进行排序?的主要内容,如果未能解决你的问题,请参考以下文章
如何在 swift 中按字母顺序对 JSON 字符串进行排序?