pyspark 数据框 UDF 异常处理

Posted

技术标签:

【中文标题】pyspark 数据框 UDF 异常处理【英文标题】:pyspark dataframe UDF exception handling 【发布时间】:2018-05-06 17:21:24 【问题描述】:

我已经使用 python 编写了一个用于 spark 的 UDF。这个函数取 一个日期(在字符串中,例如“2017-01-06”)和 一组字符串(例如:[2017-01-26, 2017-02-26, 2017-04-17]) 并返回自上次最近日期以来的#days。 UDF 是

def findClosestPreviousDate(currdate, date_list):
    date_format = "%Y-%m-%d"
    currdate = datetime.datetime.strptime(currdate, date_format)
    result = currdate
    date_list = [datetime.datetime.strptime(x, date_format) for x in date_list if x != None]
    lowestdiff = 10000
    for dt in date_list:
        if(dt >= currdate):
            continue
        delta = currdate-dt
        diff = delta.days
        if(diff < lowestdiff):
            lowestdiff = diff
            result = dt
    dlt = currdate-result
    return dlt.days


findClosestPreviousDateUdf = udf(findClosestPreviousDate,StringType())

我在下面这样称呼它

findClosestPreviousDateUdf = udf(findClosestPreviousDate,StringType())
grouped_extend_df2 = grouped_extend_email_rec.withColumn('recency_eng', func.when(size(col("activity_arr")) > 0, findClosestPreviousDateUdf("expanded_datestr", "activity_arr")).otherwise(0))

即使我删除了“activity_arr”列中的所有空值,我仍然会收到此 NoneType 错误。也尝试在函数内部应用异常处理(仍然相同)。

我们有没有更好的方法在运行时从 UDF 捕获错误记录(可能使用累加器,我看到很少有人尝试使用 scala)

错误:

----------------------------------- ---------------------------- Py4JJavaError Traceback(最近调用 最后)在() ----> 1 grouped_extend_df2.show()

/usr/lib/spark/python/pyspark/sql/dataframe.pyc in show(self, n, 截短) 第334章 335 如果 isinstance(truncate, bool) 并截断: --> 336 打印(self._jdf.showString(n, 20)) 337 其他: 第338章 打印(self._jdf.showString(n,int(截断)))

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py 在 调用(self, *args) 1131 answer = self.gateway_client.send_command(command) 1132 return_value = get_return_value( -> 1133 answer, self.gateway_client, self.target_id, self.name) 1134 1135 for temp_args in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.pyc in deco(*a, **kw) 61 def deco(*a, **kw): 62 尝试: ---> 63 返回 f(*a, **kw) 64 除了 py4j.protocol.Py4JJavaError 作为 e: 65 秒 = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py 在 get_return_value(answer, gateway_client, target_id, name) 第317章 318 “调用 012 时出错。\n”。 --> 319 格式(target_id,“.”,名称),值) 320 其他: 第321章

Py4JJavaError:调用 o1111.showString 时出错。 : org.apache.spark.SparkException:作业因阶段失败而中止: 阶段 315.0 中的任务 0 失败 1 次,最近一次失败:丢失任务 0.0 在阶段 315.0 (TID 18390, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (最近 最后调用):文件 “/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py”,第 177 行, 主要是 process() 文件“/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py”,第 172 行, 进行中 serializer.dump_stream(func(split_index, iterator), outfile) 文件“/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py”,行 104,在 func = lambda _, it: map(mapper, it) File "", line 1, in File “/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py”,第 71 行,在 return lambda *a: f(*a) File "", line 5, in findClosestPreviousDate TypeError: 'NoneType' object is not 可迭代

在 org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193) 在 org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:234) 在 org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152) 在 org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144) 在 org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87) 在 org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797) 在 org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797) 在 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 在 org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 在 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 在 org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 在 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 在 org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 在 org.apache.spark.scheduler.Task.run(Task.scala:108) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 在 java.lang.Thread.run(Thread.java:748)

