Spark Streaming,foreachRDD 错误:比较方法违反其一般合同
Posted
技术标签:
【中文标题】Spark Streaming,foreachRDD 错误:比较方法违反其一般合同【英文标题】:Spark Streaming, foreachRDD error : Comparison method violates its general contract 【发布时间】:2015-07-17 13:36:28 【问题描述】:我正在测试 Spark Streaming API。 该应用程序部署在带有 Spark 1.4.0 的 Amazon EMR 集群上 我正在 S3 中对数据进行排序并保存文件。
管道的代码(排序算法除外)详述如下:
public KinesisPreProcessPipeline(JavaStreamingContext jssc, final KinesisPreProcessModuleConfiguration moduleConfiguration)
JavaReceiverInputDStream<byte[]> inputDStream = KinesisUtils.createStream(jssc, moduleConfiguration.getAppName(), moduleConfiguration.getStreamName(),
"kinesis." + moduleConfiguration.getRegion() + ".amazonaws.com", moduleConfiguration.getRegion(), InitialPositionInStream.LATEST,
Durations.seconds(5), StorageLevel.MEMORY_AND_DISK_SER());
JavaDStream<StreamingMessage> messageJavaDStream = inputDStream.map(new Function<byte[], StreamingMessage>()
@Override
public StreamingMessage call(byte[] bytes) throws Exception
return jsonParser.fromJson(new String(bytes), StreamingMessage.class);
);
final String destinationFolder = moduleConfiguration.getDestinationFolder();
StreamingPreProcessPipeline pipeline = new StreamingPreProcessPipeline().withInputDStream(messageJavaDStream)
.withPreProcessStep(new SortPreProcess());
JavaDStream<StreamingMessage> output = pipeline.execute();
output.checkpoint(Durations.seconds(moduleConfiguration.getBatchInterval() * 2));
JavaDStream<String> messagesAsJson = output.map(new Function<StreamingMessage, String>()
@Override
public String call(StreamingMessage message) throws Exception
return jsonParser.toJson(message);
);
messagesAsJson.foreachRDD(new Function<JavaRDD<String>, Void>()
@Override
public Void call(JavaRDD<String> rdd) throws Exception
rdd.saveAsTextFile(destinationFolder + "/" + dateFormat.print(new DateTime()) + "-" + rdd.id());
return null;
);
当应用程序在集群上运行时,它会快速失败并出现以下错误。
15/07/17 13:17:36 错误 executor.Executor:阶段 8.0 (TID 90) 中任务 0.1 中的异常 java.lang.IllegalArgumentException:比较方法违反了它的一般约定! 在 org.apache.spark.util.collection.TimSort$SortState.mergeLo(TimSort.java:776) 在 org.apache.spark.util.collection.TimSort$SortState.mergeAt(TimSort.java:507) 在 org.apache.spark.util.collection.TimSort$SortState.mergeCollapse(TimSort.java:435) 在 org.apache.spark.util.collection.TimSort$SortState.access$200(TimSort.java:307) 在 org.apache.spark.util.collection.TimSort.sort(TimSort.java:135) 在 org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37) 在 org.apache.spark.util.collection.PartitionedPairBuffer.partitionedDestructiveSortedIterator(PartitionedPairBuffer.scala:70) 在 org.apache.spark.util.collection.ExternalSorter.partitionedIterator(ExternalSorter.scala:690) 在 org.apache.spark.util.collection.ExternalSorter.iterator(ExternalSorter.scala:708) 在 org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:64) 在 org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90) 在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 在 org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 在 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 在 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69) 在 org.apache.spark.rdd.RDD.iterator(RDD.scala:242) 在 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 在 org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) 在 org.apache.spark.scheduler.Task.run(Task.scala:70) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 在 java.lang.Thread.run(Thread.java:745
错误发生在 foreachRDD 步骤中,但我仍在寻找失败的原因...
【问题讨论】:
什么是 StreamingMessage? 它是一个包含消息字段的 java bean。基本上是一堆字符串或数字字段和键/值映射(字符串)。 你能贴出StreamingMessage
的代码吗?可能它的equals
和|或hashCode
实现不正确。
您是对的,equals 和 compareTo 在用于排序的类中不一致。在一种特殊情况下, compareTo 返回 0 但 equals 将返回 false。修复此错误后,作业按预期工作。
【参考方案1】:
用于排序的类在 compareTo 实现中存在错误。 Comparable 的 javadoc 建议以与 equals() 一致的方式实现 compareTo。修复此错误后,火花作业按预期工作。
【讨论】:
是否需要重写equals方法才能使用我用来排序的参数?以上是关于Spark Streaming,foreachRDD 错误:比较方法违反其一般合同的主要内容,如果未能解决你的问题,请参考以下文章
.Spark Streaming(上)--实时流计算Spark Streaming原理介
Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一
Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一
Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一
Spark SQL + Window + Streaming 问题 - 使用 Spark Streaming 运行时,Spark SQL 查询需要很长时间才能执行
Spark Streaming:Spark Structured Streaming 中不允许使用 Kafka 组 ID