如何在 PySpark 中将字符串转换为字典 (JSON) 的 ArrayType

Posted

技术标签:

【中文标题】如何在 PySpark 中将字符串转换为字典 (JSON) 的 ArrayType【英文标题】:How to cast string to ArrayType of dictionary (JSON) in PySpark 【发布时间】:2018-08-06 18:40:42 【问题描述】:

尝试将 StringType 转换为 JSON 的 ArrayType 以生成 CSV 格式的数据帧。

Spark2 上使用pyspark

我正在处理的 CSV 文件;如下-

date,attribute2,count,attribute3
2017-09-03,'attribute1_value1',2,'["key":"value","key2":2,"key":"value","key2":2,"key":"value","key2":2]'
2017-09-04,'attribute1_value2',2,'["key":"value","key2":20,"key":"value","key2":25,"key":"value","key2":27]'

如上所示,它在文字字符串中包含一个属性"attribute3",从技术上讲,它是一个精确长度为2的字典(JSON)列表。 (这是函数 distinct 的输出)

来自printSchema()的片段

attribute3: string (nullable = true)

我正在尝试将"attribute3" 转换为ArrayType,如下所示

temp = dataframe.withColumn(
    "attribute3_modified",
    dataframe["attribute3"].cast(ArrayType())
)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: __init__() takes at least 2 arguments (1 given)

确实,ArrayType 期望数据类型作为参数。我试过"json",但没有用。

所需的输出 - 最后,我需要将attribute3 转换为ArrayType() 或简单的Python 列表。 (我试图避免使用eval

如何将其转换为ArrayType,以便将其视为 JSON 列表?

我在这里遗漏了什么吗?

(documentation,并没有直接解决这个问题)

【问题讨论】:

你想要的输出是什么?请阅读how to create good reproducible apache spark dataframe examples 并尝试为我们提供一些示例输入/输出。 @pault 更新了问题。我只是想将字符串转换为ArrayType(JSON?) JSON 不是 pyspark 中数组的有效数据类型。如果你能提供一个你希望最终输出看起来像什么的例子,那将会很有帮助。可能有不同的方法来获得尚未考虑的输出。 是的,JSON 不是有效的数据类型。我想将它转换为简单的 Python 列表,我可以在其中执行一些操作(例如 - 在 JSON 中求和/连接值,或将其与其他数据框连接并检查属性的相等性)我试图避免使用 @987654341 @ 【参考方案1】:

使用from_jsonattribute3 列中的实际数据匹配的架构将json 转换为ArrayType:

原始数据框:

df.printSchema()
#root
# |-- date: string (nullable = true)
# |-- attribute2: string (nullable = true)
# |-- count: long (nullable = true)
# |-- attribute3: string (nullable = true)

from pyspark.sql.functions import from_json
from pyspark.sql.types import *

创建架构

schema = ArrayType(
    StructType([StructField("key", StringType()), 
                StructField("key2", IntegerType())]))

使用from_json:

df = df.withColumn("attribute3", from_json(df.attribute3, schema))

df.printSchema()
#root
# |-- date: string (nullable = true)
# |-- attribute2: string (nullable = true)
# |-- count: long (nullable = true)
# |-- attribute3: array (nullable = true)
# |    |-- element: struct (containsNull = true)
# |    |    |-- key: string (nullable = true)
# |    |    |-- key2: integer (nullable = true)

df.show(1, False)
#+----------+----------+-----+------------------------------------+
#|date      |attribute2|count|attribute3                          |
#+----------+----------+-----+------------------------------------+
#|2017-09-03|attribute1|2    |[[value, 2], [value, 2], [value, 2]]|
#+----------+----------+-----+------------------------------------+

【讨论】:

可能是版本问题,但我通过此代码收到java.lang.ClassCastException: org.apache.spark.sql.types.ArrayType cannot be cast to org.apache.spark.sql.types.StructType @pault 同意。根据docs,在版本2.1.0 中,只允许StructTypeArrayType 被添加到2.2.0.【参考方案2】:

@Psidom 的 answer 对我不起作用,因为我使用的是 Spark 2.1。

就我而言,我不得不稍微修改您的 attribute3 字符串以将其包装在字典中:

import pyspark.sql.functions as f
df2 = df.withColumn("attribute3", f.concat(f.lit('"data": '), "attribute3", f.lit("")))
df2.select("attribute3").show(truncate=False)
#+--------------------------------------------------------------------------------------+
#|attribute3                                                                            |
#+--------------------------------------------------------------------------------------+
#|"data": ["key":"value","key2":2,"key":"value","key2":2,"key":"value","key2":2]|
#+--------------------------------------------------------------------------------------+

现在我可以如下定义架构:

schema = StructType(
    [
        StructField(
            "data",
            ArrayType(
                StructType(
                    [
                        StructField("key", StringType()),
                        StructField("key2", IntegerType())
                    ]
                )
            )
        )
    ]
)

现在使用from_json,后跟getItem()

df3 = df2.withColumn("attribute3", f.from_json("attribute3", schema).getItem("data"))
df3.show(truncate=False)
#+----------+----------+-----+---------------------------------+
#|date      |attribute2|count|attribute3                       |
#+----------+----------+-----+---------------------------------+
#|2017-09-03|attribute1|2    |[[value,2], [value,2], [value,2]]|
#+----------+----------+-----+---------------------------------+

还有架构:

df3.printSchema()
# root
# |-- attribute3: array (nullable = true)
# |    |-- element: struct (containsNull = true)
# |    |    |-- key: string (nullable = true)
# |    |    |-- key2: integer (nullable = true)

【讨论】:

这对我来说非常有用。巧妙地使用包装技巧使其工作。我在 2.1 上遇到了同样的问题。只是为了补充您的答案,我能够使用 schema = spark.read.json(df2.rdd.map(lambda row: row.attribute3)).schema 动态地让 spark 确定架构

以上是关于如何在 PySpark 中将字符串转换为字典 (JSON) 的 ArrayType的主要内容,如果未能解决你的问题,请参考以下文章

在pyspark中将带有字符串json字符串的列转换为带有字典的列

如何在pyspark中将字符串列转换为ArrayType

如何在pyspark中将字符串值转换为arrayType

如何在 PySpark 中将 Vector 类型的列转换为数组/字符串类型?

如何在 PySpark 1.6 中将 DataFrame 列从字符串转换为浮点/双精度?

如何在pyspark中将JSON字符串转换为JSON对象