使用 pyspark,如何将文件中单行的多个 JSON 文档读入数据框?

Posted

技术标签:

【中文标题】使用 pyspark,如何将文件中单行的多个 JSON 文档读入数据框?【英文标题】:Using pyspark, how do I read multiple JSON documents on a single line in a file into a dataframe? 【发布时间】:2018-07-12 20:52:03 【问题描述】:

使用 Spark 2.3,我知道我可以像这样读取 JSON 文档文件:

'key': 'val1'
'key': 'val2'

有了这个:

spark.json.read('filename')

当 JSON 文档之间没有换行符时,如何将以下内容读入数据框中?

以下是输入示例。

'key': 'val1''key': 'val2'

明确地说,我希望数据框有两行 (frame.count() == 2)。

【问题讨论】:

我建议修复您的输入文件,而不是与 Spark 读取文件的方式作斗争,因为这不是有效的 JSON 对象或 JSONlines 格式 【参考方案1】:

请试一试——

df = spark.read.json(["fileName1","fileName2"])

如果你想读取文件夹中所有的json文件也可以这样做-

df = spark.read.json("data/*json")

【讨论】:

问题不在于我有多个文件 - 问题在于,在一个文件中,我有多个 JSON 文档之间没有换行符。 这帮助我解决了我的问题。谢谢。【参考方案2】:

正如上面@cricket_007 所建议的,你最好修复输入文件

如果您确定 json 对象中没有内联右括号,您可以执行以下操作:

with open('myfilename', 'r') as f:
    txt = f.read()

txt = txt.replace('', '\n')

with open('mynewfilename', 'w') as f:
    f.write(txt)

如果您在键或值中确实有“”,则使用正则表达式的任务会变得更难,但并非不可能。不过似乎不太可能。

【讨论】:

【参考方案3】:

我们使用 RDD-Api 解决了这个问题,因为我们找不到任何以内存高效方式使用 Dataframe-API 的方法(我们总是遇到执行程序 OoM-Errors)。

以下函数将逐步尝试从您的文件 (from this post) 解析 json 和 yielding 后续 json:

from functools import partial
from json import JSONDecoder
from io import StringIO

def generate_from_buffer(buffer: str, chunk: str, decoder: JSONDecoder):
    buffer += chunk
    while buffer:
        try:
            result, index = decoder.raw_decode(buffer)
            yield result
            buffer = buffer[index:].lstrip()
        except ValueError:
            # Not enough data to decode, read more
            break
    return buffer


def parse_jsons_file(jsons_str: str, buffer_size: int = 1024):
    decoder = JSONDecoder()
    buffer = ''
    file_obj = StringIO(jsons_str)
    for chunk in iter(partial(file_obj.read, buffer_size), ''):
        buffer = yield from generate_from_buffer(buffer, chunk, decoder)
    if buffer:
        raise ValueError("Invalid input: should be concatenation of json strings")

我们首先用.format("text")读取json:

    df: DataFrame = (
        spark
        .read
        .format("text")
        .option("wholetext", True)
        .load(data_source_path)
    )

然后使用上面的函数将其转换为RDD,flatMap,最后将其转换回火花数据帧。为此,您必须为文件中的单个 json 定义 json_schema,无论如何这都是一种好习惯。

    rdd_df = (df_rdd.map(lambda row: row["value"])
                 .flatMap(lambda jsons_string: parse_jsons_file(jsons_string))
                 .toDF(json_schema))

【讨论】:

以上是关于使用 pyspark,如何将文件中单行的多个 JSON 文档读入数据框?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Pyspark 并行处理多个镶木地板文件?

在 html 文件的单行中引用多个 js 文件

Pyspark:如何在 Yarn 集群上运行作业时对多个文件使用 --files 标签

如何使用 spark.read.jdbc 读取不同 Pyspark 数据帧中的多个文件

如何使用 pySpark 使多个 json 处理更快?

连接后如何在 Pyspark Dataframe 中选择和排序多个列