优化 Pyspark 性能以匹配 Pandas / Dask?

Posted

技术标签:

【中文标题】优化 Pyspark 性能以匹配 Pandas / Dask?【英文标题】:Optimizing Pyspark Performance to Match Pandas / Dask? 【发布时间】:2018-09-04 19:24:34 【问题描述】:

我有每周的时间序列数据,我正在尝试使用 Pyspark SQL 来计算几个列的过去 8 周的每周总和。我尝试过使用 Pyspark 窗口函数;具体来说:

sum(df[valueCol]).over(partitionBy(df[idCol]).orderBy(df[timeCol]).rangeBetween(-7, 0))

但此代码运行速度非常慢(1000 个唯一 ID 和 170 个时间步长,每列 30-60 秒)。我从其他 *** 问题中了解到,分区和洗牌可能会导致性能问题,因此为了更好地理解这些问题,我在 8 列中手动计算每周的 8 个最近的每周值,然后将这些列添加到后面的 8 周'总和。

这是我创建的简化数据集:

idCount = 2
tCount = 10

df = pd.DataFrame('customerId': [x for x in range(idCount) for y in range(tCount)],
               't': [y for x in range(idCount) for y in range(tCount)],
               'vals': range(idCount * tCount))[['customerId', 't', 'vals']]

创建此数据框:

输入数据框

   customerId   t   vals
0           0   0   0
1           0   1   1
2           0   2   2
3           0   3   3
4           0   4   4
5           0   5   5
6           0   6   6
7           0   7   7
8           0   8   8
9           0   9   9
10          1   0   10
11          1   1   11
12          1   2   12
13          1   3   13
14          1   4   14
15          1   5   15
16          1   6   16
17          1   7   17
18          1   8   18
19          1   9   19

我的目标输出是 8 周滞后的“vals”列,包括 vals_0 作为本周的值,其中数据不可用的 NaN:

目标输出数据框

    customerId  t  vals_0  vals_1  vals_2  vals_3  vals_4  vals_5  vals_6  vals_7
0            0  0       0     NaN     NaN     NaN     NaN     NaN     NaN     NaN
1            0  1       1     0.0     NaN     NaN     NaN     NaN     NaN     NaN
2            0  2       2     1.0     0.0     NaN     NaN     NaN     NaN     NaN
3            0  3       3     2.0     1.0     0.0     NaN     NaN     NaN     NaN
4            0  4       4     3.0     2.0     1.0     0.0     NaN     NaN     NaN
5            0  5       5     4.0     3.0     2.0     1.0     0.0     NaN     NaN
6            0  6       6     5.0     4.0     3.0     2.0     1.0     0.0     NaN
7            0  7       7     6.0     5.0     4.0     3.0     2.0     1.0     0.0
8            0  8       8     7.0     6.0     5.0     4.0     3.0     2.0     1.0
9            0  9       9     8.0     7.0     6.0     5.0     4.0     3.0     2.0
10           1  0      10     NaN     NaN     NaN     NaN     NaN     NaN     NaN
11           1  1      11    10.0     NaN     NaN     NaN     NaN     NaN     NaN
12           1  2      12    11.0    10.0     NaN     NaN     NaN     NaN     NaN
13           1  3      13    12.0    11.0    10.0     NaN     NaN     NaN     NaN
14           1  4      14    13.0    12.0    11.0    10.0     NaN     NaN     NaN
15           1  5      15    14.0    13.0    12.0    11.0    10.0     NaN     NaN
16           1  6      16    15.0    14.0    13.0    12.0    11.0    10.0     NaN
17           1  7      17    16.0    15.0    14.0    13.0    12.0    11.0    10.0
18           1  8      18    17.0    16.0    15.0    14.0    13.0    12.0    11.0
19           1  9      19    18.0    17.0    16.0    15.0    14.0    13.0    12.0

以下 Pandas 函数创建目标输出数据框:

def get_lag_cols_pandas(df, partCol, timeCol, lagCol, numLags):
    newdf = df[[partCol, timeCol, lagCol]]
    for x in range(numLags):
        newCol = '_'.format(lagCol, x)
        joindf = newdf[[partCol, timeCol, lagCol]]
        joindf[timeCol] = newdf[timeCol] + x
        joindf = joindf.rename(columns = lagCol: newCol)
        newdf = newdf.merge(joindf, how = 'left', on = [partCol, timeCol])
    return newdf.drop(lagCol, axis = 1)

并在大约 500 毫秒内运行:

>>> %timeit print('pandas result: \n\n\n'.format(get_lag_cols_pandas(df, 'customerId', 't', 'vals', 8)))
1 loop, best of 3: 501 ms per loop

我也可以在 Dask 中使用 map_partitions() 完成此操作,并在大约 900 毫秒内获得相同的结果(由于启动线程的开销,可能比 Pandas 更差):

>>> ddf = dd.from_pandas(df, npartitions = 1)
>>> %timeit print('dask result: \n\n\n'.format(ddf.map_partitions(lambda df: get_lag_cols_pandas(df, \
                                                    'customerId', 't', 'vals', 8)).compute(scheduler = 'threads')))
1 loop, best of 3: 893 ms per loop

我也可以在 Pyspark 中完成此操作(注意:对于 Dask 和 Spark,我只有一个分区,以便与 Pandas 进行更公平的比较):

