pyspark.sql 转 JSON

Posted

技术标签:

【中文标题】pyspark.sql 转 JSON【英文标题】:pyspark.sql to JSON 【发布时间】:2019-08-23 01:46:03 【问题描述】:

从 pyspark.sql 查询中获取有效 json 文档的最佳方法是什么。 例如,如果我运行:

spark.sql("show tables")

我看到它返回了一个数据帧,我可以在它上面调用“toJSON”来获取一个 RDD。但是我找不到将其转换为 json 文档的好方法。我在它上面调用了“collect()”,但这并没有返回有效的 json,而且似乎不是将数据帧转换为 json 的最有效方法。

【问题讨论】:

【参考方案1】:

一种方法是将 JSON RDD 编写为文本文件。 JSON 将被正确格式化。

df.toJSON().saveAsTextFile("/tmp/jsonRecords")

请注意,这将在每个分区写入一个文件。所以需要手动连接它们。

该方法改编自答案here,它使用了Scala。

【讨论】:

谢谢,有没有更有效的方法来做到这一点?原因是我这样做是为了处理 api 响应,因此响应时间很重要,并且从文件中写入和读取会花费更长的时间。 在这种情况下,最好直接使用toJSON()给出的JSON RDD,并直接对它们进行处理。 JSON RDD 是 JSON 行格式,在 Spark 中更容易处理分布式处理。如果您可以编辑您的问题以包含一个可重复的示例以及您需要的处理,我可以进一步研究它。 感谢您的回复,我想让您知道我使用我在下面的答案中发布的方法获得了 json。【参考方案2】:

我使用以下方法得到了 json 响应:

def exec_spark_sql_query(query):
    json_rdd = spark.sql(query).toJSON()
    rdd_list = json_rdd.collect()
    output_json = ''
    for i, entry in enumerate(rdd_list):
        if (i == 0):
            output_json = output_json + '['
        if (i == len(rdd_list) - 1):
            return (output_json + entry + ']')
        output_json = output_json + entry + ','
    return output_json

我知道这绝对不是为使用 SparkSQL 执行 SQL 查询的 Python API 调用获取 json 响应的最佳方式,但这可以完成工作。

【讨论】:

以上是关于pyspark.sql 转 JSON的主要内容,如果未能解决你的问题,请参考以下文章

PYSPARK:如何将带有多个 case 语句的 SQL 查询转换为 Pyspark/Pyspark-SQL?

在 PySpark 的两个不同 pyspark.sql.dataframes 中的两列中创建一个 pyspark.sql.dataframe

从 pyspark.sql.types 导入行和从 pyspark.sql 导入行

Pyspark:依靠 pyspark.sql.dataframe.DataFrame 需要很长时间

如何在 pyspark.sql.functions.pandas_udf 和 pyspark.sql.functions.udf 之间进行选择?

Pyspark:将 pyspark.sql.row 转换为 Dataframe