当 python 函数比它们快时,为啥我们使用 pyspark UDF? (注。不用担心 spark SQL 命令)

Posted

技术标签:

【中文标题】当 python 函数比它们快时,为啥我们使用 pyspark UDF? (注。不用担心 spark SQL 命令)【英文标题】:Why do we use pyspark UDF when python functions are faster than them? (Note. Not worrying about spark SQL commands)当 python 函数比它们快时,为什么我们使用 pyspark UDF? (注。不用担心 spark SQL 命令) 【发布时间】:2020-10-05 11:10:13 【问题描述】:

我有一个数据框:

df = (spark
  .range(0, 10 * 1000 * 1000)\
  .withColumn('id', (col('id') / 1000).cast('integer'))\
  .withColumn('v', rand()))

输出:

+---+-------------------+
| id|                  v|
+---+-------------------+
|  0|0.05011803459635367|
|  0| 0.6749337782428327|
|  0| 0.9449105904567048|
|  0| 0.9183605955607251|
|  0|  0.648596393346793|
+---+-------------------+

现在,可以通过 SQL 函数和 UDF 完成一个简单的 - 将 1 添加到“v”。

如果我们忽略 SQL(最佳性能)

我们可以创建一个 UDF:

@udf("double")
def plus_one(v):
    return v + 1

然后调用它:

df.withColumn('v', plus_one(df.v)).agg(count(col('v'))).show()

时间:16.5 秒

但这是我的问题:

如果我不要使用 udf 并直接写:

def plus_one(v):
        return v + 1
df.withColumn('v', plus_one(df.v)).agg(count(col('v'))).show()

所用时间 - 352 毫秒

简而言之,UDF 查询耗时约 16 秒,而普通 python 函数耗时约 350 毫秒

比较,

df.selectExpr("id", "v+1 as v").agg(count(col('v'))).show()

时间:347 毫秒

这是我的困境:

如果我可以使用与内置函数相比的普通 python 函数执行相同的场景...

问。为什么不直接使用python函数呢?

问。仅当我们计划在 SQL 中像命令一样使用它时,注册 UDF 才有意义吗?

我们不这样做肯定有一些优化原因......或者可能与 spark 集群的工作原理有关?

[ 已经回答了 2 个问题,但是这两个问题都以“首选 SQL 内置函数...”结尾 我正在将 python 函数与 UDF 进行比较,它在 pyspark 应用程序中的可行性。 ]

编辑: 我也用 pandas_udf 做到了这一点:

@pandas_udf('double')
def vectorized_plus_one(v):
    return v + 1

df.withColumn('v', vectorized_plus_one(df.v)).agg(count(col('v'))).show()

时间:5.26 秒

我附上了截图:

The output for Adding 1 to value - Python funtion (standalone), UDF, SQL

【问题讨论】:

【参考方案1】:

您的方案有效,因为实际上您并没有在 python 中添加 1,它是在 Java 中添加的,其添加方式与使用 SQL 时使用的方式非常相似。

让我们把案例分开:

    你做plus_one(df.v)等于只是通过df.v + 1 尝试在您最喜欢的 repl 中键入 df.v + 1,您会看到它返回 Column 类型的对象。 怎么可能? Column 类覆盖了 __radd__ 魔术方法(以及其他一些方法)并返回新的 Column 实例,其中包含将指定列加 1 的指令。

总而言之:withColumn 始终接受 Column 类型的对象作为第二个参数,而将 1 加到列中的技巧就是 python 的魔力。

这就是为什么它比udfvectorized udf 工作得更快的原因:它们需要运行python 进程,序列化/反序列化数据(矢量化udfs 可以更快地与arrow 一起工作以避免序列化/反序列化),在较慢的python 进程中计算.

【讨论】:

那么,从长远来看,如果我链接一些复杂的逻辑,python 函数按原样肯定会失败?不过谢谢,我可以理解一个 python 函数如何导致“列不可迭代”错误。 是的,如果您执行不返回新 Column 对象的操作,它会。一般来说,只有来自pyspark.sql.functions 的简单算术运算和函数才能做到这一点。对于您需要使用 udfs 的所有其他内容。

以上是关于当 python 函数比它们快时,为啥我们使用 pyspark UDF? (注。不用担心 spark SQL 命令)的主要内容,如果未能解决你的问题,请参考以下文章

为啥我系统上的所有 CuPy 函数都比它们的 NumPy 对应函数慢?

为啥我们需要python中的运算符函数?

为啥 cffi 比 numpy 快这么多?

(Python) 马尔科夫、切比雪夫、切尔诺夫上限函数

lambda 比 python 中的函数调用慢,为啥

为啥 PLSQL 比 SQL*Plus 慢