使用 pyspark,如何将文件中单行的多个 JSON 文档读入数据框?
Posted
技术标签:
【中文标题】使用 pyspark,如何将文件中单行的多个 JSON 文档读入数据框?【英文标题】:Using pyspark, how do I read multiple JSON documents on a single line in a file into a dataframe? 【发布时间】:2018-07-12 20:52:03 【问题描述】:使用 Spark 2.3,我知道我可以像这样读取 JSON 文档文件:
'key': 'val1'
'key': 'val2'
有了这个:
spark.json.read('filename')
当 JSON 文档之间没有换行符时,如何将以下内容读入数据框中?
以下是输入示例。
'key': 'val1''key': 'val2'
明确地说,我希望数据框有两行 (frame.count() == 2
)。
【问题讨论】:
我建议修复您的输入文件,而不是与 Spark 读取文件的方式作斗争,因为这不是有效的 JSON 对象或 JSONlines 格式 【参考方案1】:请试一试——
df = spark.read.json(["fileName1","fileName2"])
如果你想读取文件夹中所有的json文件也可以这样做-
df = spark.read.json("data/*json")
【讨论】:
问题不在于我有多个文件 - 问题在于,在一个文件中,我有多个 JSON 文档之间没有换行符。 这帮助我解决了我的问题。谢谢。【参考方案2】:正如上面@cricket_007 所建议的,你最好修复输入文件
如果您确定 json 对象中没有内联右括号,您可以执行以下操作:
with open('myfilename', 'r') as f:
txt = f.read()
txt = txt.replace('', '\n')
with open('mynewfilename', 'w') as f:
f.write(txt)
如果您在键或值中确实有“”,则使用正则表达式的任务会变得更难,但并非不可能。不过似乎不太可能。
【讨论】:
【参考方案3】:我们使用 RDD-Api 解决了这个问题,因为我们找不到任何以内存高效方式使用 Dataframe-API 的方法(我们总是遇到执行程序 OoM-Errors)。
以下函数将逐步尝试从您的文件 (from this post) 解析 json 和 yield
ing 后续 json:
from functools import partial
from json import JSONDecoder
from io import StringIO
def generate_from_buffer(buffer: str, chunk: str, decoder: JSONDecoder):
buffer += chunk
while buffer:
try:
result, index = decoder.raw_decode(buffer)
yield result
buffer = buffer[index:].lstrip()
except ValueError:
# Not enough data to decode, read more
break
return buffer
def parse_jsons_file(jsons_str: str, buffer_size: int = 1024):
decoder = JSONDecoder()
buffer = ''
file_obj = StringIO(jsons_str)
for chunk in iter(partial(file_obj.read, buffer_size), ''):
buffer = yield from generate_from_buffer(buffer, chunk, decoder)
if buffer:
raise ValueError("Invalid input: should be concatenation of json strings")
我们首先用.format("text")
读取json:
df: DataFrame = (
spark
.read
.format("text")
.option("wholetext", True)
.load(data_source_path)
)
然后使用上面的函数将其转换为RDD,flatMap
,最后将其转换回火花数据帧。为此,您必须为文件中的单个 json 定义 json_schema
,无论如何这都是一种好习惯。
rdd_df = (df_rdd.map(lambda row: row["value"])
.flatMap(lambda jsons_string: parse_jsons_file(jsons_string))
.toDF(json_schema))
【讨论】:
以上是关于使用 pyspark,如何将文件中单行的多个 JSON 文档读入数据框?的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark:如何在 Yarn 集群上运行作业时对多个文件使用 --files 标签