从具有 DenseVector 行的 pyspark 数据帧中获取行的最大值

Posted

技术标签:

【中文标题】从具有 DenseVector 行的 pyspark 数据帧中获取行的最大值【英文标题】:Getting the maximum of a row from a pyspark dataframe with DenseVector rows 【发布时间】:2018-08-20 04:47:58 【问题描述】:

我有一个以 DenseVectors 作为行的数据框:

df = spark.createDataFrame([(Vectors.dense([1,2,3]),),(Vectors.dense([3,4,5]),),(Vectors.dense([6,2,5]),)], ["a"])

我想用 UDF 找到每一行的最大值。这就是我所做的:

findmax = F.udf(lambda x: max(x),DoubleType())
df_out = df_out.select('*',findmax(df_out['sensor_data']).alias('MAX'))

运行代码后,这是我得到的消息

Traceback(最近一次调用最后一次):

文件“”,第 1 行,在 df.select('*',findmax(df['a'])).show()

文件 "C:\ProgramData\Anaconda3\envs\python2\lib\site-packages\pyspark\sql\dataframe.py", 第 336 行,显示中 打印(self._jdf.showString(n, 20))

文件 "C:\ProgramData\Anaconda3\envs\python2\lib\site-packages\py4j\java_gateway.py", 第 1133 行,在 调用 答案,self.gateway_client,self.target_id,self.name)

文件 "C:\ProgramData\Anaconda3\envs\python2\lib\site-packages\pyspark\sql\utils.py", 第 63 行,在装饰中 返回 f(*a, **kw)

文件 "C:\ProgramData\Anaconda3\envs\python2\lib\site-packages\py4j\protocol.py", 第 319 行,在 get_return_value 中 格式(target_id,“.”,名称),值)

Py4JJavaError:调用 o785.showString 时出错。 : org.apache.spark.SparkException:作业因阶段失败而中止: 阶段 67.0 中的任务 2 失败 1 次,最近一次失败:丢失任务 2.0 在阶段 67.0(TID 890,本地主机,执行程序驱动程序):net.razorvine.pickle.PickleException:预期零参数 ClassDict (用于 numpy.dtype)的构造 net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23) 在 net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707) 在 net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175) 在 net.razorvine.pickle.Unpickler.load(Unpickler.java:99) 在 net.razorvine.pickle.Unpickler.loads(Unpickler.java:112) 在 org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1$$anonfun$apply$6.apply(BatchEvalPythonExec.scala:156) 在 org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1$$anonfun$apply$6.apply(BatchEvalPythonExec.scala:155) 在 scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) 在 scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) 在 scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 在 org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(未知 来源)在 org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 在 org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395) 在 org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234) 在 org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228) 在 org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) 在 org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) 在 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 在 org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 在 org.apache.spark.scheduler.Task.run(Task.scala:108) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(未知来源) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(未知来源) 在 java.lang.Thread.run(Unknown Source)

驱动程序堆栈跟踪:在 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504) 在 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 在 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 在 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814) 在 scala.Option.foreach(Option.scala:257) 在 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676) 在 org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 在 org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:2029) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:2050) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:2069) 在 org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:336) 在 org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38) 在 org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2861) 在 org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150) 在 org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150) 在 org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2842) 在 org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65) 在 org.apache.spark.sql.Dataset.withAction(Dataset.scala:2841) 在 org.apache.spark.sql.Dataset.head(Dataset.scala:2150) 在 org.apache.spark.sql.Dataset.take(Dataset.scala:2363) 在 org.apache.spark.sql.Dataset.showString(Dataset.scala:241) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(未知来源)在 java.lang.reflect.Method.invoke(未知来源)在 py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 在 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 在 py4j.Gateway.invoke(Gateway.java:280) 在 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 在 py4j.commands.CallCommand.execute(CallCommand.java:79) 在 py4j.GatewayConnection.run(GatewayConnection.java:214) 在 java.lang.Thread.run(Unknown Source) 原因: net.razorvine.pickle.PickleException:预期零参数 ClassDict (用于 numpy.dtype)的构造 net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23) 在 net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707) 在 net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175) 在 net.razorvine.pickle.Unpickler.load(Unpickler.java:99) 在 net.razorvine.pickle.Unpickler.loads(Unpickler.java:112) 在 org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1$$anonfun$apply$6.apply(BatchEvalPythonExec.scala:156) 在 org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1$$anonfun$apply$6.apply(BatchEvalPythonExec.scala:155) 在 scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) 在 scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) 在 scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 在 org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(未知 来源)在 org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 在 org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395) 在 org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234) 在 org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228) 在 org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) 在 org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) 在 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 在 org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 在 org.apache.spark.scheduler.Task.run(Task.scala:108) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(未知来源) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(未知来源) 还有 1 个

我不知道为什么这不起作用:我发现如果行只是浮点数而不是 DenseVectors 它将起作用,并且 python 函数 max 接受 DenseVectors 作为输入。

【问题讨论】:

【参考方案1】:

这个错误的原因是你将udf的返回类型定义为float,而udf实际上返回的是numpy.float64。 pyspark 将 floatnumpy.float64 视为不同的类型。 像这样将返回类型转换为浮点数。findmax = F.udf(lambda x: float(max(x)),DoubleType())

【讨论】:

谢谢!帮了大忙!

以上是关于从具有 DenseVector 行的 pyspark 数据帧中获取行的最大值的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark 的 DataFrame 列中存储 DenseVector

Spark(1.6) Densevector.type 不带参数

使用 StandardScaler 时的 SparseVector 与 DenseVector

用于从表中选择具有最新时间戳的行的 JOOQ 代码

MySQL查询从具有1000万行的表中获取每个条目的最新记录

MYSQL 从具有 100 万行的表中选择 distinct(indexed_column)