我可以让 Spark 仅在必要的行上运行 UDF 吗?

Posted

技术标签:

【中文标题】我可以让 Spark 仅在必要的行上运行 UDF 吗?【英文标题】:Can I have Spark run UDF only on necessary rows? 【发布时间】:2019-05-17 20:34:35 【问题描述】:

我刚刚开始使用 pyspark,无法让我的 UDF 仅在必要的行上运行。相反,它在所有行上运行。

我确实在相关帖子中找到了一些信息:Is Spark only applying my UDF on records being shown?。至少在 Scala 中,这似乎是可能的。

显示我遇到的问题的简短示例:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType

def timesTwo(v):
    print("v*2:", v, v*2)
    return v*2

df = spark.range(100).toDF('value')
df = df.withColumn('v2', udf(timesTwo, IntegerType())(col('value')))
df.take(10)

我希望打印语句只针对value=0value=9 执行。但是,根据我的控制台输出,它针对数据帧中的所有行运行(最多 value=99)。

v*2: 0 0
v*2: 1 2
v*2: 2 4
v*2: 3 6
v*2: 4 8
v*2: 5 10
v*2: 6 12
v*2: 7 14
v*2: 8 16
v*2: 9 18
v*2: 10 20
v*2: 11 22
v*2: 12 24
v*2: 13 26
v*2: 14 28
v*2: 15 30
v*2: 16 32
v*2: 17 34
v*2: 18 36
v*2: 19 38
v*2: 20 40
v*2: 21 42
v*2: 22 44
v*2: 23 46
v*2: 24 48
v*2: 25 50
v*2: 26 52
v*2: 27 54
v*2: 28 56
v*2: 29 58
v*2: 30 60
v*2: 31 62
v*2: 32 64
v*2: 33 66
v*2: 34 68
v*2: 35 70
v*2: 36 72
v*2: 37 74
v*2: 38 76
v*2: 39 78
v*2: 40 80
v*2: 41 82
v*2: 42 84
v*2: 43 86
v*2: 44 88
v*2: 45 90
v*2: 46 92
v*2: 47 94
v*2: 48 96
v*2: 49 98
v*2: 50 100
v*2: 51 102
v*2: 52 104
v*2: 53 106
v*2: 54 108
v*2: 55 110
v*2: 56 112
v*2: 57 114
v*2: 58 116
v*2: 59 118
v*2: 60 120
v*2: 61 122
v*2: 62 124
v*2: 63 126
v*2: 64 128
v*2: 65 130
v*2: 66 132
v*2: 67 134
v*2: 68 136
v*2: 69 138
v*2: 70 140
v*2: 71 142
v*2: 72 144
v*2: 73 146
v*2: 74 148
v*2: 75 150
v*2: 76 152
v*2: 77 154
v*2: 78 156
v*2: 79 158
v*2: 80 160
v*2: 81 162
v*2: 82 164
v*2: 83 166
v*2: 84 168
v*2: 85 170
v*2: 86 172
v*2: 87 174
v*2: 88 176
v*2: 89 178
v*2: 90 180
v*2: 91 182
v*2: 92 184
v*2: 93 186
v*2: 94 188
v*2: 95 190
v*2: 96 192
v*2: 97 194
v*2: 98 196
v*2: 99 198
[Row(value=0, v2=0),
 Row(value=1, v2=2),
 Row(value=2, v2=4),
 Row(value=3, v2=6),
 Row(value=4, v2=8),
 Row(value=5, v2=10),
 Row(value=6, v2=12),
 Row(value=7, v2=14),
 Row(value=8, v2=16),
 Row(value=9, v2=18)]

【问题讨论】:

你应该在take(10)之后应用你的函数: df = df.take(10).withColumn('v2', udf(timesTwo, IntegerType())(col('value')) ) 【参考方案1】:

有两种执行方式,取决于你的目的,看下面的例子:

from pyspark.shell import spark
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

times_two = udf(lambda v: v * 2, IntegerType())

df = spark.range(100).toDF('value')

# Creates a new Dataframe based on condition `value < 10` and apply function.
where_df = df.where(df.value < 10).withColumn('v2', times_two(df.value))

# Creates a new Dataframe with the first 10 values and apply udf function
take_df = df.take(10).withColumn('v2', times_two(df.value))

输出数据框:

+-----+---+
|value| v2|
+-----+---+
|    0|  0|
|    1|  2|
|    2|  4|
|    3|  6|
|    4|  8|
|    5| 10|
|    6| 12|
|    7| 14|
|    8| 16|
|    9| 18|
+-----+---+

【讨论】:

以上是关于我可以让 Spark 仅在必要的行上运行 UDF 吗?的主要内容,如果未能解决你的问题,请参考以下文章

如何让每个 customLabel 在自己的行上?

统计 Spark 中 UDF 的调用次数

Apache Spark - UDF 似乎不适用于 spark-submit

独立运行 UDF 的 Spark 错误

Spark UDF 没有正确给出滚动计数

Scala Spark 中的 udf 运行时错误