如何使用pyspark将json对象插入postgres表中的列
Posted
技术标签:
【中文标题】如何使用pyspark将json对象插入postgres表中的列【英文标题】:How to insert json object to a column in postgres table using pyspark 【发布时间】:2020-08-24 05:52:01 【问题描述】:我有一个如下的 json 变量,我需要将它插入到 postgres 表中的特定列中。我该怎么办呢
JSON 变量是 man_j,格式如下
"a_type":"Res", "display_type":"Res", "data_type":"AAA",
"source_name":"na", "li_details":"li_value":"na", "li_column":"na",
"additional_info": "d_name":"na",
"description":"na", "program":"program2",
"Author":"author2", "email":"na", "sum":"na",
"file_name":"na","additional_files":"na", "notify_email":"na"
我在 postgres 表中有一个名为 man_de 的列,我需要为其插入值
【问题讨论】:
你能粘贴一个包含所有变量的 json,以便告诉你如何提取你想要的东西@sanjana 上面提到的json需要插入到特定的列中,整个json存储在需要插入到该列中的变量中 如果不需要转换,那么最好直接加载 json,就像使用 INSERT 加载所有其他数据类型一样。在这里看不到 pyspark 的用途。 所以,创建的 json 变量在 pyspark 中,因此我的脚本应该在下一步将 json 自动加载到 postgres 表中的列中 将变量转换为数据框。使用 df.write method.API 文档将其直接写入表spark.apache.org/docs/latest/sql-data-sources-jdbc.html 以获取更多信息。 【参考方案1】:Spark 不直接提供单列更新。使用暂存区(在 db 级别)并单独更新 json 列(这比您手头的实际问题有点复杂),您的解决方案可能会变得复杂。
由于您也拥有其他列的数据,因此将 JSON 变量转换为 DF。 将它与其他列的 DF 结合起来。现在您可以直接附加一些内容。
-
DF1 --> 所有其他列
DF2 --> JSON 变量
加入 DF1 和 DF2(希望你有一些 PK 可以加入) 按照 RDBMS 表中的顺序重新排列列。 根据您的需要使用 df.write 写入/附加它
【讨论】:
【参考方案2】:我不确定这是否是你要找的东西,假设你的连接配置了 PostGre,那么你可以简单地调用下面来执行任何 sql 操作
df = df.withColumn("new_json_column", json_variable) // I am assuming here you have the correct schema or else you can create null column to ensure there is no schema mismatch
//to send only 1 transaction
df = df.show(1)
df.write.mode("append").jdbc(jdbcUrl, "db.table_name", connectionProperties)
【讨论】:
它将是表中现有的列名 不是问题..你可以给相同的列名 我收到 AssertionError : col 应该是 Column以上是关于如何使用pyspark将json对象插入postgres表中的列的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 PySpark 将 JSON 列类型写入 Postgres?
如何将 json 对象列表转换为单个 pyspark 数据框?
PySpark:将 SchemaRDD 映射到 SchemaRDD