Spark pandas_udf 并不快
Posted
技术标签:
【中文标题】Spark pandas_udf 并不快【英文标题】:Spark pandas_udf is not faster 【发布时间】:2019-07-08 08:23:33 【问题描述】:我面临着繁重的数据转换。简而言之,我有一列数据,每列都包含对应于一些序数的字符串。例如,HIGH
、MID
和 LOW
。我的目标是将这些字符串映射到将保留顺序的整数。在这种情况下,LOW -> 0
、MID -> 1
和 HIGH -> 2
。
这是一个生成此类数据的简单函数:
def fresh_df(N=100000, seed=None):
np.random.seed(seed)
feat1 = np.random.choice(["HI", "LO", "MID"], size=N)
feat2 = np.random.choice(["SMALL", "MEDIUM", "LARGE"], size=N)
pdf = pd.DataFrame(
"feat1": feat1,
"feat2": feat2
)
return spark.createDataFrame(pdf)
我的第一个方法是:
feat1_dict = "HI": 1, "MID": 2, "LO": 3
feat2_dict = "SMALL": 0, "MEDIUM": 1, "LARGE": 2
mappings =
"feat1": F.create_map([F.lit(x) for x in chain(*feat1_dict.items())]),
"feat2": F.create_map([F.lit(x) for x in chain(*feat2_dict.items())])
for col in df.columns:
col_map = mappings[col]
df = df.withColumn(col+"_mapped", col_map[df[col]])
这按预期工作,但实际上它变得很慢,我想优化这个过程。我读到了pandas_udf
,它给了我希望。这是修改后的代码:
feats_dict =
"feat1": feat1_dict,
"feat2": feat2_dict
for col_name in df.columns:
@F.pandas_udf('integer', F.PandasUDFType.SCALAR)
def map_map(col):
return col.map(feats_dict[col_name])
df = df.withColumn(col_name + "_mapped", map_map(df[col_name]))
唉!比较这两个版本时,执行时间没有任何改善。我在 Spark 的本地实例(使用 docker)和 5 节点 EMR 集群(使用默认配置)上比较了两者。
我创建了一个notebook,您可以在其中查看所有代码。一般来说,我使用了以下导入:
import numpy as np
import pandas as pd
from itertools import chain
from pyspark.sql import functions as F
我错过了什么?为什么这个过程这么慢,为什么使用pandas_udf
时没有任何改善?
【问题讨论】:
【参考方案1】:为什么这么慢?因为 Spark 在 JVM 中运行,而 pyspark
没有(因为它是一个 python 进程),并且为了使该进程成为可能,需要将所有数据序列化和反序列化移动到 JVM。
您可以使用when
和otherwise
函数映射值,避免序列化和反序列化过程,提高性能。
import numpy as np
import pandas as pd
import pyspark.sql.functions as f
from pyspark.shell import spark
def fresh_df(n=100000, seed=None):
np.random.seed(seed)
feat1 = np.random.choice(["HI", "LO", "MID"], size=n)
feat2 = np.random.choice(["SMALL", "MEDIUM", "LARGE"], size=n)
pdf = pd.DataFrame(
"feat1": feat1,
"feat2": feat2
)
return spark.createDataFrame(pdf)
df = fresh_df()
df = df.withColumn('feat1_mapped', f
.when(df.feat1 == f.lit('HI'), 1)
.otherwise(f.when(df.feat1 == f.lit('MID'), 2).otherwise(3)))
df = df.withColumn('feat2_mapped', f
.when(df.feat2 == f.lit('SMALL'), 0)
.otherwise(f.when(df.feat2 == f.lit('MEDIUM'), 1).otherwise(2)))
df.show(n=20)
输出
+-----+------+------------+------------+
|feat1| feat2|feat1_mapped|feat2_mapped|
+-----+------+------------+------------+
| LO| SMALL| 3| 0|
| LO|MEDIUM| 3| 1|
| MID|MEDIUM| 2| 1|
| MID| SMALL| 2| 0|
| MID| LARGE| 2| 2|
| MID| SMALL| 2| 0|
| LO| SMALL| 3| 0|
| MID| LARGE| 2| 2|
| MID| LARGE| 2| 2|
| MID| SMALL| 2| 0|
| MID|MEDIUM| 2| 1|
| LO| LARGE| 3| 2|
| HI|MEDIUM| 1| 1|
| LO| SMALL| 3| 0|
| HI|MEDIUM| 1| 1|
| MID| SMALL| 2| 0|
| MID|MEDIUM| 2| 1|
| HI| SMALL| 1| 0|
| HI| LARGE| 1| 2|
| MID| LARGE| 2| 2|
+-----+------+------------+------------+
【讨论】:
感谢您的回答!但是,这在我的情况下不起作用。我有 100 个特征,每个特征都有 10 个唯一值。我已经准备好地图,我必须使用这些地图(以 JSON 格式给出,其中键是功能名称,值是另一个带有映射的 JSON)。所以,你是说我必须将我的代码移植到 Java/Scala? 是的,强烈建议(如果可能的话)在 Java 或 Scala 中实现 UDF,以免失去性能 @Dror 你解决了吗?我也有类似的挑战。我正在尝试按客户、订单和返回行进行分组,其中我的 800 个产品关键字是列,值是这些产品的价格(非常稀疏)。我也无法与 pandas_udf 并行化。 是的,但使用的是完全不同的方法。 @Dror 您介意分享一下这种不同的方法吗?以上是关于Spark pandas_udf 并不快的主要内容,如果未能解决你的问题,请参考以下文章
使用 pandas_udf 将 Spark Structured DataFrame 转换为 Pandas
在 pyspark 中使用 pandas_udf 过滤数据框
PySpark。将 Dataframe 传递给 pandas_udf 并返回一个系列