用于 API 发布请求的 PySpark DataFrame 到 JSON
Posted
技术标签:
【中文标题】用于 API 发布请求的 PySpark DataFrame 到 JSON【英文标题】:PySpark DataFrame to JSON for API post request 【发布时间】:2020-10-07 17:11:55 【问题描述】:我正在尝试将 PySpark 数据帧从 Hive 表转换为特定格式的 JSON,以通过 POST 方法将其作为数据发送到 API。我不知何故无法在 JSON 中获得“ValuesListIds”。感谢是否有人能提出解决方案。
数据框:
我正在寻找的 JSON 格式是:
“身份证”:0, “结果”:[2261730], “提交结果”: “ValuesListIds”:[58895] , “价值”:99, “来源”: “ValuesListIds”:[43861] , “日期”:“2020 年 9 月 30 日”
【问题讨论】:
【参考方案1】:这个怎么样:
df.show()
# +----+---------+----------------+-------+--------+---------+
# | Id | results | submit_results | Value | Source | Date |
# +----+---------+----------------+-------+--------+---------+
# | 0|[2261730]| [58911]| 99|[439012]|9/30/2020|
# +----+---------+----------------+-------+--------+---------+
list_of_rows = df.collect()
# [Row(Date='9/30/2020', Id=0, Source=[439012], Value=99, results=[2261730], submit_results=[58911])]
d = list_of_rows[0].asDict()
# 'Date': '9/30/2020',
# 'Id': 0,
# 'Source': [439012],
# 'Value': 99,
# 'results': [2261730],
# 'submit_results': [58911]
# adjust the dict in-place
d["submit_results"] = "ValuesListIds": d["submit_results"]
d["Source"] = "ValuesListIds": d["Source"]
# now it looks like this:
# 'Date': '9/30/2020',
# 'Id': 0,
# 'Source': 'ValuesListIds': [439012],
# 'Value': 99,
# 'results': [2261730],
# 'submit_results': 'ValuesListIds': [58911]
# `requests` will automatically convert the dict to JSON
requests.post(url, data=d)
【讨论】:
感谢您试用...我需要“Submit_results”和“Source”字段的关键字“ValueListIDs”。这 2 个是嵌套的 JSON 对象。 呃抱歉,我错过了那部分,出于某种原因,我以为你在谈论pandas
,而不是pyspark
,所以我想我的回答现在毫无用处。您能否使用用于生成数据帧的pyspark
代码更新问题?
我有点解决了这个问题,但又卡在了一个部分。我在 rdd 中得到最终结果,但需要将 json.dump 放入 POST 请求的正文中。任何的想法? mdf = spark.sql('选择 0 作为 Id,array(2261730) 作为结果,array(58895) 作为 submit_results,45 作为值,array(43861) 作为源,“9/30/2020”作为表限制 1 中的日期') new = mdf.select("Id","Results", F.struct(F.col('results').alias('ValuesListIDs')).alias('results'), "Value", F. struct(F.col('Source').alias('ValuesListIDs')).alias('Source'), "Date") new.toJSON()
好酷! .toJSON()
返回一个 RDD,因此您需要在其上调用 .collect()
以将其转换为 JSON 字符串列表,或者您可以使用 .first()
仅获取第一个元素。然后你需要像response = requests.post(url, json=your_json_from_rdd)
这样的东西。
好的,我已经重写了答案,现在应该更有意义了。由于看起来您只需要结果的单行,因此我立即执行了df.collect()
,因此我可以使用普通的 Python dict
工作,在我看来,这比使用 struct 创建一个新的 DataFrame 更像是一种矫枉过正(?)列。以上是关于用于 API 发布请求的 PySpark DataFrame 到 JSON的主要内容,如果未能解决你的问题,请参考以下文章