驱动程序堆栈跟踪:在 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504) 在 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 在 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 在 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814) 在 scala.Option.foreach(Option.scala:257) 在 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676) 在 org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 在 org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:2029) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:2050) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:2069) 在 org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:336) 在 org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38) 在 org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2861) 在 org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150) 在 org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150) 在 org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2842) 在 org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65) 在 org.apache.spark.sql.Dataset.withAction(Dataset.scala:2841) 在 org.apache.spark.sql.Dataset.head(Dataset.scala:2150) 在 org.apache.spark.sql.Dataset.take(Dataset.scala:2363) 在 org.apache.spark.sql.Dataset.showString(Dataset.scala:241) 在 sun.reflect.GeneratedMethodAccessor237.invoke(未知来源)在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 在 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 在 py4j.Gateway.invoke(Gateway.java:280) 在 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 在 py4j.commands.CallCommand.execute(CallCommand.java:79) 在 py4j.GatewayConnection.run(GatewayConnection.java:214) 在 java.lang.Thread.run(Thread.java:748) 原因: org.apache.spark.api.python.PythonException:回溯(最近 最后调用):文件 “/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py”,第 177 行, 主要是 process() 文件“/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py”,第 172 行, 进行中 serializer.dump_stream(func(split_index, iterator), outfile) 文件“/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py”,行 104,在 func = lambda _, it: map(mapper, it) File "", line 1, in File “/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py”,第 71 行,在 return lambda *a: f(*a) File "", line 5, in findClosestPreviousDate TypeError: 'NoneType' object is not 可迭代

在 org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193) 在 org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:234) 在 org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152) 在 org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144) 在 org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87) 在 org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797) 在 org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797) 在 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 在 org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 在 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 在 org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 在 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 在 org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 在 org.apache.spark.scheduler.Task.run(Task.scala:108) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 更多

【问题讨论】:

【参考方案1】:

我试过你的 udf,但它总是返回 0(int)。

dlt = currdate-result # result and currdate are same
return dlt.days # days is int type

但是在创建 udf 时你已经指定了 StringType。

findClosestPreviousDateUdf = udf(findClosestPreviousDate,StringType())

因此我修改了findClosestPreviousDate函数,如有需要请修改。

>>> in_dates = ['2017-01-26', '2017-02-26', '2017-04-17']
>>>
>>> def findClosestPreviousDate(currdate, date_list=in_dates):
...     date_format = "%Y-%m-%d"
...     currdate = datetime.datetime.strptime(currdate, date_format)
...     date_list = [datetime.datetime.strptime(x, date_format) for x in date_list if x != None]
...     diff = map(lambda dt: (currdate - dt).days, date_list)
...     closestDate = min(filter(lambda days_diff: days_diff <= 0, diff))
...     return closestDate if closestDate else 0
...
>>> findClosestPreviousDate('2017-01-06')
-101

还将udf的返回类型设为IntegerType。通过这些修改,代码可以工作,但请验证更改是否正确。 PySpark udfs 只能接受单个参数,有一个变通方法,参考PySpark - Pass list as parameter to UDF

>>> df.show()
+----------+
|      date|
+----------+
|2017-01-06|
|2017-01-08|
+----------+

>>>
>>> in_dates = ['2017-01-26', '2017-02-26', '2017-04-17']
>>> def findClosestPreviousDate(currdate, date_list=in_dates):
...     date_format = "%Y-%m-%d"
...     currdate = datetime.datetime.strptime(currdate, date_format)
...     date_list = [datetime.datetime.strptime(x, date_format) for x in date_list if x != None]
...     diff = map(lambda dt: (currdate - dt).days, date_list)
...     closestDate = min(filter(lambda days_diff: days_diff <= 0, diff))
...     return closestDate if closestDate else 0
...
>>> findClosestPreviousDate('2017-01-06')
-101
>>>
>>> from pyspark.sql.types import IntegerType
>>> findClosestPreviousDateUDF = udf(findClosestPreviousDate, IntegerType())
>>> df.withColumn('closest_date', findClosestPreviousDateUDF(df['date'])).show()
+----------+------------+
|      date|closest_date|
+----------+------------+
|2017-01-06|        -101|
|2017-01-08|         -99|
+----------+------------+

