Pyspark 将嵌套结构字段转换为 Json 字符串

Posted

技术标签:

【中文标题】Pyspark 将嵌套结构字段转换为 Json 字符串【英文标题】:Pyspark Convert Nested Struct field to Json String 【发布时间】:2020-07-30 18:16:09 【问题描述】:

我正在尝试使用 pyspark 将一些 mongo 集合引入到大查询中。架构如下所示。

root
 |-- groups: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- my_field: struct (nullable = true)
 |    |    |    |-- ** mongo id **: struct (nullable = true)
 |    |    |    |    |-- A: timestamp (nullable = true)
 |    |    |    |    |-- B: string (nullable = true)
 |    |    |    |    |-- C: struct (nullable = true)
 |    |    |    |    |    |-- abc: boolean (nullable = true)
 |    |    |    |    |    |-- def: boolean (nullable = true)
 |    |    |    |    |    |-- ghi: boolean (nullable = true)
 |    |    |    |    |    |-- xyz: boolean (nullable = true)

问题是在 my_field 中我们存储了 id,每个组都有自己的 id,当我将所有内容导入到大查询中时,我最终会为每个 id 生成一个新列。我想将 my_field 转换为字符串并将所有嵌套字段存储为 json 或类似的东西。但是当我尝试转换它时,我收到了这个错误

temp_df = temp_df.withColumn("groups.my_field", col("groups.my_field").cast('string'))

TypeError: Column is not iterable

我错过了什么?

【问题讨论】:

你考虑过使用BQ的TO_JSON_STRING函数吗? 你能发布一个示例输入行吗 to_json 不起作用吗? 它不适用于嵌套字段。我可以用 to_json 创建一个新字段,但只能在根级别,我不能替换 my_field 甚至把它放在同一级别 【参考方案1】:

事实证明,为了附加/删除/重命名嵌套字段,您需要更改架构。我不知道。所以这是我的答案。我从这里https://***.com/a/48906217/984114 复制并修改了代码,以使其适用于我的架构

这是“exclude_nested_field”的修改版本

def change_nested_field_type(schema, fields_to_change, parent=""):
  new_schema = []

  if isinstance(schema, StringType):
      return schema

  for field in schema:
      full_field_name = field.name

      if parent:
          full_field_name = parent + "." + full_field_name

      if full_field_name not in fields_to_change:
          if isinstance(field.dataType, StructType):
              inner_schema = change_nested_field_type(field.dataType, fields_to_change, full_field_name)
              new_schema.append(StructField(field.name, inner_schema))
          elif isinstance(field.dataType, ArrayType):
              inner_schema = change_nested_field_type(field.dataType.elementType, fields_to_change, full_field_name)
              new_schema.append(StructField(field.name, ArrayType(inner_schema)))
          else:
              new_schema.append(StructField(field.name, field.dataType))
      else:
          # Here we change the field type to String
          new_schema.append(StructField(field.name, StringType()))

  return StructType(new_schema)

这是我调用函数的方式

new_schema = ArrayType(change_nested_field_type(df.schema["groups"].dataType.elementType, ["my_field"]))
df = df.withColumn("json", to_json("groups")).drop("groups")
df = df.withColumn("groups", from_json("json", new_schema)).drop("json")

【讨论】:

以上是关于Pyspark 将嵌套结构字段转换为 Json 字符串的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark 将列表列转换为嵌套结构列

使用 pyspark 将 spark 数据帧转换为嵌套 JSON

尝试使用 JSON 结构将时间箱转换为 Pyspark 中的分钟和小时数组

如何将 JSON 格式的数据展平为 spark 数据框

如何在pyspark中将rdd行转换为带有json结构的数据框?

使用 UDF 从 PySpark Dataframe 解析嵌套的 XML 字段