Spark Kafka Streaming 作业因 InvalidClassException 而失败

Posted

技术标签:

【中文标题】Spark Kafka Streaming 作业因 InvalidClassException 而失败【英文标题】:Spark Kafka Streaming job failing due to InvalidClassException 【发布时间】:2016-12-14 14:58:12 【问题描述】:

我正在使用 Kafka 客户端 0.8 在 Spark 2、CDH 5.9 中运行流式传输作业。简单的目标是将信息保存在 Impala 中,逐条记录。

我无法摆脱这个错误,因为我不知道它来自哪里:

16/12/14 08:43:28 ERROR scheduler.JobScheduler: Error running job streaming
job 1481726608000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
stage 25.0 failed 4 times, most recent failure: Lost task 0.3 in stage 25.0
(TID 132, datanode1, executor 1):
java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat;
local class incompatible: stream classdesc serialVersionUID = 1, 
local class serialVersionUID = 2

Direct Kafka Stream 由以下人员简单地创建

val streamingContext = new StreamingContext(spark.sparkContext, Seconds(2))
val kafkaParams = Map[String, String](
  "bootstrap.servers" -> "datanode1:9092,datanode2:9092,datanode3:9092",
  "group.id" -> "myconsumergroup",
  "auto.offset.reset" -> "largest")
val topics:Set[String] = Set("kafkatest")
val directKafkaStream  = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder] (streamingContext, kafkaParams, topics)

并由以下人员处理:

val deviceMap = spark.read.parquet("/user/admin/temp/joinData.parquet").cache()

directKafkaStream.foreachRDD  rdd =>
  val avgData = spark.read.schema(jsonDatastruct).json(rdd.map(i => i._2)).select("data.*").as[JsonInfo]

  val deviceEnriched = avgData.join(deviceMap,Seq("COMMON_KEY"),"left")

    deviceEnriched.show(false)
    spark.sql("use my_database")
      deviceEnriched.write.mode("append").saveAsTable("tbl_persisted_kafka_stream")


streamingContext.start()
streamingContext.awaitTermination()

【问题讨论】:

【参考方案1】:

简答:消息是使用commons-lang3 JAR 版本序列化的,该版本与您在 Spark 中使用的 JAR不兼容

长答案:如果您刚刚用 Google 搜索了该错误消息,然后搜索了 Apache Commons 源代码,您会发现...

this post 深入研究 Java“类不兼容”序列化问题,一般来说 FastDateFormat 的源代码声明 serialVersionUID = 1L 直到 V3.1 但切换到 serialVersionUID = 2L 和 V3.2(因为当时二进制结构已经改变)

顺便说一句,我刚刚检查过,CDH 5.9 在 V3.1 中附带了commons-lang3(用于 Hive、Impala、Sentry、Hive-in-Oozie、Sqoop-in-Oozie)和V3.3.2(用于 Spark-in-Oozie)和 V3.4(用于 Sqoop),而 Spark 本身根本不需要它。去图吧。 而且由于 CDH 还没有随 Spark 2 一起提供,我猜你要么下载了“测试版”包,要么下载了 Apache 版本——我查了一下,Apache 版本(V2.0.2)附带了commons-lang3V3.3.2

我的 2 美分:只需在 Spark 2 命令行中强制使用 --jars /opt/cloudera/parcels/CDH/jars/commons-lang3-3.1.jar,看看这是否足以解决您的问题。

编辑  多花 2 美分,确保您的“自定义”JAR 优先于 YARN 类路径中已有的任何 JAR,--conf spark.yarn.user.classpath.first=true

【讨论】:

谢谢萨姆森。这解决了问题 :) 顺便说一句,Spark 2 包本周从 Cloudera 发布 GA,并附带 V3.3.2。正如你所说的:去图。我的根本问题是我无法弄清楚正在序列化哪个对象以及从哪里到哪里,但是按照您指出的方式强制 v3.1 解决了问题。 ... 一会儿。异常又回来了,不管包括 V3.1 还是 V3.3.2,异常总是相同的并且在同一个节点中(我正在运行这个在三个节点上)。所以我认为这可能与 Spark 有关但与我的工作无关?还有其他想法吗? 停止该节点可以解决问题,所以我猜该节点某处存在陈旧的配置。有什么办法可以刷新吗?由于它是一个虚拟机,我很想从头开始重新创建它 尝试--conf spark.yarn.user.classpath.first=true 让 Spark 激活 YARN 属性——否则 CLASSPATH 中 JAR 的实际顺序可能是随机的 (在 YARN 端默认为“false”;我认为 Spark默认情况下强制它为“true”,因为它会导致很多问题,但是...) 是的..谢谢,再次被爱,希望这次能好。我绝对想在我所有的工作中包含这个参数。

以上是关于Spark Kafka Streaming 作业因 InvalidClassException 而失败的主要内容,如果未能解决你的问题,请参考以下文章

如何使用spark streaming接收kafka中发送的自定义对象

spark-streaming读kafka数据到hive遇到的问题

大数据Spark“蘑菇云”行动之spark streaming消费flume采集的kafka数据Directf方式

使用 Spark Structured Streaming 时限制 kafka 批量大小

spark streaming基础知识1

Spark 系列(十六)—— Spark Streaming 整合 Kafka