如何将 PySpark Dataframe 列的类型指定为 JSON

Posted

技术标签:

【中文标题】如何将 PySpark Dataframe 列的类型指定为 JSON【英文标题】:How to specify the type of PySpark Dataframe column as JSON 【发布时间】:2021-12-09 12:32:35 【问题描述】:

以下是我们的 pyspark 应用程序代码 sn-p。

schema = StructType(
    [
        StructField('name', StringType(), True),
        StructField('version', StringType(), True),
        StructField('requestBody', StringType(), True),
        StructField('id', StringType(), True),
    ]
)

df_new = df.withColumn('value', from_json('value', schema)) \
    .where(col('value.version') == '1') \
    .select(col('value.*'))\
    .na.drop() \
    .withColumn('requestBody', decrypt_udf(col('requestBody')))

df_new.show()

+-------+--------+---------------------------------------------+---+
|   name| version|                                  requestBody| id|
+-------+--------+---------------------------------------------+---+
|kj-test|       1|"data": "score": 130, "group": "silver"  |  1|
|kj-test|       1|"data": "score": 250, "group": "gold"    |  2|
|kj-test|       1|"data": "score": 330, "group": "platinum"|  3|
+-------+--------+---------------------------------------------+---+

decrypt_udf UDF 函数 sn-p:

@udf(returnType=StringType())
def decrypt_udf(encrypted_string: str):
   ...
   ...
   return decrypted_json_str

当我将 spark 数据帧写入 S3 存储桶时,如下所示

df_new.write.mode('overwrite').json(path=s3outputpath)

生成的文件内容如下,这里requestBody 的值写为String,因此用双引号和转义内部双引号。

"name":"kj-test","version":"1","requestBody":"\"data\": \"score\": 130, \"group\": \"silver\"","id":"1"
"name":"kj-test","version":"2","requestBody":"\"data\": \"score\": 250, \"group\": \"gold\"","id":"1"
"name":"kj-test","version":"3","requestBody":"\"data\": \"score\": 330, \"group\": \"platinum\"","id":"1"

但是,我希望 requestBody 的值可以写成如下的 json。

"name":"kj-test","version":"1","requestBody":"data": "score": 130, "group": "silver","id":"1"

我知道我已将 requestBody 的类型指定为架构 StructField('requestBody', StringType(), True) 中的字符串,因此我以这种方式看到输出。我怎样才能达到我期望的输出?没有JsonType这样的类型


编辑:

请注意,我的 requestBody 架构不会总是这样 "data": "score": 130, "group": "silver"。对于给定的运行,它是固定的,但另一次运行可能具有完全不同的架构。

本质上,需要一种从 json 字符串推断模式的方法。找到一些可能有用的 SO 帖子,将尝试这些:

https://***.com/a/45880574/948268 Spark from_json with dynamic schema

【问题讨论】:

您是否尝试过更改您的 UDF 并返回为 MapType 而不是 StringType @pltc MapType 并未涵盖 json 的所有可能性。就像我无法在 MapType 中为 value 指定相应的类型。 【参考方案1】:

试试下面的代码。 (我没有测试过)

使用from_json 函数将requestBody json 字符串转换为结构体。

schema = StructType(
    [
        StructField('name', StringType(), True),
        StructField('version', StringType(), True),
        StructField('requestBody', StringType(), True),
        StructField('id', StringType(), True),
    ]
)

requestBody准备架构

requestSchema=StructType(
    [
        StructField('data', StructType([StructField('group',StringType(),True),StructField('score',LongType(),True)])),
    ]
)
df_new = df.withColumn('value', from_json('value', schema)) \
    .where(col('value.version') == '1') \
    .select(col('value.*'))\
    .withColumn()
    .na.drop() \
    .withColumn('requestBody', from_json('requestBody',requestSchema))
df_new.write.mode('overwrite').json(path=s3outputpath)

【讨论】:

感谢@Srinivas,我的 requestBody 架构不固定。它可以是任何 json。有没有办法根据 requestBody 值推断架构?【参考方案2】:

在您的 udf 中,添加以下将 python 对象转换为 JSON 字符串的方法:

import json   
@udf(returnType=StringType())
def decrypt_udf(encrypted_string: str):
   ...
   ...
   return json.dumps(decrypted_json_str)

【讨论】:

谢谢绿色。问题不在我的 udf 中,正如您在 df_new.show() 的输出中看到的那样,它显示正确。事实上,我已经在我的 udf 中做了json.dumps。但是当它写成requestBody字段的类型是StringType时,我遇到了这个问题。【参考方案3】:

较新的解决方案(我认为这是一个更好的解决方案)

我们最终使用的另一个聪明的解决方案。在此,我们定义了一个 udf get_combined_json,它结合了给定 Row 的所有列,然后返回一个 json 字符串。导致我们的最终数据框只有一列,这样我们就可以将数据框写为文本文件,这样整个 json 字符串就可以按原样写入而没有任何转义。以下是代码sn-p:

df_new = df.withColumn('value', from_json('value', schema)) \
    .where(col('value.version') == '1') \
    .select(col('value.*'))\
    .na.drop() \
    .withColumn('requestBody', decrypt_udf(col('requestBody')))

df_new.withColumn('combinedColumns', get_combined_json(struct([df_new[x] for x in df_new.columns]))) \
    .select(col('combinedColumns'))\
    .write.mode('overwrite').text(path=output_s3_bucket_path)

...

@udf(returnType=StringType())
def get_combined_json(row: Row):
    return json.dumps("requestBody": json.loads(row.requestBody),
                       "name": row.name,
                       "version": row.version,
                       "id": row.id)




较旧的解决方案

以下是我们从requestBody json 字符串派生/推断架构的方式:

request_body_schema = spark_session.read.json(df_new.rdd.map(lambda r: r.requestBody)).schema

然后使用架构更新数据框。这是有效的最终代码:

df_new = df.withColumn('value', from_json('value', schema)) \
    .where(col('value.version') == '1') \
    .select(col('value.*'))\
    .na.drop() \
    .withColumn('requestBody', decrypt_udf(col('requestBody')))

request_body_schema = spark_session.read.json(df_new.rdd.map(lambda r: r.requestBody)).schema

df_new = df_new.withColumn('requestBody', from_json(col('requestBody'), request_body_schema))

df_new.write.mode('overwrite').json(path=output_s3_bucket_path)

以下是写入 S3 存储桶的输出格式:

"name":"kj-test","version":"1","requestBody":"data": "score": 130, "group": "silver","id":"1"

【讨论】:

以上是关于如何将 PySpark Dataframe 列的类型指定为 JSON的主要内容,如果未能解决你的问题,请参考以下文章

PySpark Dataframe:将一个单词附加到列的每个值

如何遍历大型 Pyspark Dataframe 中列的不同值? .distinct().collect() 引发大任务警告

如何在 PySpark 中为一个组迭代 Dataframe / RDD 的每一行。?

Pyspark - 从 DataFrame 列的操作创建新列给出错误“列不可迭代”

PySpark:转换DataFrame中给定列的值

计算 PySpark DataFrame 列的模式?