Pyspark:将数据帧作为 JSON 存储在 MySQL 表列中
Posted
技术标签:
【中文标题】Pyspark:将数据帧作为 JSON 存储在 MySQL 表列中【英文标题】:Pyspark: store dataframe as JSON in MySQL table column 【发布时间】:2021-05-20 09:40:54 【问题描述】:我有一个 spark 数据框,它需要以 JSON 格式存储在 mysql 表中作为列值。 (以及它们各自列中的其他字符串类型值)
类似的东西:
column 1 | column 2 |
---|---|
val 1 | ["name":"Peter G", "age":44, "city":"Quahog", "name":"John G", "age":30, "city":"Quahog", ..., ...] |
val 1 | ["name":"Stewie G", "age":3, "city":"Quahog", "name":"Ron G", "age":41, "city":"Quahog", ..., ...] |
... | ... |
这里["name":"Peter G", "age":44, "city":"Quahog", "name":"John G", "age":30, "city":"Quahog", ..., ...]
是一个数据帧存储为dict列表
我能做到:
str(dataframe_object.toJSON().collect())
然后将其存储到 mysql 表列中,但这意味着将整个数据加载到内存中,然后再将其存储到 mysql 表中。是否有更好/最佳的方法来实现这一点,即不使用collect()
?
【问题讨论】:
如果您使用的是 MySQL 5.7 或更高版本,则只需将其存储在类型为json
的列中
嗨@SuyogShimpi,是的,但是我如何在不使用df.toJSON().collect()
的情况下做到这一点,因为我没有 json 对象,而是 pyspark 数据框
你试过spark.write.jdbc
spark.apache.org/docs/3.0.1/api/python/…吗?
嗨@pltc,我需要将数据帧存储为 JSON。我可以使用spark.write.jdbc
直接存储数据帧,但是在写入mysql之前如何将数据帧转换为json?
hm,写之前为什么要转成JSON?有什么具体原因吗?并且通过转换您的意思是将整个数据帧转换为单个字符串变量?
【参考方案1】:
我想您可以将 StructType 列转换为 JSON 字符串,然后使用 spark.write.jdbc
写入 MySQL。只要your MySQL table has that column as JSON type,你就应该准备好了。
# My sample data
"c1": "val1",
"c2": [
"name": "N1", "age": 100 ,
"name": "N2", "age": 101
]
from pyspark.sql import functions as F
from pyspark.sql import types as T
schema = T.StructType([
T.StructField('c1', T.StringType()),
T.StructField('c2', T.ArrayType(T.StructType([
T.StructField('name', T.StringType()),
T.StructField('age', T.IntegerType())
])))
])
df = spark.read.json('a.json', schema=schema, multiLine=True)
df.show(10, False)
df.printSchema()
+----+----------------------+
|c1 |c2 |
+----+----------------------+
|val1|[N1, 100, N2, 101]|
+----+----------------------+
root
|-- c1: string (nullable = true)
|-- c2: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- age: integer (nullable = true)
df.withColumn('j', F.to_json('c2')).show(10, False)
+----+----------------------+-------------------------------------------------+
|c1 |c2 |j |
+----+----------------------+-------------------------------------------------+
|val1|[N1, 100, N2, 101]|["name":"N1","age":100,"name":"N2","age":101]|
+----+----------------------+-------------------------------------------------+
编辑#1:
# My sample data
"c1": "val1",
"c2": "[ \"name\": \"N1\", \"age\": 100 , \"name\": \"N2\", \"age\": 101 ]"
from pyspark.sql import functions as F
from pyspark.sql import types as T
df = spark.read.json('a.json', multiLine=True)
df.show(10, False)
df.printSchema()
+----+-----------------------------------------------------------+
|c1 |c2 |
+----+-----------------------------------------------------------+
|val1|[ "name": "N1", "age": 100 , "name": "N2", "age": 101 ]|
+----+-----------------------------------------------------------+
root
|-- c1: string (nullable = true)
|-- c2: string (nullable = true)
schema = T.ArrayType(T.StructType([
T.StructField('name', T.StringType()),
T.StructField('age', T.IntegerType())
]))
df2 = df.withColumn('j', F.from_json('c2', schema))
df2.show(10, False)
df2.printSchema()
+----+-----------------------------------------------------------+----------------------+
|c1 |c2 |j |
+----+-----------------------------------------------------------+----------------------+
|val1|[ "name": "N1", "age": 100 , "name": "N2", "age": 101 ]|[N1, 100, N2, 101]|
+----+-----------------------------------------------------------+----------------------+
root
|-- c1: string (nullable = true)
|-- c2: string (nullable = true)
|-- j: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- age: integer (nullable = true)
【讨论】:
嗨@pltc,在你的解决方案中,c2 列已经有 json 格式的数据,但在我的场景中,我有一个包含 c1 和 c2 列的数据框,它们没有 json 数据,但是字符串输入值,我需要将该数据框中的所有行转换为字典列表。例如;['c1': 'row1_val1', 'c2': 'row1_val2' , 'c2': 'row2_val1', 'c2': 'row2_val2' , ...]
@ShubhamKadam,在这种情况下,它是类似的,您只需要为c2
定义 JSON 模式,然后使用 from_json
将 c2
转换为 JSON 对象。检查我编辑的答案以上是关于Pyspark:将数据帧作为 JSON 存储在 MySQL 表列中的主要内容,如果未能解决你的问题,请参考以下文章
使 Spark 结构化流中的 JSON 可以在 python (pyspark) 中作为没有 RDD 的数据帧访问
如何将 json 转换为 pyspark 数据帧(更快的实现)[重复]