在pyspark中以分布式方式有效地生成大型DataFrame(没有pyspark.sql.Row)

Posted

技术标签:

【中文标题】在pyspark中以分布式方式有效地生成大型DataFrame(没有pyspark.sql.Row)【英文标题】:Generating large DataFrame in a distributed way in pyspark efficiently (without pyspark.sql.Row) 【发布时间】:2020-09-12 09:40:57 【问题描述】:

问题归结为以下几点:我想在 pyspark 中使用现有的并行化输入集合和一个给定一个输入的函数生成一个 DataFrame,该函数可以生成相对大量的行。在下面的示例中,我想使用例如生成 10^12 行数据框1000 名执行者:

def generate_data(one_integer):
  import numpy as np
  from pyspark.sql import Row
  M = 10000000 # number of values to generate per seed, e.g. 10M
  np.random.seed(one_integer)
  np_array = np.random.random_sample(M) # generates an array of M random values
  row_type = Row("seed", "n", "x")
  return [row_type(one_integer, i, float(np_array[i])) for i in range(M)]

N = 100000 # number of seeds to try, e.g. 100K
list_of_integers = [i for i in range(N)]
list_of_integers_rdd = spark.sparkContext.parallelize(list_of_integers)
row_rdd = list_of_integers_rdd.flatMap(list_of_integers_rdd)
from pyspark.sql.types import StructType, StructField, FloatType, IntegerType
my_schema = StructType([
       StructField("seed", IntegerType()),
       StructField("n", IntegerType()),
       StructField("x", FloatType())])
df = spark.createDataFrame(row_rdd, schema=my_schema)

(我真的不想研究给定种子的随机数分布 - 这只是我能够想出的一个例子来说明大型数据帧不是从仓库加载而是由代码生成的情况)

上面的代码几乎完全符合我的要求。问题是它以一种非常低效的方式进行 - 代价是为每一行创建一个 Python Row 对象,然后将 Python Row 对象转换为内部 Spark 列表示。

有没有一种方法我可以通过让 spark 知道这些是一批值的列来转换已经以列表示形式的一批行(例如,一个或几个如上 np_array 的 numpy 数组)?

例如我可以编写代码来生成 python 集合 RDD,其中每个元素都是 pyarrow.RecordBatch 或 pandas.DataFrame,但我找不到将其中任何一个转换为 Spark DataFrame 的方法,而无需在过程。

至少有十几篇文章提供了如何使用 pyarrow + pandas 将本地(到驱动程序)pandas 数据帧有效地转换为 Spark 数据帧的示例,但这对我来说不是一个选择,因为我实际上需要数据在执行程序上以分布式方式生成,而不是在驱动程序上生成一个 pandas 数据帧并将其发送给执行程序。

UPD。 我找到了一种避免创建 Row 对象的方法——使用 Python 元组的 RDD。正如预期的那样,它仍然太慢了,但仍然比使用 Row 对象快一点。尽管如此,这并不是我真正想要的(这是一种将列数据从 python 传递到 Spark 的非常有效的方法)。

