如何在pyspark中按字母顺序对嵌套结构的列进行排序?

Posted

技术标签:

【中文标题】如何在pyspark中按字母顺序对嵌套结构的列进行排序?【英文标题】:How to sort columns of nested structs alphabetically in pyspark? 【发布时间】:2019-09-06 11:54:25 【问题描述】:

我有以下架构的数据。我希望所有的列都应该按字母顺序排序。我想要它在 pyspark 数据框中。

root
 |-- _id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- pin: integer (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- street: string (nullable = true)

以下代码仅对外部列进行排序,而不对嵌套列进行排序。

>>> cols = df.columns
>>> df2=df[sorted(cols)]
>>> df2.printSchema()

这段代码之后的架构看起来像

root
 |-- _id: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- pin: integer (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- street: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)

(因为id有下划线,所以先出现)

我想要的架构如下。 (连地址里面的列也要排序)

root
 |-- _id: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- pin: integer (nullable = true)
 |    |-- street: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)

提前致谢。

【问题讨论】:

你能提供一个样本数据来检查吗? @samkart 我们这里只需要列名。我们不会对数据做任何事情,所以我认为它不需要样本数据。 【参考方案1】:

这是一个适用于任意深度嵌套的StructTypes 的解决方案,它不依赖于任何列名的硬编码。

为了演示,我创建了以下稍微复杂的架构,其中address 列中有第二级嵌套。假设您的 DataFrame schema 如下:

df.printSchema()
#root
# |-- _id: string (nullable = true)
# |-- first_name: string (nullable = true)
# |-- last_name: string (nullable = true)
# |-- address: struct (nullable = true)
# |    |-- pin: integer (nullable = true)
# |    |-- city: string (nullable = true)
# |    |-- zip: struct (nullable = true)
# |    |    |-- last4: integer (nullable = true)
# |    |    |-- first5: integer (nullable = true)
# |    |-- street: string (nullable = true)

注意 address.zip 字段,其中包含 2 个乱序子字段。

您可以定义一个函数,该函数将递归地遍历您的 schema 并对字段进行排序以构建 Spark-SQL 选择表达式:

from pyspark.sql.types import StructType, StructField

def schemaToSelectExpr(schema, baseField=""):
    select_cols = []
    for structField in sorted(schema, key=lambda x: x.name):
        if structField.dataType.typeName() == 'struct':

            subFields = []
            for fld in sorted(structField.jsonValue()['type']['fields'], 
                              key=lambda x: x['name']):
                newStruct = StructType([StructField.fromJson(fld)])
                newBaseField = structField.name
                if baseField:
                    newBaseField = baseField + "." + newBaseField
                subFields.extend(schemaToSelectExpr(newStruct, baseField=newBaseField))

            select_cols.append(
                "struct(" + ",".join(subFields) + ") AS ".format(structField.name)
            )
        else:
            if baseField:
                select_cols.append(baseField + "." + structField.name)
            else:
                select_cols.append(structField.name)
    return select_cols

在这个 DataFrame 的架构上运行它会产生(为了便于阅读,我将长“地址”字符串分成两行):

print(schemaToSelectExpr(df.schema))
#['_id',
#'struct(address.city,address.pin,address.street,
#        struct(address.zip.first5,address.zip.last4) AS zip) AS address',
# 'first_name',
# 'last_name']

现在使用selectExpr 对列进行排序:

df = df.selectExpr(schemaToSelectExpr(df.schema))
df.printSchema()
#root
# |-- _id: string (nullable = true)
# |-- address: struct (nullable = false)
# |    |-- city: string (nullable = true)
# |    |-- pin: integer (nullable = true)
# |    |-- street: string (nullable = true)
# |    |-- zip: struct (nullable = false)
# |    |    |-- first5: integer (nullable = true)
# |    |    |-- last4: integer (nullable = true)
# |-- first_name: string (nullable = true)
# |-- last_name: string (nullable = true)

【讨论】:

这似乎改变了structs 的nullable 属性——我会看看我能不能解决这个问题。【参考方案2】:

你可以先用漂亮的colname.* 结构类型合成器来扁平化你的DF。对于嵌套展平,您可以使用:How to flatten a struct in a Spark dataframe?。

然后,您可以在展平的数据框上创建一个新的 StructCol,其中输入 cols 已排序:

from pyspark.sql import functions as F
address_cols = df.select(F.col("address.*")).columns
df = df.withColumn(address, F.struct(*sorted([F.col(c) for c in address_cols])))
df2 = df[sorted(df.columns)]

【讨论】:

这是一个好的开始,但是您的代码由于多种原因而中断。 在这一行抛出错误 df = df.withColumn(address, F.struct(*sorted([F.col(c) for c in address_cols]))) 即使地址的引号也不是工作

以上是关于如何在pyspark中按字母顺序对嵌套结构的列进行排序?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 swift 中按字母顺序对 JSON 字符串进行排序?

如何在数据库表中按字母顺序对列进行排序? [关闭]

如何在 iOS 中按字母顺序对 MediaQuery 的 ArtistQuery 数组进行排序

如何在C中按顺序对带有数字和字母的文件名进行排序?

在结构中按频率对数组中的字母进行排序

在树状通用列表中按字母数字顺序对子项进行排序的扩展方法