如何使用 pySpark 使多个 json 处理更快?

Posted

技术标签:

【中文标题】如何使用 pySpark 使多个 json 处理更快?【英文标题】:How to make multiple json processing faster using pySpark? 【发布时间】:2018-11-29 16:05:44 【问题描述】:

我在 Databricks 中有一个 json 文件列表,我想做的是读取每个 json,提取值需要,然后将其附加到一个空的熊猫数据框中。每个 json 文件对应于最终数据帧上的一行。初始 json 文件列表长度为 50k。到目前为止,我构建的是下面的函数,它完美地完成了这项工作,但它花费了很多时间,以至于它让我在 5k 箱中子集 json 文件列表并运行每个一个分开。每个需要30分钟。我仅限于在 Databricks 中使用 3 节点集群

你有没有可能提高我的功能的效率?提前致谢。

### Create a big dataframe including all json files ###
def jsons_to_pdf(all_paths):
  # Create an empty pandas dataframes (it is defined only with column names)
  pdf = create_initial_pdf(samplefile)

  # Append each row into the above dataframe
  for path in all_paths:  
    # Create a spark dataframe
    sdf = sqlContext.read.json(path)

    # Create a two extracted lists of values
    init_values = sdf.select("id","logTimestamp","otherTimestamp").rdd.flatMap(lambda x: x).collect()
    id_values = sdf.select(sdf["dataPoints"]["value"]).rdd.flatMap(lambda x: x).collect()[0] 

    #Append the concatenated list each one as a row into the initial dataframe
    pdf.loc[len(pdf)] = init_values + id_values 

  return pdf

一个 json 文件如下所示:

而我想要实现的是将 dataPoints['id'] 作为新列,将 dataPoints['value'] 作为它们的值,以便最终变成这样:

【问题讨论】:

请添加一些示例数据和预期输出......仅拥有代码很难理解您想要实现的目标。可能阻塞点来自误用火花。 我编辑了我的问题。 每个“id”都存在于每个三元组(“imoNo”、时间戳、时间戳)? 【参考方案1】:

根据您的示例,您要执行的是枢轴,然后将您的数据转换为熊猫数据框。

步骤如下:

将所有的 json 收集到 1 个大数据帧中, 透视您的数据, 将它们转换为 pandas 数据框

试试这样的:

from functools import reduce 


def jsons_to_pdf(all_paths):

    # Create a big dataframe from all the jsons
    sdf = reduce(
        lambda a,b : a.union(b),
        [
            sqlContext.read.json(path)
            for path
            in all_paths
        ]
    )

    # select and pivot your data
    pivot_df = sdf.select(
        "imoNo",
        "logTimestamp",
        "payloadTimestamp",
        F.explode("datapoints").alias("datapoint")
    ).groupBy(
        "imoNo",
        "logTimestamp",
        "payloadTimestamp",
    ).pivot(
        "datapoint.id"
    ).sum("datapoint.value")

    # convert to a pandas dataframe
    pdf = pivot_df.toPandas()

    return pdf

根据您的评论,您可以将文件列表all_paths替换为通用路径并更改您创建sdf的方式:

all_paths = 'abc/*/*/*' # 3x*, one for year, one for month, one for day

def jsons_to_pdf(all_paths):

    # Create a big dataframe from all the jsons
    sdf = sqlContext.read.json(path)

这肯定会提高性能。

【讨论】:

感谢史蒂文的快速回答!当我用你的函数替换我的函数时,我得到ValueError: cannot set a row with mismatched columns,这是因为 collect 函数返回一个包含所有行信息的巨大列表,这些信息不能添加到空的 pandas 数据框中。我可以将 for-loop 应用于列表并将每组值(代表一行)迭代地添加到空数据框中。但这又可能会减慢这个过程。关于如何替换最后 3 行代码以使其按照最初的意图执行的任何想法?再次感谢! 我真的不明白你为什么要逐行添加它。只需创建一个列并将其添加到您的数据框中。您不必事先创建列。此外,您有 spark dataframes to_pandas 方法,可用于直接创建 pandas 数据框。 @oikonang 我根据您提供的内容完全更改了代码。但也许还有一些调整要执行。 嗨@Steven!您的代码按预期工作!但是执行处理仍然需要很多时间。为了给你一个想法,我的代码在 1.1 分钟内运行 100 个 json 文件,而你在 1.07 内运行。它比我的要好,但如果你认为我有 35k 的 json 文件,完成任务需要 6.4 小时。有没有办法以某种方式将其取下来?感谢您的宝贵时间。 不,它不是线性的......与你的相比,我的时间会越来越少

以上是关于如何使用 pySpark 使多个 json 处理更快?的主要内容,如果未能解决你的问题,请参考以下文章

使用 pyspark,如何将文件中单行的多个 JSON 文档读入数据框?

如何使用给定的reduce函数基于pyspark中的字段合并多个JSON数据行

如何使用pyspark将具有多个可能值的Json数组列表转换为数据框中的列

如何使 pyspark 作业在多个节点上正确并行化并避免内存问题?

如何使用 Pyspark 并行处理多个镶木地板文件?

在s3中使用pyspark合并多个小json文件[重复]