如何使用pyspark将json对象插入postgres表中的列

Posted

技术标签:

【中文标题】如何使用pyspark将json对象插入postgres表中的列【英文标题】:How to insert json object to a column in postgres table using pyspark 【发布时间】:2020-08-24 05:52:01 【问题描述】:

我有一个如下的 json 变量,我需要将它插入到 postgres 表中的特定列中。我该怎么办呢

JSON 变量是 man_j,格式如下

     "a_type":"Res", "display_type":"Res", "data_type":"AAA", 
     "source_name":"na", "li_details":"li_value":"na", "li_column":"na", 
       "additional_info": "d_name":"na",  
      "description":"na", "program":"program2",  
        "Author":"author2", "email":"na", "sum":"na", 
         "file_name":"na","additional_files":"na", "notify_email":"na"

我在 postgres 表中有一个名为 man_de 的列,我需要为其插入值

【问题讨论】:

你能粘贴一个包含所有变量的 json,以便告诉你如何提取你想要的东西@sanjana 上面提到的json需要插入到特定的列中,整个json存储在需要插入到该列中的变量中 如果不需要转换,那么最好直接加载 json,就像使用 INSERT 加载所有其他数据类型一样。在这里看不到 pyspark 的用途。 所以,创建的 json 变量在 pyspark 中,因此我的脚本应该在下一步将 json 自动加载到 postgres 表中的列中 将变量转换为数据框。使用 df.write method.API 文档将其直接写入表spark.apache.org/docs/latest/sql-data-sources-jdbc.html 以获取更多信息。 【参考方案1】:

Spark 不直接提供单列更新。使用暂存区(在 db 级别)并单独更新 json 列(这比您手头的实际问题有点复杂),您的解决方案可能会变得复杂。

由于您也拥有其他列的数据,因此将 JSON 变量转换为 DF。 将它与其他列的 DF 结合起来。现在您可以直接附加一些内容。

    DF1 --> 所有其他列 DF2 --> JSON 变量

加入 DF1 和 DF2(希望你有一些 PK 可以加入) 按照 RDBMS 表中的顺序重新排列列。 根据您的需要使用 df.write 写入/附加它

【讨论】:

【参考方案2】:

我不确定这是否是你要找的东西,假设你的连接配置了 PostGre,那么你可以简单地调用下面来执行任何 sql 操作

df = df.withColumn("new_json_column", json_variable) // I am assuming here you have the correct schema or else you can create null column to ensure there is no schema mismatch
//to send only 1 transaction
df = df.show(1)



df.write.mode("append").jdbc(jdbcUrl, "db.table_name", connectionProperties)

【讨论】:

它将是表中现有的列名 不是问题..你可以给相同的列名 我收到 AssertionError : col 应该是 Column

以上是关于如何使用pyspark将json对象插入postgres表中的列的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 PySpark 将 JSON 列类型写入 Postgres?

如何将 json 对象列表转换为单个 pyspark 数据框?

PySpark:将 SchemaRDD 映射到 SchemaRDD

如何在pyspark中将JSON字符串转换为JSON对象

使用 Pyspark 将每个 json 对象读取为 Dataframe 中的单行?

如何在 PySpark 的分组对象中插入一列?