Spark sql 抛出 UTF8 字符串转换错误

Posted

技术标签:

【中文标题】Spark sql 抛出 UTF8 字符串转换错误【英文标题】:Spark sql throws UTF8 String Casting Error 【发布时间】:2017-10-01 20:48:11 【问题描述】:

Spark sql Window 功能似乎无法正常工作。 我在 Hadoop 集群中运行一个 Spark 作业,其中 HDFS 块大小为 128 MB,并且 Spark 版本 1.5 CDH 5.5

我正在读取一个 avro 文件并执行以下操作

我的要求:

如果有多个记录具有相同的data_rfe_id,则根据最大seq_id和最大service_id取单个记录

我看到在原始数据中有一些记录具有相同的 data_rfe_id 和相同的 seq_id 因此,我使用 Window 函数应用 row_number 以便我可以过滤具有 row_num === 1 的记录

我只想使用窗口函数来实现这一点。

为什么会这样?

在数据框上应用窗口函数之前是否需要重新洗牌?

它仅针对某些任务引发以下异常,并且在 4 次重复失败的任务后作业失败?

我们什么时候会遇到这种异常。

 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions.rowNumber
 .....

scala> df.printSchema
root
 |-- transitional_key: string (nullable = true)
 |-- seq_id: string (nullable = true)
 |-- data_rfe_id: string (nullable = true)
 |-- service_id: string (nullable = true)
 |-- event_start_date_time: string (nullable = true)
 |-- event_id: string (nullable = true)


 val windowFunction = Window.partitionBy(df("data_rfe_id")).orderBy(df("seq_id").desc)
  val rankDF =df.withColumn("row_num",rowNumber.over(windowFunction))
  rankDF.select("data_rfe_id","seq_id","service_id","row_num").show(200,false)

在我的代码中,我没有进行任何强制转换。将所有内容都读取为字符串

当我在 Spark-shell 中运行上述代码时,我得到了正确的结果。

但是,如果我尝试通过提供 jar 来从 spark-submit 命令运行相同的代码,则会引发以下异常

  Caused by: java.lang.ClassCastException:  org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getInt(rows.scala:40)
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getInt(rows.scala:220)
at org.apache.spark.sql.catalyst.expressions.JoinedRow.getInt(JoinedRow.scala:82)
at org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:45)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:121)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:82)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:61)
at org.apache.spark.sql.execution.Window$$anonfun$8$$anon$1.next(Window.scala:330)
at org.apache.spark.sql.execution.Window$$anonfun$8$$anon$1.next(Window.scala:252)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

有人可以解释我为什么会收到上述错误吗?以及如何解决?

【问题讨论】:

我读了一个 avro 文件,其中所有列都是字符串,一旦我转换为数据帧,所有列也是字符串,唯一出现在图片中的整数是作为派生列的 row_number 【参考方案1】:

源中的数据可能已被修改,并且由于数据类型问题,它将失败。要解决此错误,您可以检查源中的数据并删除不必要的文件,它应该可以工作。

【讨论】:

以上是关于Spark sql 抛出 UTF8 字符串转换错误的主要内容,如果未能解决你的问题,请参考以下文章

Spark Shell 导入正常,但在引用类时抛出错误

MySQL抛出不正确的字符串值错误

为啥 SQL Server 在将 int 转换为数据类型 numeric 时抛出算术溢出错误?

UPDATEXML 抛出 SQL 错误:ORA-00927:尝试转换和更新 CLOB 数据类型时缺少等号

使用 Spark SQL 时无法将获取 B 转换为 java.lang.String

SQL 默默地将 int 转换为 varchar,但在遇到 varchar 时会抛出错误?