>>> sparkType = SparkSession.builder.master('local[1]')
>>> spark = sparkType.getOrCreate()
>>> sdf = spark.createDataFrame(df)
>>> sdf.show()
+----------+---+----+
|customerId|  t|vals|
+----------+---+----+
|         0|  0|   0|
|         0|  1|   1|
|         0|  2|   2|
|         0|  3|   3|
|         0|  4|   4|
|         0|  5|   5|
|         0|  6|   6|
|         0|  7|   7|
|         0|  8|   8|
|         0|  9|   9|
|         1|  0|  10|
|         1|  1|  11|
|         1|  2|  12|
|         1|  3|  13|
|         1|  4|  14|
|         1|  5|  15|
|         1|  6|  16|
|         1|  7|  17|
|         1|  8|  18|
|         1|  9|  19|
+----------+---+----+
>>> sdf.rdd.getNumPartitions()
1

使用以下代码:

def get_lag_cols_spark(df, partCol, timeCol, lagCol, numLags):
    newdf = df.select(df[partCol], df[timeCol], df[lagCol])
    for x in range(numLags):
        newCol = '_'.format(lagCol, x)
        joindf = newdf.withColumn('newIdx', newdf[timeCol] + x) \
                                     .drop(timeCol).withColumnRenamed('newIdx', timeCol) \
                                     .withColumnRenamed(lagCol, newCol)
        newdf = newdf.join(joindf.select(joindf[partCol], joindf[timeCol], joindf[newCol]), [partCol, timeCol], how = 'left')
    newdf = newdf.drop(lagCol)
    return newdf

我得到了正确的结果(虽然被打乱了):

+----------+---+------+------+------+------+------+------+------+------+
|customerId|  t|vals_0|vals_1|vals_2|vals_3|vals_4|vals_5|vals_6|vals_7|
+----------+---+------+------+------+------+------+------+------+------+
|         1|  3|    13|    12|    11|    10|  null|  null|  null|  null|
|         1|  0|    10|  null|  null|  null|  null|  null|  null|  null|
|         1|  1|    11|    10|  null|  null|  null|  null|  null|  null|
|         0|  9|     9|     8|     7|     6|     5|     4|     3|     2|
|         0|  1|     1|     0|  null|  null|  null|  null|  null|  null|
|         1|  4|    14|    13|    12|    11|    10|  null|  null|  null|
|         0|  4|     4|     3|     2|     1|     0|  null|  null|  null|
|         0|  3|     3|     2|     1|     0|  null|  null|  null|  null|
|         0|  7|     7|     6|     5|     4|     3|     2|     1|     0|
|         1|  5|    15|    14|    13|    12|    11|    10|  null|  null|
|         1|  6|    16|    15|    14|    13|    12|    11|    10|  null|
|         0|  6|     6|     5|     4|     3|     2|     1|     0|  null|
|         1|  7|    17|    16|    15|    14|    13|    12|    11|    10|
|         0|  8|     8|     7|     6|     5|     4|     3|     2|     1|
|         0|  0|     0|  null|  null|  null|  null|  null|  null|  null|
|         0|  2|     2|     1|     0|  null|  null|  null|  null|  null|
|         1|  2|    12|    11|    10|  null|  null|  null|  null|  null|
|         1|  9|    19|    18|    17|    16|    15|    14|    13|    12|
|         0|  5|     5|     4|     3|     2|     1|     0|  null|  null|
|         1|  8|    18|    17|    16|    15|    14|    13|    12|    11|
+----------+---+------+------+------+------+------+------+------+------+

但 Pyspark 版本需要 明显更长 运行(34 秒):

>>> %timeit get_lag_cols_spark(sdf, 'customerId', 't', 'vals', 8).show()
1 loop, best of 3: 34 s per loop

我让这个示例小而简单(只有 20 个数据行,Dask 和 Spark 都只有 1 个分区),所以我不认为内存和 CPU 使用会导致显着的性能差异。

我的问题是:有什么方法可以更好地配置 Pyspark 或优化 Pyspark 在此特定任务上的执行,以使 Pyspark 在速度方面更接近 Pandas 和 Dask(即 0.5-1.0 秒)?

【问题讨论】:

【参考方案1】:

根据定义,pyspark 很慢,因为 Spark 本身是用 Scala 编写的,任何 pyspark 程序都涉及运行至少 1 个 JVM(通常是 1 个驱动程序和多个工作程序)和 python 程序(每个工作程序 1 个)以及它们之间的通信。 java和python端的进程间通信量取决于你使用的python代码。

即使没有所有语言间的喧嚣,Spark 也有很多开销用于处理大数据分布式处理 - 这意味着 Spark 程序往往比任何非分布式解决方案都要慢......只要规模很小。 Spark 和 pyspark 是专门为大规模构建的,这就是它的亮点

【讨论】:

以上是关于优化 Pyspark 性能以匹配 Pandas / Dask?的主要内容,如果未能解决你的问题,请参考以下文章

为啥我的应用程序不能以 pandas_udf 和 PySpark+Flask 开头?

pyspark 数据框比较以根据关键字段查找列差异

将 Pandas Python 转换为 Pyspark

需要 Pyspark/Pandas 脚本来实现所需的结果 [关闭]

pySpark 数据框转换性能

Pyspark:以表格格式显示火花数据框