Cannot use orderBy or groupBy function in Spark DataFrame

Posted 2018-04-08 14:42:47

【Question】:

I created a DataFrame as shown below:

val file = sc.textFile(FileName)
case class CreateDF(project: String, title: String, requests_num: Int, return_size: Int)
val df = file.map(line => line.split(" ")).map(line => CreateDF(line(0), line(1), line(2).toInt, line(3).toInt)).toDF()


+-------+--------------------+------------+-----------+
|project|               title|requests_num|return_size|
+-------+--------------------+------------+-----------+
|     aa|%CE%92%CE%84_%CE%...|           1|       4854|
|     aa|%CE%98%CE%B5%CF%8...|           1|       4917|
|     aa|%CE%9C%CF%89%CE%A...|           1|       4832|
|     aa|%CE%A0%CE%B9%CE%B...|           1|       4828|
|     aa|%CE%A3%CE%A4%CE%8...|           1|       4819|
|     aa|%D0%A1%D0%BE%D0%B...|           1|       4750|
|     aa|             271_a.C|           1|       4675|
|     aa|Battaglia_di_Qade...|           1|       4765|
|     aa|    Category:User_th|           1|       4770|
|     aa|  Chiron_Elias_Krase|           1|       4694|
|     aa|County_Laois/en/Q...|           1|       4752|
|     aa|    Dassault_rafaele|           2|       9372|
|     aa|Dyskusja_wikiproj...|           1|       4824|
|     aa|              E.Desv|           1|       4662|
|     aa|Enclos-apier/fr/E...|           1|       4772|
|     aa|File:Wiktionary-l...|           1|      10752|
|     aa|Henri_de_Sourdis/...|           1|       4748|
|     aa|Incentive_Softwar...|           1|       4777|
|     aa|Indonesian_Wikipedia|           1|       4679|
|     aa|           Main_Page|           5|     266946|
+-------+--------------------+------------+-----------+

My goal is to get the record with the largest return_size, so I tried the orderBy function, df.orderBy("return_size"), and I also tried df.groupBy("return_size").max(). Neither works.

The error that occurs:

    scala> df.orderBy("return_size").show
2018-04-08 15:47:05 ERROR Executor:91 - Exception in task 0.0 in stage 5.0 (TID 10)
java.lang.NumberFormatException: For input string: "2371877485"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:583)
    at java.lang.Integer.parseInt(Integer.java:615)
    at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272)
    at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
    at $line19.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$2.apply(<console>:27)
    at $line19.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$2.apply(<console>:27)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
    at org.spark_project.guava.collect.Ordering.leastOf(Ordering.java:670)
    at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
    at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$29.apply(RDD.scala:1427)
    at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$29.apply(RDD.scala:1424)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2018-04-08 15:47:05 WARN  TaskSetManager:66 - Lost task 0.0 in stage 5.0 (TID 10, localhost, executor driver): java.lang.NumberFormatException: For input string: "2371877485"

2018-04-08 15:47:05 ERROR TaskSetManager:70 - Task 0 in stage 5.0 failed 1 times; aborting job
2018-04-08 15:47:05 WARN  TaskSetManager:66 - Lost task 2.0 in stage 5.0 (TID 12, localhost, executor driver): TaskKilled (Stage cancelled)
2018-04-08 15:47:05 WARN  TaskSetManager:66 - Lost task 1.0 in stage 5.0 (TID 11, localhost, executor driver): TaskKilled (Stage cancelled)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 10, localhost, executor driver): java.lang.NumberFormatException: For input string: "2371877485"

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2124)
  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1029)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.reduce(RDD.scala:1011)
  at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1.apply(RDD.scala:1433)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1420)
  at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:135)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:723)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:682)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:691)
  ... 49 elided
Caused by: java.lang.NumberFormatException: For input string: "2371877485"

Can anyone tell me how to fix this, or suggest any other way to get the maximum value of the return_size column?

【Comments】:

【Answer 1】:

Your code converts the values to Int, but the data also contains values such as 2371877485 that do not fit in an Int. Convert to Long instead (which also means widening return_size in the case class):

case class CreateDF(project: String, title: String, requests_num: Int, return_size: Long)
val df = file.map(line => line.split(" ")).map(line => CreateDF(line(0), line(1), line(2).toInt, line(3).toLong)).toDF()
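With return_size stored as a Long, the sort runs without the parse error. A minimal sketch of pulling the top record, assuming the df defined above (sort descending, then keep one row):

import org.apache.spark.sql.functions.desc

// Show only the row with the largest return_size
df.orderBy(desc("return_size")).limit(1).show()

// Or bring that single Row back to the driver
val top = df.orderBy(desc("return_size")).first()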

【Comments】:

Thanks a lot, that helped. I had forgotten to check the data types.

【Answer 2】:

When you read the file with

val file = sc.textFile("/home/user1/project2/pagecounts-20160101-000000")

everything is read in as plain strings. You declared requests_num and return_size as Int, and 2371877485 is too large to fit in an Int, which is why you get

java.lang.NumberFormatException: For input string: "2371877485"
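Int.MaxValue is 2147483647, so that parse can never succeed. A quick REPL sketch of the difference between the two conversions:

scala> "2371877485".toInt
java.lang.NumberFormatException: For input string: "2371877485"

scala> "2371877485".toLong
res1: Long = 2371877485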

The fix is to change your case class to

case class CreateDF(project: String, title: String, requests_num: BigInt, return_size: BigInt)

and then build the DataFrame as

val df = file.map(line => line.split(" ")).map(line => CreateDF(line(0), line(1), BigInt(line(2)), BigInt(line(3)))).toDF()
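Note that a String cannot be cast to BigInt with asInstanceOf; BigInt(...) parses it instead. Spark encodes BigInt fields as decimal(38,0), which you can confirm on the result; a sketch, assuming Spark 2.x's mapping of scala.math.BigInt:

df.printSchema()
// root
//  |-- project: string (nullable = true)
//  |-- title: string (nullable = true)
//  |-- requests_num: decimal(38,0) (nullable = true)
//  |-- return_size: decimal(38,0) (nullable = true)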

You can find more detail in the Scala API docs:

final val MaxValue: Int(2147483647) The largest value representable as an Int.
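As an aside, if only the maximum value itself is needed rather than the whole record, an aggregate avoids sorting the full dataset; a sketch using the built-in max function:

import org.apache.spark.sql.functions.max

// One-row DataFrame containing the largest return_size
df.agg(max("return_size")).show()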

【Comments】:

Thanks a lot, that helped. I had forgotten to check the data types.
