如何使用 pySpark 使多个 json 处理更快?
Posted
技术标签:
【中文标题】如何使用 pySpark 使多个 json 处理更快?【英文标题】:How to make multiple json processing faster using pySpark? 【发布时间】:2018-11-29 16:05:44 【问题描述】:我在 Databricks 中有一个 json 文件列表,我想做的是读取每个 json,提取值需要,然后将其附加到一个空的熊猫数据框中。每个 json 文件对应于最终数据帧上的一行。初始 json 文件列表长度为 50k。到目前为止,我构建的是下面的函数,它完美地完成了这项工作,但它花费了很多时间,以至于它让我在 5k 箱中子集 json 文件列表并运行每个一个分开。每个需要30分钟。我仅限于在 Databricks 中使用 3 节点集群。
你有没有可能提高我的功能的效率?提前致谢。
### Create a big dataframe including all json files ###
def jsons_to_pdf(all_paths):
# Create an empty pandas dataframes (it is defined only with column names)
pdf = create_initial_pdf(samplefile)
# Append each row into the above dataframe
for path in all_paths:
# Create a spark dataframe
sdf = sqlContext.read.json(path)
# Create a two extracted lists of values
init_values = sdf.select("id","logTimestamp","otherTimestamp").rdd.flatMap(lambda x: x).collect()
id_values = sdf.select(sdf["dataPoints"]["value"]).rdd.flatMap(lambda x: x).collect()[0]
#Append the concatenated list each one as a row into the initial dataframe
pdf.loc[len(pdf)] = init_values + id_values
return pdf
一个 json 文件如下所示:
而我想要实现的是将 dataPoints['id'] 作为新列,将 dataPoints['value'] 作为它们的值,以便最终变成这样:
【问题讨论】:
请添加一些示例数据和预期输出......仅拥有代码很难理解您想要实现的目标。可能阻塞点来自误用火花。 我编辑了我的问题。 每个“id”都存在于每个三元组(“imoNo”、时间戳、时间戳)? 【参考方案1】:根据您的示例,您要执行的是枢轴,然后将您的数据转换为熊猫数据框。
步骤如下:
将所有的 json 收集到 1 个大数据帧中, 透视您的数据, 将它们转换为 pandas 数据框试试这样的:
from functools import reduce
def jsons_to_pdf(all_paths):
# Create a big dataframe from all the jsons
sdf = reduce(
lambda a,b : a.union(b),
[
sqlContext.read.json(path)
for path
in all_paths
]
)
# select and pivot your data
pivot_df = sdf.select(
"imoNo",
"logTimestamp",
"payloadTimestamp",
F.explode("datapoints").alias("datapoint")
).groupBy(
"imoNo",
"logTimestamp",
"payloadTimestamp",
).pivot(
"datapoint.id"
).sum("datapoint.value")
# convert to a pandas dataframe
pdf = pivot_df.toPandas()
return pdf
根据您的评论,您可以将文件列表all_paths
替换为通用路径并更改您创建sdf
的方式:
all_paths = 'abc/*/*/*' # 3x*, one for year, one for month, one for day
def jsons_to_pdf(all_paths):
# Create a big dataframe from all the jsons
sdf = sqlContext.read.json(path)
这肯定会提高性能。
【讨论】:
感谢史蒂文的快速回答!当我用你的函数替换我的函数时,我得到ValueError: cannot set a row with mismatched columns
,这是因为 collect 函数返回一个包含所有行信息的巨大列表,这些信息不能添加到空的 pandas 数据框中。我可以将 for-loop 应用于列表并将每组值(代表一行)迭代地添加到空数据框中。但这又可能会减慢这个过程。关于如何替换最后 3 行代码以使其按照最初的意图执行的任何想法?再次感谢!
我真的不明白你为什么要逐行添加它。只需创建一个列并将其添加到您的数据框中。您不必事先创建列。此外,您有 spark dataframes to_pandas
方法,可用于直接创建 pandas 数据框。
@oikonang 我根据您提供的内容完全更改了代码。但也许还有一些调整要执行。
嗨@Steven!您的代码按预期工作!但是执行处理仍然需要很多时间。为了给你一个想法,我的代码在 1.1 分钟内运行 100 个 json 文件,而你在 1.07 内运行。它比我的要好,但如果你认为我有 35k 的 json 文件,完成任务需要 6.4 小时。有没有办法以某种方式将其取下来?感谢您的宝贵时间。
不,它不是线性的......与你的相比,我的时间会越来越少以上是关于如何使用 pySpark 使多个 json 处理更快?的主要内容,如果未能解决你的问题,请参考以下文章
使用 pyspark,如何将文件中单行的多个 JSON 文档读入数据框?
如何使用给定的reduce函数基于pyspark中的字段合并多个JSON数据行
如何使用pyspark将具有多个可能值的Json数组列表转换为数据框中的列