Spark Kafka Streaming 作业因 InvalidClassException 而失败
Posted
技术标签:
【中文标题】Spark Kafka Streaming 作业因 InvalidClassException 而失败【英文标题】:Spark Kafka Streaming job failing due to InvalidClassException 【发布时间】:2016-12-14 14:58:12 【问题描述】:我正在使用 Kafka 客户端 0.8 在 Spark 2、CDH 5.9 中运行流式传输作业。简单的目标是将信息保存在 Impala 中,逐条记录。
我无法摆脱这个错误,因为我不知道它来自哪里:
16/12/14 08:43:28 ERROR scheduler.JobScheduler: Error running job streaming
job 1481726608000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
stage 25.0 failed 4 times, most recent failure: Lost task 0.3 in stage 25.0
(TID 132, datanode1, executor 1):
java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat;
local class incompatible: stream classdesc serialVersionUID = 1,
local class serialVersionUID = 2
Direct Kafka Stream 由以下人员简单地创建
val streamingContext = new StreamingContext(spark.sparkContext, Seconds(2))
val kafkaParams = Map[String, String](
"bootstrap.servers" -> "datanode1:9092,datanode2:9092,datanode3:9092",
"group.id" -> "myconsumergroup",
"auto.offset.reset" -> "largest")
val topics:Set[String] = Set("kafkatest")
val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder] (streamingContext, kafkaParams, topics)
并由以下人员处理:
val deviceMap = spark.read.parquet("/user/admin/temp/joinData.parquet").cache()
directKafkaStream.foreachRDD rdd =>
val avgData = spark.read.schema(jsonDatastruct).json(rdd.map(i => i._2)).select("data.*").as[JsonInfo]
val deviceEnriched = avgData.join(deviceMap,Seq("COMMON_KEY"),"left")
deviceEnriched.show(false)
spark.sql("use my_database")
deviceEnriched.write.mode("append").saveAsTable("tbl_persisted_kafka_stream")
streamingContext.start()
streamingContext.awaitTermination()
【问题讨论】:
【参考方案1】:简答:消息是使用commons-lang3
JAR 版本序列化的,该版本与您在 Spark 中使用的 JAR不兼容。
长答案:如果您刚刚用 Google 搜索了该错误消息,然后搜索了 Apache Commons 源代码,您会发现...
this post 深入研究 Java“类不兼容”序列化问题,一般来说FastDateFormat
的源代码声明 serialVersionUID = 1L
直到 V3.1 但切换到 serialVersionUID = 2L
和 V3.2(因为当时二进制结构已经改变)
顺便说一句,我刚刚检查过,CDH 5.9 在 V3.1 中附带了commons-lang3
(用于 Hive、Impala、Sentry、Hive-in-Oozie、Sqoop-in-Oozie)和V3.3.2(用于 Spark-in-Oozie)和 V3.4(用于 Sqoop),而 Spark 本身根本不需要它。去图吧。
而且由于 CDH 还没有随 Spark 2 一起提供,我猜你要么下载了“测试版”包,要么下载了 Apache 版本——我查了一下,Apache 版本(V2.0.2)附带了commons-lang3
V3.3.2
我的 2 美分:只需在 Spark 2 命令行中强制使用 --jars /opt/cloudera/parcels/CDH/jars/commons-lang3-3.1.jar
,看看这是否足以解决您的问题。
编辑 多花 2 美分,确保您的“自定义”JAR 优先于 YARN 类路径中已有的任何 JAR,--conf spark.yarn.user.classpath.first=true
【讨论】:
谢谢萨姆森。这解决了问题 :) 顺便说一句,Spark 2 包本周从 Cloudera 发布 GA,并附带 V3.3.2。正如你所说的:去图。我的根本问题是我无法弄清楚正在序列化哪个对象以及从哪里到哪里,但是按照您指出的方式强制 v3.1 解决了问题。 ... 一会儿。异常又回来了,不管包括 V3.1 还是 V3.3.2,异常总是相同的并且在同一个节点中(我正在运行这个在三个节点上)。所以我认为这可能与 Spark 有关但与我的工作无关?还有其他想法吗? 停止该节点可以解决问题,所以我猜该节点某处存在陈旧的配置。有什么办法可以刷新吗?由于它是一个虚拟机,我很想从头开始重新创建它 尝试--conf spark.yarn.user.classpath.first=true
让 Spark 激活 YARN 属性——否则 CLASSPATH 中 JAR 的实际顺序可能是随机的 (在 YARN 端默认为“false”;我认为 Spark默认情况下强制它为“true”,因为它会导致很多问题,但是...)
是的..谢谢,再次被爱,希望这次能好。我绝对想在我所有的工作中包含这个参数。以上是关于Spark Kafka Streaming 作业因 InvalidClassException 而失败的主要内容,如果未能解决你的问题,请参考以下文章
如何使用spark streaming接收kafka中发送的自定义对象
spark-streaming读kafka数据到hive遇到的问题
大数据Spark“蘑菇云”行动之spark streaming消费flume采集的kafka数据Directf方式