Pyspark:将数据帧作为 JSON 存储在 MySQL 表列中

Posted

技术标签:

【中文标题】Pyspark:将数据帧作为 JSON 存储在 MySQL 表列中【英文标题】:Pyspark: store dataframe as JSON in MySQL table column 【发布时间】:2021-05-20 09:40:54 【问题描述】:

我有一个 spark 数据框,它需要以 JSON 格式存储在 mysql 表中作为列值。 (以及它们各自列中的其他字符串类型值)

类似的东西:

column 1 column 2
val 1 ["name":"Peter G", "age":44, "city":"Quahog", "name":"John G", "age":30, "city":"Quahog", ..., ...]
val 1 ["name":"Stewie G", "age":3, "city":"Quahog", "name":"Ron G", "age":41, "city":"Quahog", ..., ...]
... ...

这里["name":"Peter G", "age":44, "city":"Quahog", "name":"John G", "age":30, "city":"Quahog", ..., ...] 是一个数据帧存储为dict列表

的结果

我能做到:

str(dataframe_object.toJSON().collect())

然后将其存储到 mysql 表列中,但这意味着将整个数据加载到内存中,然后再将其存储到 mysql 表中。是否有更好/最佳的方法来实现这一点,即不使用collect()

【问题讨论】:

如果您使用的是 MySQL 5.7 或更高版本,则只需将其存储在类型为 json 的列中 嗨@SuyogShimpi,是的,但是我如何在不使用df.toJSON().collect() 的情况下做到这一点,因为我没有 json 对象,而是 pyspark 数据框 你试过spark.write.jdbcspark.apache.org/docs/3.0.1/api/python/…吗? 嗨@pltc,我需要将数据帧存储为 JSON。我可以使用spark.write.jdbc直接存储数据帧,但是在写入mysql之前如何将数据帧转换为json? hm,写之前为什么要转成JSON?有什么具体原因吗?并且通过转换您的意思是将整个数据帧转换为单个字符串变量? 【参考方案1】:

我想您可以将 StructType 列转换为 JSON 字符串,然后使用 spark.write.jdbc 写入 MySQL。只要your MySQL table has that column as JSON type,你就应该准备好了。

# My sample data

    "c1": "val1",
    "c2": [
         "name": "N1", "age": 100 ,
         "name": "N2", "age": 101 
    ]


from pyspark.sql import functions as F
from pyspark.sql import types as T

schema = T.StructType([
    T.StructField('c1', T.StringType()),
    T.StructField('c2', T.ArrayType(T.StructType([
        T.StructField('name', T.StringType()),
        T.StructField('age', T.IntegerType())
    ])))
])

df = spark.read.json('a.json', schema=schema, multiLine=True)
df.show(10, False)
df.printSchema()

+----+----------------------+
|c1  |c2                    |
+----+----------------------+
|val1|[N1, 100, N2, 101]|
+----+----------------------+

root
 |-- c1: string (nullable = true)
 |-- c2: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- age: integer (nullable = true)

df.withColumn('j', F.to_json('c2')).show(10, False)
+----+----------------------+-------------------------------------------------+
|c1  |c2                    |j                                                |
+----+----------------------+-------------------------------------------------+
|val1|[N1, 100, N2, 101]|["name":"N1","age":100,"name":"N2","age":101]|
+----+----------------------+-------------------------------------------------+

编辑#1:

# My sample data

    "c1": "val1",
    "c2": "[ \"name\": \"N1\", \"age\": 100 , \"name\": \"N2\", \"age\": 101 ]"


from pyspark.sql import functions as F
from pyspark.sql import types as T

df = spark.read.json('a.json', multiLine=True)
df.show(10, False)
df.printSchema()

+----+-----------------------------------------------------------+
|c1  |c2                                                         |
+----+-----------------------------------------------------------+
|val1|[ "name": "N1", "age": 100 , "name": "N2", "age": 101 ]|
+----+-----------------------------------------------------------+

root
 |-- c1: string (nullable = true)
 |-- c2: string (nullable = true)

schema = T.ArrayType(T.StructType([
    T.StructField('name', T.StringType()),
    T.StructField('age', T.IntegerType())
]))

df2 = df.withColumn('j', F.from_json('c2', schema))
df2.show(10, False)
df2.printSchema()

+----+-----------------------------------------------------------+----------------------+
|c1  |c2                                                         |j                     |
+----+-----------------------------------------------------------+----------------------+
|val1|[ "name": "N1", "age": 100 , "name": "N2", "age": 101 ]|[N1, 100, N2, 101]|
+----+-----------------------------------------------------------+----------------------+

root
 |-- c1: string (nullable = true)
 |-- c2: string (nullable = true)
 |-- j: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- age: integer (nullable = true)

【讨论】:

嗨@pltc,在你的解决方案中,c2 列已经有 json 格式的数据,但在我的场景中,我有一个包含 c1 和 c2 列的数据框,它们没有 json 数据,但是字符串输入值,我需要将该数据框中的所有行转换为字典列表。例如; ['c1': 'row1_val1', 'c2': 'row1_val2' , 'c2': 'row2_val1', 'c2': 'row2_val2' , ...] @ShubhamKadam,在这种情况下,它是类似的,您只需要为c2 定义 JSON 模式,然后使用 from_jsonc2 转换为 JSON 对象。检查我编辑的答案

以上是关于Pyspark:将数据帧作为 JSON 存储在 MySQL 表列中的主要内容,如果未能解决你的问题,请参考以下文章

将 PySpark 数据帧转换为 JSON,每列作为键

使 Spark 结构化流中的 JSON 可以在 python (pyspark) 中作为没有 RDD 的数据帧访问

Pyspark 将 json 数组转换为数据帧行

如何将 json 转换为 pyspark 数据帧(更快的实现)[重复]

使用 pyspark 将 spark 数据帧转换为嵌套 JSON

kafka 到 pyspark 结构化流,将 json 解析为数据帧