还测量了在机器上执行某些操作的时间(粗略的方法,测量时间有相当多的变化,但在我看来它仍然具有代表性): 有问题的数据集是 10M 行,3 列(一列是常数整数,另一列是从 0 到 10M-1 的整数范围,第三是使用 np.random.random_sample 生成的浮点值:

本地生成 pandas 数据帧(10M 行):~440-450ms 本地生成 spark.sql.Row 对象的 python 列表(10M 行):~12-15s 本地生成表示行(10M 行)的 python 元组列表:~3.4-3.5s

仅使用 1 个执行程序和 1 个初始种子值生成 Spark 数据帧:

使用spark.createDataFrame(row_rdd, schema=my_schema): ~70-80s 使用spark.createDataFrame(tuple_rdd, schema=my_schema): ~40-45s (非分布式创建)使用spark.createDataFrame(pandas_df, schema=my_schema):~0.4-0.5s(没有 pandas df 生成本身,这需要大致相同的时间) - spark.sql.execution.arrow.enabled 设置为 true。

本地到驱动程序 pandas 数据帧在约 1 秒内转换为 Spark 数据帧的 1000 万行的示例让我有理由相信执行程序中生成的数据帧应该可以实现。然而,我现在可以达到的最快速度是使用 Python 元组的 RDD 处理 1000 万行约 40 秒。

所以问题仍然存在 - 有没有办法在 pyspark 中以分布式方式有效地生成大型 Spark 数据帧?

【问题讨论】:

【参考方案1】:

听起来瓶颈是从 RDD -> Dataframes 的转换,并且手头的功能相当快,并且 pandas DF 转换通过 pyarrow 触发 DF 非常快。以下是两种可能的解决方案:

    由于很容易并行创建 pandas df,而不是从执行程序返回它,而是使用 df.to_parquet 编写生成的 df,即:
def generate_data(seed):
    M = 10
    np.random.seed(seed)
    np_array = np.random.random_sample(M) # generates an array of M random values
    df = pd.DataFrame(np_array, columns=["x"])
    df["seed"] = seed
    df.reset_index().to_parquet(f"s3://bucket/part-str(seed).zfill(5).parquet"

在生成的 parquet 文件中的 Spark 读取应该是微不足道的。然后你的瓶颈变成了 IO 限制,这应该比 spark 转换元组/行类型更快。

    如果您不允许将任何内容保存到文件中,pandas_udfGROUPED_MAP 可能会帮助您,前提是您的 spark 版本足够新。它还使用 pyarrow 在 spark DF 和 pandas DF 之间进行转换,因此它应该比使用元组更快,并允许您以分布式方式从 UDF 创建和返回 pandas DF。
import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

N = 10

df = spark.createDataFrame(
    [(i,) for i in range(N)], ["seed"]
)

def generate_data(seed):
    M = 10
    np.random.seed(seed)
    np_array = np.random.random_sample(M) # generates an array of M random values
    df = pd.DataFrame(np_array, columns=["x"])
    df["seed"] = seed
    return df.reset_index()

@pandas_udf("index long, x double, seed long", PandasUDFType.GROUPED_MAP)
def generate_data_udf(pdf):
    output = []
    for idx, row in pdf.iterrows():
        output.append(generate_data(row["seed"]))
    return pd.concat(output)


df.groupby("seed").apply(generate_data_udf).show()

较慢的部分将是groupby,您可以根据您将种子放入generate_data_udf 的方式来加快速度,即:

@udf(returnType=IntegerType())
def batch_seed(seed):
    return seed // 10

df.withColumn("batch_seed", batch_seed(col("seed"))). \
groupBy("batch_seed").apply(generate_data_udf).show()

【讨论】:

我正在考虑做类似于选项 (1) 的事情,但我不能使用分布式文件系统。我可以从执行者在本地编写的文件创建一个 DF,但我认为没有任何可能这样做。让我看看选项(2)。 有趣。我之前实际上看过 pandas UDF,但没有找到使用它们生成自定义行数的方法(认为它们只允许一对一的转换或将多行聚合为一个,但不允许爆炸式生成器或类似的像这里一样分组爆炸)。结果我完全误解了GROUPED_MAP 的用法,它实际上允许生成自定义数量的数据。 GROUPED_MAP 的解决方案几乎正是我想要的。这比使用元组的 RDD 快 4-20 倍(很大程度上取决于如何测量)。非常感谢!【参考方案2】:

这是一个不使用 RDD 或创建 Rows,而仅使用数据框操作的解决方案: (代码在scala中,但在python中做同样的事情应该很简单)

val N = 100000

//for seed return array of index and random_value
def generate_data(i: Int): Array[(Int, Double)] = ???
val generate_data_udf = udf (generate_data _)

spark
  .range(N)
  .toDF("seed")
  .withColumn("arr", generate_data_udf($"seed"))
  .select(
    $"seed",
    explode($"arr") as "exp"
  )
  .select(
    $"seed",
    $"exp._1" as "n",
    $"exp._2" as "x"
  )

【讨论】:

不幸的是,这比在 python 中创建 Row 对象要慢得多。在我的例子中,生成包含 array-to-explode-later 的一行的函数需要 3 秒才能生成,但是这个时间被 Spark 需要将一行 RDD 转换为 DF 的时间所掩盖(在我的例子中大约 370 秒与模式指定和所有)。我没有测量爆炸的时间,但预计与其他步骤相比,它的时间可以忽略不计。所以在 pyspark 中尝试这个比使用 Row 对象的 RDD 慢大约 5 倍(比使用 Python 元组的 RDD 慢 10 倍)。【参考方案3】:

这是不使用 Row 的问题的解决方案 - 仅基于 RDD。我认为这可能是最有效的方法,因为它使用map 来计算你的函数输出,并使用flatMap 来组合这些输出——这两个操作都是在 RDD 上执行的,所以一切都应该分布。

import numpy as np
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('abc').getOrCreate()
sc = spark.sparkContext

def generate_data(one_integer):
  M = 2 # number of values to generate per seed, e.g. 10M
  np.random.seed(one_integer)
  np_array = np.random.random_sample(M) # generates an array of M random values
  return [(one_integer, i, float(np_array[i])) for i in range(M)]

N = 30 # number of seeds to try, e.g. 100K
list_of_integers = [i for i in range(N)]
list_of_integers_rdd = sc.parallelize(list_of_integers)
generated_data_rdd = list_of_integers_rdd.map(lambda x: generate_data(x))
solved_rdd = generated_data_rdd.flatMap(lambda list: list)

df = spark.createDataFrame(solved_rdd).toDF("seed", "n", "x")
df.show()

【讨论】:

这个方法在我的评测部分的帖子中已经提到过,但效率仍然很低(详情在我的问题中)。

以上是关于在pyspark中以分布式方式有效地生成大型DataFrame(没有pyspark.sql.Row)的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Pyspark 中以编程方式解析固定宽度的文本文件?

如何有效地将 MySQL 表读入 Apache Spark/PySpark?

在 pandas 中有效地使用替换

在 C# 中将大型双数组保存为文件的最有效方法

如何在 Pyspark 中以编程方式使用“计数”?

如何在 MATLAB 中以 *.dat 格式导出图像