Filtering a DataFrame using pandas_udf in PySpark

Posted: 2019-05-09 10:37:05

Question:

I have a Spark DataFrame with the following schema:

root
 |-- idvalue: string (nullable = true)
 |-- locationaccuracyhorizontal: float (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- is_weekend: boolean (nullable = true)
 |-- locationlatrad: float (nullable = true)
 |-- locationlonrad: float (nullable = true)
 |-- epochtimestamp: integer (nullable = true)
 |-- velocity: float (nullable = true)

Sample data for the relevant columns looks like this:

+--------------------+--------------------------+----+---+--------------+-----------+
|             idvalue|locationaccuracyhorizontal|hour|day|epochtimestamp|   velocity|
+--------------------+--------------------------+----+---+--------------+-----------+
|000xxxxx-yyyy-zzz...|                      32.0|  23|  9|    1554853730|       null|
|000xxxxx-yyyy-zzz...|                     165.0|   0| 10|    1554854501|   0.635121|
|000xxxxx-yyyy-zzz...|                      65.0|   0| 10|    1554854814| 0.96369237|
|000xxxxx-yyyy-zzz...|                     165.0|   0| 10|    1554855465|  0.3710725|
|000xxxxx-yyyy-zzz...|                    2000.0|   0| 10|    1554857260|   2.383398|
|000xxxxx-yyyy-zzz...|                    3000.0|   0| 10|    1554857625|  26.000359|
|000xxxxx-yyyy-zzz...|                      96.0|   0| 10|    1554857919|  30.961931|
|                                        ...                                        |
|000xxxxx-yyyy-zzz...|                      32.0|  10| 11|    1554977822|   55.37194|
+--------------------+--------------------------+----+---+--------------+-----------+

I want to filter the rows after grouping by idvalue, using pandas_udf. I tried the following:

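from pyspark.sql.functions import pandas_udf, PandasUDFType

# MIN_NIGHT_HOUR and MAX_NIGHT_HOUR are assumed to be defined earlier in the session.
@pandas_udf(df1.schema, PandasUDFType.GROUPED_MAP)
def filter_data(pdf):
    # Keep only the rows whose hour lies strictly inside the night window.
    return pdf.query('hour > @MIN_NIGHT_HOUR and hour < @MAX_NIGHT_HOUR')

# show() returns None, so keep the DataFrame and display it separately.
df2 = df1.groupBy('idvalue').apply(filter_data)
df2.show()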

But it fails with the following error:

An error occurred while calling o2397.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 74.0 failed 4 times, most recent failure: Lost task 0.3 in stage 74.0 (TID 421, ip-10-0-3-239.eu-west-1.compute.internal, executor 29): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000048/pyspark.zip/pyspark/worker.py", line 372, in main
    process()
  File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000048/pyspark.zip/pyspark/worker.py", line 367, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000048/pyspark.zip/pyspark/serializers.py", line 284, in dump_stream
    batch = _create_batch(series, self._timezone)
  File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000048/pyspark.zip/pyspark/serializers.py", line 253, in _create_batch
    arrs = [create_array(s, t) for s, t in series]
  File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000048/pyspark.zip/pyspark/serializers.py", line 253, in <listcomp>
    arrs = [create_array(s, t) for s, t in series]
  File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000048/pyspark.zip/pyspark/serializers.py", line 251, in create_array
    return pa.Array.from_pandas(s, mask=mask, type=t)
  File "pyarrow/array.pxi", line 542, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 169, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 78, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 81, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Integer value out of bounds

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:172)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:156)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2039)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2027)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2026)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2260)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2209)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2198)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3384)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
    at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2545)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2759)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)
    at sun.reflect.GeneratedMethodAccessor338.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

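The pandas_udf documentation says:

A grouped map UDF defines a transformation: A pandas.DataFrame -> A pandas.DataFrame

The output of pdf.query is also a DataFrame, so I do not understand what causes this error. I also tried other filter queries, with the same result.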

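Comments:

I hope you noticed this error in the trace: "pyarrow.lib.ArrowInvalid: Integer value out of bounds". If you post sample data and the Spark code, the cause can be worked out.

@RangaVure Yes, I looked at that trace, but I could not work out why it occurs. I have updated the question with sample data and the calling Spark code snippet.

Your code did not work for me as posted; I ran it in a local environment. After I defined MIN_NIGHT_HOUR = 0 and MAX_NIGHT_HOUR = 24 inside filter_data, I got a result.

@RangaVure Oh! So you did not get the error? How many dummy rows did you create?

Adding MIN_NIGHT_HOUR = 0 and MAX_NIGHT_HOUR = 24 inside the filter_data function does not cause any error (without them, it throws an error saying MIN_NIGHT_HOUR is not defined). I used 7 rows of your sample data.

Answer 1: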

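I can reproduce your problem: epochtimestamp is declared as integer, but the data contains long values.

If you change the epochtimestamp data type to Long, it works.

The following code produces the same error: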
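
One way to confirm this kind of mismatch is to check a column's values against the signed 32-bit range that Spark's IntegerType can hold. The following is a minimal sketch in plain Python, not part of the original answer; `find_int32_overflow` is a hypothetical helper name, and the values are the epochtimestamp values from this answer's reproduction data.

```python
# Bounds of a signed 32-bit integer, the range of Spark's IntegerType.
INT32_MIN = -2**31
INT32_MAX = 2**31 - 1


def find_int32_overflow(values):
    """Return the values that do not fit in a signed 32-bit integer."""
    return [
        v for v in values
        if v is not None and not INT32_MIN <= v <= INT32_MAX
    ]


# epochtimestamp values from the reproduction data in this answer.
epochtimestamps = [155485373044444, 1554854501, 1554854814, 1554855465,
                   1554857260, 1554857625, 155485791922]
overflow = find_int32_overflow(epochtimestamps)
print(overflow)  # [155485373044444, 155485791922]
```

Any value reported this way needs a LongType field, both in the DataFrame and in the schema passed to pandas_udf.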

from pyspark.sql import functions as F
from pyspark.sql.types import (
    IntegerType, LongType, StringType, StructField, StructType
)

# Output schema declared for the UDF: epochtimestamp is a 32-bit integer.
schema = StructType(
    [StructField("idvalue", StringType(), True),
     StructField("hour", LongType(), True),
     StructField("epochtimestamp", IntegerType(), True)]
)

# Only column names are passed here, so Spark infers the column types.
# Python ints are inferred as long, and two values exceed the int32 range.
df1 = spark.createDataFrame(
    [('000xxxxx-yyyy-zzz', 23, 155485373044444),
     ('000xxxxx-yyyy-zzz', 0, 1554854501),
     ('000xxxxx-yyyy-zzz', 0, 1554854814),
     ('000xxxxx-yyyy-zzz', 0, 1554855465),
     ('000xxxxx-yyyy-zzz', 0, 1554857260),
     ('000xxxxx-yyyy-zzz2', 0, 1554857625),
     ('000xxxxx-yyyy-zzz1', 0, 155485791922)],
    ['idvalue', 'hour', 'epochtimestamp']
)

@F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def filter_data(pdf):
    MIN_NIGHT_HOUR = 0
    MAX_NIGHT_HOUR = 24
    return pdf.query('hour > @MIN_NIGHT_HOUR & hour < @MAX_NIGHT_HOUR')

# The returned long values do not fit the declared IntegerType field.
df1.groupBy('idvalue').apply(filter_data).show()
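
Because the body of a grouped map function is ordinary pandas code, it can be exercised locally on a small pandas.DataFrame before Spark and Arrow are involved. The sketch below is an illustration under assumptions, not the answerer's code: `filter_night_hours` and `sample` are hypothetical names, and the bounds 0 and 24 are the ones used in this answer.

```python
import pandas as pd

MIN_NIGHT_HOUR = 0
MAX_NIGHT_HOUR = 24


def filter_night_hours(pdf: pd.DataFrame) -> pd.DataFrame:
    """Same filter as the UDF body, written as a plain pandas function."""
    # query() resolves @-prefixed names from Python variables in scope,
    # so the bounds are bound to local names first.
    min_hour, max_hour = MIN_NIGHT_HOUR, MAX_NIGHT_HOUR
    return pdf.query('hour > @min_hour and hour < @max_hour')


# Three rows modelled on the reproduction data in this answer.
sample = pd.DataFrame({
    'idvalue': ['000xxxxx-yyyy-zzz'] * 3,
    'hour': [23, 0, 0],
    'epochtimestamp': [155485373044444, 1554854501, 1554854814],
})
filtered = filter_night_hours(sample)
print(filtered)
print(filtered.dtypes)
```

Inspecting `filtered.dtypes` locally shows which columns come back as 64-bit integers, which is a hint that the matching fields in the UDF's output schema should be LongType rather than IntegerType.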

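Comments:

I have actually cast all numeric fields to long, but it still shows the same error. When I have time, I will try writing the DataFrame to CSV to see whether it contains any bad data. The timestamps fall well within the integer range, so I suspect some bad data slipped in.

The actual error was slightly different, but roughly the same. I found that some values were inf. After removing those rows, it works as expected.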
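
The fix mentioned in the last comment, removing rows that contain inf, might look like the following in pandas. This is a sketch under assumptions rather than the commenter's actual code: `drop_infinite_rows` is a hypothetical helper, velocity is used only as an example column, and NaN values are deliberately kept because the question's sample data contains a null velocity.

```python
import numpy as np
import pandas as pd


def drop_infinite_rows(pdf: pd.DataFrame, columns) -> pd.DataFrame:
    """Drop rows where any of the given columns is +inf or -inf (NaN is kept)."""
    has_inf = pdf[columns].isin([np.inf, -np.inf]).any(axis=1)
    return pdf[~has_inf]


# Made-up rows for illustration: one finite value, one NaN, two infinities.
sample = pd.DataFrame({
    'idvalue': ['a', 'a', 'a', 'a'],
    'velocity': [0.635121, np.inf, np.nan, -np.inf],
})
cleaned = drop_infinite_rows(sample, ['velocity'])
print(cleaned)
```

A function like this could be called at the start of the grouped map function, or the equivalent filter could be applied to the Spark DataFrame before grouping.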
