Filtering a dataframe using pandas_udf in PySpark
Question (posted 2019-05-09 10:37:05): I have a Spark DataFrame with the following schema:
root
|-- idvalue: string (nullable = true)
|-- locationaccuracyhorizontal: float (nullable = true)
|-- hour: integer (nullable = true)
|-- day: integer (nullable = true)
|-- is_weekend: boolean (nullable = true)
|-- locationlatrad: float (nullable = true)
|-- locationlonrad: float (nullable = true)
|-- epochtimestamp: integer (nullable = true)
|-- velocity: float (nullable = true)
Sample data for the relevant columns looks like this:
+--------------------+--------------------------+----+---+--------------+-----------+
| idvalue|locationaccuracyhorizontal|hour|day|epochtimestamp| velocity|
+--------------------+--------------------------+----+---+--------------+-----------+
|000xxxxx-yyyy-zzz...| 32.0| 23| 9| 1554853730| null|
|000xxxxx-yyyy-zzz...| 165.0| 0| 10| 1554854501| 0.635121|
|000xxxxx-yyyy-zzz...| 65.0| 0| 10| 1554854814| 0.96369237|
|000xxxxx-yyyy-zzz...| 165.0| 0| 10| 1554855465| 0.3710725|
|000xxxxx-yyyy-zzz...| 2000.0| 0| 10| 1554857260| 2.383398|
|000xxxxx-yyyy-zzz...| 3000.0| 0| 10| 1554857625| 26.000359|
|000xxxxx-yyyy-zzz...| 96.0| 0| 10| 1554857919| 30.961931|
| ... |
|000xxxxx-yyyy-zzz...| 32.0| 10| 11| 1554977822| 55.37194|
+--------------------+--------------------------+----+---+--------------+-----------+
I want to filter the data after grouping it by idvalue, using a pandas_udf. I tried the following:
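from pyspark.sql.functions import pandas_udf, PandasUDFType

# MIN_NIGHT_HOUR and MAX_NIGHT_HOUR are constants defined elsewhere (not shown)
@pandas_udf(df1.schema, PandasUDFType.GROUPED_MAP)
def filter_data(pdf):
    idvalue = pdf.idvalue
    hour = pdf.hour
    # pandas query() uses Python syntax: 'and' (or '&'), not SQL's 'AND'
    return pdf.query('hour > @MIN_NIGHT_HOUR and hour < @MAX_NIGHT_HOUR')

# show() returns None, so keep the DataFrame and display it separately
df2 = df1.groupBy('idvalue').apply(filter_data)
df2.show()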
But it fails with the following error:
An error occurred while calling o2397.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 74.0 failed 4 times, most recent failure: Lost task 0.3 in stage 74.0 (TID 421, ip-10-0-3-239.eu-west-1.compute.internal, executor 29): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000048/pyspark.zip/pyspark/worker.py", line 372, in main
process()
File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000048/pyspark.zip/pyspark/worker.py", line 367, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000048/pyspark.zip/pyspark/serializers.py", line 284, in dump_stream
batch = _create_batch(series, self._timezone)
File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000048/pyspark.zip/pyspark/serializers.py", line 253, in _create_batch
arrs = [create_array(s, t) for s, t in series]
File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000048/pyspark.zip/pyspark/serializers.py", line 253, in <listcomp>
arrs = [create_array(s, t) for s, t in series]
File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000048/pyspark.zip/pyspark/serializers.py", line 251, in create_array
return pa.Array.from_pandas(s, mask=mask, type=t)
File "pyarrow/array.pxi", line 542, in pyarrow.lib.Array.from_pandas
File "pyarrow/array.pxi", line 169, in pyarrow.lib.array
File "pyarrow/array.pxi", line 78, in pyarrow.lib._ndarray_to_array
File "pyarrow/error.pxi", line 81, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Integer value out of bounds
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:172)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:156)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2039)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2027)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2026)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2026)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2260)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2209)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2198)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3384)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2545)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2759)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)
at sun.reflect.GeneratedMethodAccessor338.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
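The pandas_udf documentation says that a grouped map UDF defines the transformation A pandas.DataFrame -> A pandas.DataFrame. The output of pdf.query is also a DataFrame, so I'm confused about what causes this error. I have also tried other filter queries, without success.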
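Because a grouped map UDF is just a pandas.DataFrame -> pandas.DataFrame function, the filter itself can be exercised locally in plain pandas before Spark is involved. The sketch below is a hypothetical local stand-in for groupBy('idvalue').apply(...): the helper name apply_grouped_map is invented for illustration, and the bounds 0 and 24 are the values a commenter used, not values from the question. It only mimics the split, apply, combine step; it does not reproduce Spark's Arrow conversion of the returned columns to the declared schema, which is the step that fails in the trace.

```python
import pandas as pd


def filter_data(pdf: pd.DataFrame) -> pd.DataFrame:
    # Illustrative bounds (the ones a commenter used), defined locally.
    min_night_hour, max_night_hour = 0, 24
    # query() uses Python syntax: '@name' reads a local variable, and the
    # boolean operator is 'and' (or '&'), not SQL's 'AND'.
    return pdf.query("hour > @min_night_hour and hour < @max_night_hour")


def apply_grouped_map(pdf: pd.DataFrame, key: str, func) -> pd.DataFrame:
    """Hypothetical local stand-in for df.groupBy(key).apply(func):
    call func on each group's DataFrame and concatenate the results."""
    parts = [func(group) for _, group in pdf.groupby(key, sort=False)]
    if not parts:  # empty input: nothing to concatenate
        return pdf.iloc[0:0]
    return pd.concat(parts, ignore_index=True)


sample = pd.DataFrame({
    "idvalue": ["a", "a", "b", "b"],
    "hour": [23, 0, 0, 10],
})
out = apply_grouped_map(sample, "idvalue", filter_data)
print(out["idvalue"].tolist())  # ['a', 'b']
print(out["hour"].tolist())     # [23, 10]
```

If this local version behaves as expected, the remaining suspect is the mismatch between the returned pandas dtypes and the schema passed to pandas_udf.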
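Comments:
I hope you noticed this error in the trace: "pyarrow.lib.ArrowInvalid: Integer value out of bounds". If you post sample data and the Spark code, the cause can be worked out.
@RangaVure Yes, I looked at that trace but couldn't find why it occurs. I have updated the question with sample data and the Spark code snippet that calls the UDF.
Your code didn't work for me as-is; I ran it in a local environment. Then I defined MIN_NIGHT_HOUR = 0 and MAX_NIGHT_HOUR = 24 inside filter_data and got results.
@RangaVure Oh! So you didn't get the error? How many dummy rows did you create?
Adding MIN_NIGHT_HOUR = 0 and MAX_NIGHT_HOUR = 24 inside the filter_data function doesn't cause any error (without them it throws an error that MIN_NIGHT_HOUR is not defined). I used 7 rows of your sample data.

Answer 1:
I can reproduce your problem: epochtimestamp is declared as integer, but the data is long.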
If you change the epochtimestamp data type to Long, it will work.
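To see why the declared type matters: Spark's IntegerType is a 32-bit signed integer, so any value above 2**31 - 1 = 2,147,483,647 cannot be stored in it. A minimal pure-Python check over the timestamps used in the reproduction (the helper name out_of_int32_range is hypothetical):

```python
# Spark's IntegerType is a 32-bit signed integer; LongType is 64-bit.
INT32_MIN, INT32_MAX = -(2**31), 2**31 - 1


def out_of_int32_range(values):
    """Return the values that cannot be stored in a 32-bit signed integer."""
    return [v for v in values if not INT32_MIN <= v <= INT32_MAX]


epochs = [155485373044444, 1554854501, 1554854814, 155485791922]
print(out_of_int32_range(epochs))  # [155485373044444, 155485791922]
```

Real epoch-second timestamps such as 1554854501 fit comfortably (32-bit signed epoch seconds only overflow in January 2038), which is consistent with the asker's later remark that their timestamps were in range and the overflow came from other bad values.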
The following code reproduces the same error:
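from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType

spark = SparkSession.builder.getOrCreate()

# Return schema of the UDF: epochtimestamp is declared as a 32-bit IntegerType.
# Declaring it as LongType() instead makes the job succeed.
schema = StructType(
    [StructField("idvalue", StringType(), True),
     StructField("hour", LongType(), True),
     StructField("epochtimestamp", IntegerType(), True)]
)

# Only column names are given, so Spark infers epochtimestamp as long (64-bit);
# two of these values do not fit in a 32-bit integer.
df1 = spark.createDataFrame(
    [('000xxxxx-yyyy-zzz', 23, 155485373044444),
     ('000xxxxx-yyyy-zzz', 0, 1554854501),
     ('000xxxxx-yyyy-zzz', 0, 1554854814),
     ('000xxxxx-yyyy-zzz', 0, 1554855465),
     ('000xxxxx-yyyy-zzz', 0, 1554857260),
     ('000xxxxx-yyyy-zzz2', 0, 1554857625),
     ('000xxxxx-yyyy-zzz1', 0, 155485791922)],
    ['idvalue', 'hour', 'epochtimestamp']
)

@F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def filter_data(pdf):
    MIN_NIGHT_HOUR = 0
    MAX_NIGHT_HOUR = 24
    idvalue = pdf.idvalue
    hour = pdf.hour
    return pdf.query('hour > @MIN_NIGHT_HOUR & hour < @MAX_NIGHT_HOUR')

# Arrow cannot convert the oversized int64 values into the declared int32
# column, so this raises "ArrowInvalid: Integer value out of bounds".
df1.groupBy('idvalue').apply(filter_data).show()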
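The Arrow message does not say which column overflowed. One way to narrow it down, sketched here under the assumption that you can run plain pandas code inside the UDF (or on a sample collected with toPandas()), is to compare each integer column against the width of the Spark type declared for it in the return schema. The helper find_overflowing_columns and its short type-name keys are hypothetical; the bit widths are those of Spark's ByteType, ShortType, IntegerType and LongType.

```python
import numpy as np
import pandas as pd

# Bit widths of Spark's integral types, keyed by illustrative short names.
SPARK_INT_BITS = {"byte": 8, "short": 16, "integer": 32, "long": 64}


def find_overflowing_columns(pdf: pd.DataFrame, declared: dict) -> dict:
    """Count, per column, the non-null values that fall outside the range of
    the Spark integral type declared for it. Infinite floats count too."""
    problems = {}
    for col, spark_type in declared.items():
        info = np.iinfo(f"int{SPARK_INT_BITS[spark_type]}")
        values = pdf[col].dropna()
        bad = int((~values.between(info.min, info.max)).sum())
        if bad:
            problems[col] = bad
    return problems


pdf = pd.DataFrame({
    "hour": [23, 0, 0],
    "epochtimestamp": [155485373044444, 1554854501, 1554854814],
})
print(find_overflowing_columns(pdf, {"hour": "long", "epochtimestamp": "integer"}))
# {'epochtimestamp': 1}
```

Raising an exception that names the column from inside the UDF gives a clearer failure than the generic Arrow error.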
Comments:
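I have actually converted all the numeric fields to long, but it still shows the same error. When I have time, I'll try writing the DataFrame to CSV to see whether it contains any bad data. The timestamps fall well within the integer range, so I suspect some bad data slipped in.
The actual error was slightly different but along the same lines: I found that some values were inf. After removing those rows, it works as expected.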
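Since the root cause turned out to be infinite values, one option is to drop such rows in the pandas function before returning it. The thread does not say which column held inf, so the column list is a parameter here, and the helper name drop_infinite_rows is hypothetical. NaN values (Spark nulls) are kept, because the schema marks the columns as nullable.

```python
import numpy as np
import pandas as pd


def drop_infinite_rows(pdf: pd.DataFrame, columns) -> pd.DataFrame:
    """Drop rows where any of the given numeric columns is +inf or -inf.
    NaN values are kept, since np.isinf(NaN) is False."""
    is_inf = np.isinf(pdf[columns].to_numpy(dtype=float)).any(axis=1)
    return pdf[~is_inf]


pdf = pd.DataFrame({
    "epochtimestamp": [1554853730, 1554854501, 1554854814],
    "velocity": [np.nan, 0.635121, np.inf],
})
clean = drop_infinite_rows(pdf, ["epochtimestamp", "velocity"])
print(clean["epochtimestamp"].tolist())  # [1554853730, 1554854501]
```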