希望这会有所帮助!

【讨论】:

感谢您的帮助。仅当 currdate > 数组中的任何值(这是要求)时,udf 才会返回值。我有 stringType 作为返回,因为我想将 NoneType 转换为 NA 如果有的话(目前,即使没有空值,它仍然会抛出 NoneType 错误,这是我正在尝试修复的)。 “pyspark 只能接受单个参数”,你的意思是它不能接受列表还是你的意思是它不能接受多个参数 另外,我想检查一下,你知道如何在 pyspark 中使用累加器来识别在 UDF 的运行时调用期间哪些记录失败。 "pyspark can only accept single arguments" 意味着您只能将数据框的列作为函数的输入传递,因此它使您的 udf 工作使用默认参数并在其中传递日期。不要使用蓄能器。它在 Spark 中是反模式并且已被弃用。参考:spark.apache.org/docs/2.1.1/api/java/deprecated-list.html 尝试异常处理并在 catch 中分配一些返回值,例如“ERROR”,以查找错误记录。 The udf will return values only if currdate &gt; any of the values in the array(it is the requirement). 但 udf 必须为每条记录返回一些值。返回诸如“ERROR”或“NA”之类的值,然后过滤那些具有此跳过词作为结果的记录。我想这就是为什么它说NoneType无法转换为StringType 意味着您只能将数据框的列作为函数的输入传递。我只传递列,它在参数方面工作正常。【参考方案2】:

我想我找到了问题所在。这是我修改后的 UDF。

def findClosestPreviousDate(currdate, date_str):
    date_format = "%Y-%m-%d"
    currdate = datetime.datetime.strptime(currdate, date_format)
    date_list = ''
    result = currdate
    if date_str is None:
        return date_str
    else:
        date_list = date_str.split('|')
    date_list = [datetime.datetime.strptime(x, date_format) for x in date_list if x != None]
    lowestdiff = 10000
    for dt in date_list:
        if(dt >= currdate):
            continue
        delta = currdate-dt
        diff = delta.days
        if(diff < lowestdiff):
            lowestdiff = diff
            result = dt
    dlt = currdate-result
    return dlt.days

NoneType 错误是由于我知道空值作为参数进入 UDF。奇怪的是为什么我使用 isNotNull() 函数时没有过滤掉空值。

都试过了

findClosestPreviousDateUdf = udf(findClosestPreviousDate,StringType())
grouped_extend_df2 = grouped_extend_email_rec.withColumn('recency_eng', func.when(size(col("activity_arr")) > 0, findClosestPreviousDateUdf("expanded_datestr", "activity_arr")).otherwise(0))

findClosestPreviousDateUdf = udf(findClosestPreviousDate,StringType())
grouped_extend_df2 = grouped_extend_email_rec.withColumn('recency_eng', func.when(col("activity_arr").isNotNull(), findClosestPreviousDateUdf("expanded_datestr", "activity_arr")).otherwise(0))

但是,当我在上面的函数 findClosestPreviousDate() 中将 NoneType 交给函数时,如下所示

if date_str is None:
    return date_str
else:
    date_list = date_str.split('|')

成功了。

【讨论】:

以上是关于pyspark 数据框 UDF 异常处理的主要内容,如果未能解决你的问题,请参考以下文章

数据框空值在 UDF 之后转换为 0。为啥?

Pandas UDF 函数在大数据上完成的时间异常长

使用 pyspark 在数据块中实现 FileNotFound 异常

pyspark:将多个数据框字段传递给 udf

pyspark 在 udf 中使用数据框

哪个选项使用 pyspark 提供最佳性能?使用地图进行 UDF 或 RDD 处理?