用于 API 发布请求的 PySpark DataFrame 到 JSON

Posted

技术标签:

【中文标题】用于 API 发布请求的 PySpark DataFrame 到 JSON【英文标题】:PySpark DataFrame to JSON for API post request 【发布时间】:2020-10-07 17:11:55 【问题描述】:

我正在尝试将 PySpark 数据帧从 Hive 表转换为特定格式的 JSON,以通过 POST 方法将其作为数据发送到 API。我不知何故无法在 JSON 中获得“ValuesListIds”。感谢是否有人能提出解决方案。

数据框:

我正在寻找的 JSON 格式是:

“身份证”:0, “结果”:[2261730], “提交结果”: “ValuesListIds”:[58895] , “价值”:99, “来源”: “ValuesListIds”:[43861] , “日期”:“2020 年 9 月 30 日”

【问题讨论】:

【参考方案1】:

这个怎么样:

df.show()
# +----+---------+----------------+-------+--------+---------+
# | Id | results | submit_results | Value | Source |    Date |
# +----+---------+----------------+-------+--------+---------+
# |   0|[2261730]|         [58911]|     99|[439012]|9/30/2020|
# +----+---------+----------------+-------+--------+---------+

list_of_rows = df.collect()
# [Row(Date='9/30/2020', Id=0, Source=[439012], Value=99, results=[2261730], submit_results=[58911])]

d = list_of_rows[0].asDict()
# 'Date': '9/30/2020',
#  'Id': 0,
#  'Source': [439012],
#  'Value': 99,
#  'results': [2261730],
#  'submit_results': [58911]

# adjust the dict in-place
d["submit_results"] = "ValuesListIds": d["submit_results"]
d["Source"] = "ValuesListIds": d["Source"]

# now it looks like this:
# 'Date': '9/30/2020',
#  'Id': 0,
#  'Source': 'ValuesListIds': [439012],
#  'Value': 99,
#  'results': [2261730],
#  'submit_results': 'ValuesListIds': [58911]

# `requests` will automatically convert the dict to JSON
requests.post(url, data=d)

【讨论】:

感谢您试用...我需要“Submit_results”和“Source”字段的关键字“ValueListIDs”。这 2 个是嵌套的 JSON 对象。 呃抱歉,我错过了那部分,出于某种原因,我以为你在谈论pandas,而不是pyspark,所以我想我的回答现在毫无用处。您能否使用用于生成数据帧的pyspark 代码更新问题? 我有点解决了这个问题,但又卡在了一个部分。我在 rdd 中得到最终结果,但需要将 json.dump 放入 POST 请求的正文中。任何的想法? mdf = spark.sql('选择 0 作为 Id,array(2261730) 作为结果,array(58895) 作为 submit_results,45 作为值,array(43861) 作为源,“9/30/2020”作为表限制 1 中的日期') new = mdf.select("Id","Results", F.struct(F.col('results').alias('ValuesListIDs')).alias('results'), "Value", F. struct(F.col('Source').alias('ValuesListIDs')).alias('Source'), "Date") new.toJSON() 好酷! .toJSON() 返回一个 RDD,因此您需要在其上调用 .collect() 以将其转换为 JSON 字符串列表,或者您可以使用 .first() 仅获取第一个元素。然后你需要像response = requests.post(url, json=your_json_from_rdd)这样的东西。 好的,我已经重写了答案,现在应该更有意义了。由于看起来您只需要结果的单行,因此我立即执行了df.collect(),因此我可以使用普通的 Python dict 工作,在我看来,这比使用 struct 创建一个新的 DataFrame 更像是一种矫枉过正(?)列。

以上是关于用于 API 发布请求的 PySpark DataFrame 到 JSON的主要内容,如果未能解决你的问题,请参考以下文章

从 Pyspark 中读取文件后模拟流数据

Laravel 请求验证不适用于邮递员请求

ImportError:运行火花时没有名为请求的模块

pyspark 诱人的行为

Youtube Data API C# - 无需请求用户凭据即可使用

YouTube Data v3 API - 如何从频道请求所有视频?