优化 Pyspark 性能以匹配 Pandas / Dask?
Posted
技术标签:
【中文标题】优化 Pyspark 性能以匹配 Pandas / Dask?【英文标题】:Optimizing Pyspark Performance to Match Pandas / Dask? 【发布时间】:2018-09-04 19:24:34 【问题描述】:我有每周的时间序列数据,我正在尝试使用 Pyspark SQL 来计算几个列的过去 8 周的每周总和。我尝试过使用 Pyspark 窗口函数;具体来说:
sum(df[valueCol]).over(partitionBy(df[idCol]).orderBy(df[timeCol]).rangeBetween(-7, 0))
但此代码运行速度非常慢(1000 个唯一 ID 和 170 个时间步长,每列 30-60 秒)。我从其他 *** 问题中了解到,分区和洗牌可能会导致性能问题,因此为了更好地理解这些问题,我在 8 列中手动计算每周的 8 个最近的每周值,然后将这些列添加到后面的 8 周'总和。
这是我创建的简化数据集:
idCount = 2
tCount = 10
df = pd.DataFrame('customerId': [x for x in range(idCount) for y in range(tCount)],
't': [y for x in range(idCount) for y in range(tCount)],
'vals': range(idCount * tCount))[['customerId', 't', 'vals']]
创建此数据框:
输入数据框
customerId t vals
0 0 0 0
1 0 1 1
2 0 2 2
3 0 3 3
4 0 4 4
5 0 5 5
6 0 6 6
7 0 7 7
8 0 8 8
9 0 9 9
10 1 0 10
11 1 1 11
12 1 2 12
13 1 3 13
14 1 4 14
15 1 5 15
16 1 6 16
17 1 7 17
18 1 8 18
19 1 9 19
我的目标输出是 8 周滞后的“vals”列,包括 vals_0 作为本周的值,其中数据不可用的 NaN:
目标输出数据框
customerId t vals_0 vals_1 vals_2 vals_3 vals_4 vals_5 vals_6 vals_7
0 0 0 0 NaN NaN NaN NaN NaN NaN NaN
1 0 1 1 0.0 NaN NaN NaN NaN NaN NaN
2 0 2 2 1.0 0.0 NaN NaN NaN NaN NaN
3 0 3 3 2.0 1.0 0.0 NaN NaN NaN NaN
4 0 4 4 3.0 2.0 1.0 0.0 NaN NaN NaN
5 0 5 5 4.0 3.0 2.0 1.0 0.0 NaN NaN
6 0 6 6 5.0 4.0 3.0 2.0 1.0 0.0 NaN
7 0 7 7 6.0 5.0 4.0 3.0 2.0 1.0 0.0
8 0 8 8 7.0 6.0 5.0 4.0 3.0 2.0 1.0
9 0 9 9 8.0 7.0 6.0 5.0 4.0 3.0 2.0
10 1 0 10 NaN NaN NaN NaN NaN NaN NaN
11 1 1 11 10.0 NaN NaN NaN NaN NaN NaN
12 1 2 12 11.0 10.0 NaN NaN NaN NaN NaN
13 1 3 13 12.0 11.0 10.0 NaN NaN NaN NaN
14 1 4 14 13.0 12.0 11.0 10.0 NaN NaN NaN
15 1 5 15 14.0 13.0 12.0 11.0 10.0 NaN NaN
16 1 6 16 15.0 14.0 13.0 12.0 11.0 10.0 NaN
17 1 7 17 16.0 15.0 14.0 13.0 12.0 11.0 10.0
18 1 8 18 17.0 16.0 15.0 14.0 13.0 12.0 11.0
19 1 9 19 18.0 17.0 16.0 15.0 14.0 13.0 12.0
以下 Pandas 函数创建目标输出数据框:
def get_lag_cols_pandas(df, partCol, timeCol, lagCol, numLags):
newdf = df[[partCol, timeCol, lagCol]]
for x in range(numLags):
newCol = '_'.format(lagCol, x)
joindf = newdf[[partCol, timeCol, lagCol]]
joindf[timeCol] = newdf[timeCol] + x
joindf = joindf.rename(columns = lagCol: newCol)
newdf = newdf.merge(joindf, how = 'left', on = [partCol, timeCol])
return newdf.drop(lagCol, axis = 1)
并在大约 500 毫秒内运行:
>>> %timeit print('pandas result: \n\n\n'.format(get_lag_cols_pandas(df, 'customerId', 't', 'vals', 8)))
1 loop, best of 3: 501 ms per loop
我也可以在 Dask 中使用 map_partitions()
完成此操作,并在大约 900 毫秒内获得相同的结果(由于启动线程的开销,可能比 Pandas 更差):
>>> ddf = dd.from_pandas(df, npartitions = 1)
>>> %timeit print('dask result: \n\n\n'.format(ddf.map_partitions(lambda df: get_lag_cols_pandas(df, \
'customerId', 't', 'vals', 8)).compute(scheduler = 'threads')))
1 loop, best of 3: 893 ms per loop
我也可以在 Pyspark 中完成此操作(注意:对于 Dask 和 Spark,我只有一个分区,以便与 Pandas 进行更公平的比较):
>>> sparkType = SparkSession.builder.master('local[1]')
>>> spark = sparkType.getOrCreate()
>>> sdf = spark.createDataFrame(df)
>>> sdf.show()
+----------+---+----+
|customerId| t|vals|
+----------+---+----+
| 0| 0| 0|
| 0| 1| 1|
| 0| 2| 2|
| 0| 3| 3|
| 0| 4| 4|
| 0| 5| 5|
| 0| 6| 6|
| 0| 7| 7|
| 0| 8| 8|
| 0| 9| 9|
| 1| 0| 10|
| 1| 1| 11|
| 1| 2| 12|
| 1| 3| 13|
| 1| 4| 14|
| 1| 5| 15|
| 1| 6| 16|
| 1| 7| 17|
| 1| 8| 18|
| 1| 9| 19|
+----------+---+----+
>>> sdf.rdd.getNumPartitions()
1
使用以下代码:
def get_lag_cols_spark(df, partCol, timeCol, lagCol, numLags):
newdf = df.select(df[partCol], df[timeCol], df[lagCol])
for x in range(numLags):
newCol = '_'.format(lagCol, x)
joindf = newdf.withColumn('newIdx', newdf[timeCol] + x) \
.drop(timeCol).withColumnRenamed('newIdx', timeCol) \
.withColumnRenamed(lagCol, newCol)
newdf = newdf.join(joindf.select(joindf[partCol], joindf[timeCol], joindf[newCol]), [partCol, timeCol], how = 'left')
newdf = newdf.drop(lagCol)
return newdf
我得到了正确的结果(虽然被打乱了):
+----------+---+------+------+------+------+------+------+------+------+
|customerId| t|vals_0|vals_1|vals_2|vals_3|vals_4|vals_5|vals_6|vals_7|
+----------+---+------+------+------+------+------+------+------+------+
| 1| 3| 13| 12| 11| 10| null| null| null| null|
| 1| 0| 10| null| null| null| null| null| null| null|
| 1| 1| 11| 10| null| null| null| null| null| null|
| 0| 9| 9| 8| 7| 6| 5| 4| 3| 2|
| 0| 1| 1| 0| null| null| null| null| null| null|
| 1| 4| 14| 13| 12| 11| 10| null| null| null|
| 0| 4| 4| 3| 2| 1| 0| null| null| null|
| 0| 3| 3| 2| 1| 0| null| null| null| null|
| 0| 7| 7| 6| 5| 4| 3| 2| 1| 0|
| 1| 5| 15| 14| 13| 12| 11| 10| null| null|
| 1| 6| 16| 15| 14| 13| 12| 11| 10| null|
| 0| 6| 6| 5| 4| 3| 2| 1| 0| null|
| 1| 7| 17| 16| 15| 14| 13| 12| 11| 10|
| 0| 8| 8| 7| 6| 5| 4| 3| 2| 1|
| 0| 0| 0| null| null| null| null| null| null| null|
| 0| 2| 2| 1| 0| null| null| null| null| null|
| 1| 2| 12| 11| 10| null| null| null| null| null|
| 1| 9| 19| 18| 17| 16| 15| 14| 13| 12|
| 0| 5| 5| 4| 3| 2| 1| 0| null| null|
| 1| 8| 18| 17| 16| 15| 14| 13| 12| 11|
+----------+---+------+------+------+------+------+------+------+------+
但 Pyspark 版本需要 明显更长 运行(34 秒):
>>> %timeit get_lag_cols_spark(sdf, 'customerId', 't', 'vals', 8).show()
1 loop, best of 3: 34 s per loop
我让这个示例小而简单(只有 20 个数据行,Dask 和 Spark 都只有 1 个分区),所以我不认为内存和 CPU 使用会导致显着的性能差异。
我的问题是:有什么方法可以更好地配置 Pyspark 或优化 Pyspark 在此特定任务上的执行,以使 Pyspark 在速度方面更接近 Pandas 和 Dask(即 0.5-1.0 秒)?
【问题讨论】:
【参考方案1】:根据定义,pyspark 很慢,因为 Spark 本身是用 Scala 编写的,任何 pyspark 程序都涉及运行至少 1 个 JVM(通常是 1 个驱动程序和多个工作程序)和 python 程序(每个工作程序 1 个)以及它们之间的通信。 java和python端的进程间通信量取决于你使用的python代码。
即使没有所有语言间的喧嚣,Spark 也有很多开销用于处理大数据分布式处理 - 这意味着 Spark 程序往往比任何非分布式解决方案都要慢......只要规模很小。 Spark 和 pyspark 是专门为大规模构建的,这就是它的亮点
【讨论】:
以上是关于优化 Pyspark 性能以匹配 Pandas / Dask?的主要内容,如果未能解决你的问题,请参考以下文章
为啥我的应用程序不能以 pandas_udf 和 PySpark+Flask 开头?