filebeat+kafka+SparkStreaming program error and solution
Posted by zhzhang
17/07/01 03:07:21 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
17/07/01 03:07:21 WARN BlockManager: Block input-0-1498849640800 replicated to only 0 peer(s) instead of 1 peers
17/07/01 03:07:26 ERROR Executor: Exception in task 0.0 in stage 113711.0 (TID 111661)
java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:156)
    at org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)
    at org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:362)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:361)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:356)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:356)
    at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:676)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:329)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/07/01 03:07:26 WARN TaskSetManager: Lost task 0.0 in stage 113711.0 (TID 111661, localhost, executor driver): java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:156)
    at org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)
    at org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:362)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:361)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:356)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:356)
    at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:676)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:329)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/07/01 03:07:26 ERROR TaskSetManager: Task 0 in stage 113711.0 failed 1 times; aborting job
17/07/01 03:07:26 ERROR JobScheduler: Error running job streaming job 1498845965000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/home/admin/agent/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 65, in call
    r = self.func(t, *rdds)
  File "/home/admin/agent/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 171, in takeAndPrint
    taken = rdd.take(num + 1)
  File "/home/admin/agent/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1342, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/home/admin/agent/spark/python/lib/pyspark.zip/pyspark/context.py", line 968, in runJob
    port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "/home/admin/agent/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/admin/agent/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 113711.0 failed 1 times, most recent failure: Lost task 0.0 in stage 113711.0 (TID 111661, localhost, executor driver): java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:156)
    at org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)
    at org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:362)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:361)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:356)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:356)
    at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:676)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:329)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
    at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:441)
    at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
    at sun.reflect.GeneratedMethodAccessor55.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
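The log above repeats the same JVM stack trace several times, so the underlying failure is easy to lose. The helper below is a minimal sketch, not part of the original job, that splits a flattened Spark log into timestamped records and pulls out the first exception and its top frame. It assumes the log uses the `yy/MM/dd HH:mm:ss LEVEL Component:` prefix seen above; the function names and the shortened `SAMPLE` excerpt are mine, introduced only for illustration.

```python
import re
from collections import Counter

# Assumed record prefix, as seen in the log above: "17/07/01 03:07:26 ERROR "
RECORD_START = re.compile(
    r"\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2} (?:DEBUG|INFO|WARN|ERROR) "
)
# Exception class, message, and the first "at ..." frame that follows it.
EXCEPTION = re.compile(
    r"([\w.$]+(?:Error|Exception)): (.*?) at ([\w.$]+\([^)]*\))"
)


def split_records(flat_log):
    """Cut a flattened log into one string per timestamped record."""
    starts = [m.start() for m in RECORD_START.finditer(flat_log)]
    bounds = starts + [len(flat_log)]
    return [flat_log[a:b].strip() for a, b in zip(starts, bounds[1:])]


def summarize(flat_log):
    """Count records per (level, component) pair."""
    counts = Counter()
    for record in split_records(flat_log):
        parts = record.split(" ", 3)
        if len(parts) < 4:
            continue  # prefix only, nothing to classify
        level, rest = parts[2], parts[3]
        counts[(level, rest.split(":", 1)[0])] += 1
    return counts


def first_exception(record):
    """Return (exception class, message, top frame), or None if absent."""
    match = EXCEPTION.search(record)
    return match.groups() if match else None


# Shortened excerpt of the log above, flattened the way it was posted.
SAMPLE = (
    "17/07/01 03:07:21 WARN BlockManager: Block input-0-1498849640800 "
    "replicated to only 0 peer(s) instead of 1 peers "
    "17/07/01 03:07:26 ERROR Executor: Exception in task 0.0 in stage "
    "113711.0 (TID 111661) java.lang.AssertionError: assertion failed "
    "at scala.Predef$.assert(Predef.scala:156) "
    "at org.apache.spark.storage.BlockInfo.checkInvariants"
    "(BlockInfoManager.scala:84)"
)

for record in split_records(SAMPLE):
    print(first_exception(record))
print(summarize(SAMPLE))
```

Run against the full log, this kind of summary makes it clear that every ERROR record traces back to the same `java.lang.AssertionError` raised from `BlockInfo.checkInvariants`.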
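Troubleshooting, possible cause 1:
1. [Ruled out] The checkpoint directory in the code pointing at the local filesystem is not the cause. I set up HDFS and moved the checkpoint directory onto it, but the job still dies after running for about a day, with the same error as above.
2. To be continued.
Any pointers from the experts would be much appreciated.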
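For reference, the checkpoint change described in item 1 might look roughly like the sketch below. This is an illustration only, and as noted it did not resolve the error. The NameNode host, port, path, application name, and batch interval are all placeholders I made up; the original post does not show its code. The `pyspark` imports are done inside the functions so the file can be loaded on a machine without Spark installed.

```python
def hdfs_uri(namenode, path, port=8020):
    """Build an hdfs:// URI. Host, port, and path are placeholders."""
    return "hdfs://{}:{}/{}".format(namenode, port, path.lstrip("/"))


# Hypothetical location; replace with the real NameNode and directory.
CHECKPOINT_DIR = hdfs_uri("namenode.example", "/spark/checkpoints/filebeat-job")


def create_context():
    """Build a fresh StreamingContext that checkpoints to HDFS."""
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="filebeat-kafka-streaming")  # assumed name
    ssc = StreamingContext(sc, 5)  # 5-second batches (assumed)
    ssc.checkpoint(CHECKPOINT_DIR)  # HDFS instead of a local directory
    # Define the Kafka input stream and at least one output operation here;
    # Spark refuses to start a context with no output operations.
    return ssc


def main():
    """Recover from the checkpoint if one exists, else start fresh."""
    from pyspark.streaming import StreamingContext

    ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, create_context)
    ssc.start()
    ssc.awaitTermination()
```

Calling `main()` from the script passed to `spark-submit` would start the job. `StreamingContext.getOrCreate` only invokes `create_context` when no usable checkpoint is found under `CHECKPOINT_DIR`.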