pySpark - 在插入数据库之前将整个数据框列转换为 JSON 对象

Posted

技术标签:

【中文标题】pySpark - 在插入数据库之前将整个数据框列转换为 JSON 对象【英文标题】:pySpark - convert an entire dataframe column into JSON object before inserting into DB 【发布时间】:2021-12-18 21:33:13 【问题描述】:

在这一点上,我对 pyspark 的了解非常有限,因此我正在寻找一种快速解决方案来解决我在当前实现中遇到的这个问题。我正在尝试通过 pyspark 将 JSON 文件读取到数据帧中,将其转换为可以插入数据库表(DynamoDB)的对象。表中的列应代表 JSON 文件中指定的键。例如,如果我的 JSON 文件包含以下元素:


   "Records":[
      
         "column1":"Value1",
         "column2":"Value2",
         "column3":"Value3",
         "column4":
            "sub1":"Value4",
            "sub2":"Value5",
            "sub3":
               "sub4":"Value6",
               "sub5":"Value7"
            
         
      ,
      
         "column1":"Value8",
         "column2":"Value9",
         "column3":"Value10",
         "column4":
            "sub1":"Value11",
            "sub2":"Value12",
            "sub3":
               "sub4":"Value13",
               "sub5":"Value14"
            
         
      
   ]

数据库表中的列分别为column1、column2、column3和column4。对于 Map 类型的 column4,我需要将整个对象转换为字符串,然后再将其插入数据库。因此,在第一行的情况下,我可以期望在该列中看到:


   "sub1":"Value4",
   "sub2":"Value5",
   "sub3":
      "sub4":"Value6",
      "sub5":"Value7"
   

但是,这是我在运行脚本后在数据库表中看到的内容:

 Value4, Value5,  Value6, Value7 

我知道这是因为在执行 DB 插入操作之前将所有列值转换为 String 类型之前需要做一些事情:

for col in Rows.columns:
    Rows = Rows.withColumn(col, Rows[col].cast(StringType()))

我正在寻找一种方法来纠正 Column4 的内容以表示原始 JSON 对象,然后再将它们转换为 String 类型。这是我到目前为止写的(不包括数据库插入操作)

import pyspark.sql.types as T
from pyspark.sql import functions as SF

df = spark.read.option("multiline", "true").json('/home/abhishek.tirkey/Documents/test')

Records = df.withColumn("Records", SF.explode(SF.col("Records")))

Rows = Records.select(
    "Records.column1",
    "Records.column2",
    "Records.column3",
    "Records.column4",
)

for col in Rows.columns:
    Rows = Rows.withColumn(col, Rows[col].cast(StringType()))

RowsJSON = Rows.toJSON()

【问题讨论】:

您能否根据您的示例输入添加表格中的输出内容,包括其列及其内容。 你找到to_json_object函数了吗?这将返回一个字符串... 【参考方案1】:

有一个to_json 函数可以做到这一点:

from pyspark.sql import functions as F

df = df.withColumn("record", F.explode("records")).select(
    "record.column1",
    "record.column2",
    "record.column3",
    F.to_json("record.column4").alias("column4"),
)

df.show()
+-------+-------+-------+--------------------+                                  
|column1|column2|column3|             column4|
+-------+-------+-------+--------------------+
| Value1| Value2| Value3|"sub1":"Value4",...|
| Value8| Value9|Value10|"sub1":"Value11"...|
+-------+-------+-------+--------------------+

df.printSchema()
root
 |-- column1: string (nullable = true)
 |-- column2: string (nullable = true)
 |-- column3: string (nullable = true)
 |-- column4: string (nullable = true)

【讨论】:

以上是关于pySpark - 在插入数据库之前将整个数据框列转换为 JSON 对象的主要内容,如果未能解决你的问题,请参考以下文章

PySpark - 将列表作为参数传递给 UDF + 迭代数据框列添加

如何将 pyspark 数据框列中的值与 pyspark 中的另一个数据框进行比较

将列表转换为pyspark中的数据框列

在 Python/PySpark 中 Spark 复制数据框列的最佳实践?

Pyspark/SQL 将具有列表值的列连接到另一个数据框列

在 Pyspark 中列出保存顺序的数据框列