在 Spark Streaming 作业中获取 broadcast_1 的 broadcast_1_piece0 失败

Posted

技术标签:

【中文标题】在 Spark Streaming 作业中获取 broadcast_1 的 broadcast_1_piece0 失败【英文标题】:Failed to get broadcast_1_piece0 of broadcast_1 in Spark Streaming job 【发布时间】:2016-04-04 13:57:16 【问题描述】:

我在集群模式下在 yarn 上运行 spark 作业。该作业从 kafka 直接流中获取消息。我每 30 秒使用一次广播变量和检查点。当我第一次开始工作时,它运行良好,没有任何问题。如果我终止作业并重新启动,它会在收到来自 kafka 的消息时在执行程序中引发以下异常:

java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1178)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at net.juniper.spark.stream.LogDataStreamProcessor$2.call(LogDataStreamProcessor.java:177)
    at net.juniper.spark.stream.LogDataStreamProcessor$2.call(LogDataStreamProcessor.java:1)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:172)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:172)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1298)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1298)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

有人知道如何解决这个错误吗?

Spark 版本:1.5.0

CDH 5.5.1

【问题讨论】:

【参考方案1】:

遇到只有第一次运行有效的问题时,总是会导致检查点数据出现问题。而且,检查点的使用只有在有要检查的东西时才会发生,这是来自 kafka 的第一条消息。 我建议你检查一下你的工作是否真的死了,也就是说,也许这个进程还在执行它的机器上运行。 尝试运行一个简单的ps -fe,看看是否有什么东西还在运行。如果有 2 个进程试图使用同一个检查点文件夹,它总是会失败。 希望这会有所帮助

【讨论】:

我查过了,工作没有死。我正在运行三个 sparks 作业并创建了检查点目录 /tmp/spark/、/tmp/spark、/tmp/spark/。如果我删除检查点目录然后启动作业,那么它运行没有任何问题。很可能,它无法从检查点数据初始化广播变量。 不确定我是否理解。似乎问题在于,当您停止作业时,它不会被杀死并且进程会继续运行。一种解决方法是创建一个脚本来终止进程并停止作业。我错过了什么吗? 抱歉打错字了,在我之前的评论中,我想提一下这份工作已经死了,我已经验证过了。

以上是关于在 Spark Streaming 作业中获取 broadcast_1 的 broadcast_1_piece0 失败的主要内容,如果未能解决你的问题,请参考以下文章

Spark Streaming 的动态分配

sh 如何在YARN上配置Spark Streaming作业以获得良好的弹性(http://mkuthan.github.io/blog/2016/09/30/spark-streaming-on-y

一段时间后停止 Spark Streaming 作业

如何在 Spark Streaming 中使用基于数据集的转换?

如何使用spark streaming接收kafka中发送的自定义对象

spark streaming 和spark sql的区别