filebeat+kafka+SparkStreaming程序报错及解决办法

Posted zhzhang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了filebeat+kafka+SparkStreaming程序报错及解决办法相关的知识,希望对你有一定的参考价值。

17/07/01 03:07:21 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
17/07/01 03:07:21 WARN BlockManager: Block input-0-1498849640800 replicated to only 0 peer(s) instead of 1 peers
17/07/01 03:07:26 ERROR Executor: Exception in task 0.0 in stage 113711.0 (TID 111661)
java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:156)
    at org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)
    at org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:362)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:361)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:356)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:356)
    at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:676)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:329)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/07/01 03:07:26 WARN TaskSetManager: Lost task 0.0 in stage 113711.0 (TID 111661, localhost, executor driver): java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:156)
    at org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)
    at org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:362)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:361)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:356)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:356)
    at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:676)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:329)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

17/07/01 03:07:26 ERROR TaskSetManager: Task 0 in stage 113711.0 failed 1 times; aborting job
17/07/01 03:07:26 ERROR JobScheduler: Error running job streaming job 1498845965000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/home/admin/agent/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 65, in call
    r = self.func(t, *rdds)
  File "/home/admin/agent/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 171, in takeAndPrint
    taken = rdd.take(num + 1)
  File "/home/admin/agent/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1342, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
    File "/home/admin/agent/spark/python/lib/pyspark.zip/pyspark/context.py", line 968, in runJob
    port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "/home/admin/agent/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/admin/agent/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 113711.0 failed 1 times, most recent failure: Lost task 0.0 in stage 113711.0 (TID 111661, localhost, executor driver): java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:156)
    at org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)
    at org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:362)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:361)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:356)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:356)
    at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:676)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:329)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
  
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
    at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:441)
    at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
    at sun.reflect.GeneratedMethodAccessor55.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)

 

排查原因1:

  1. 【不是】由于代码中checkpoint目录为本地导致,搭建了hdfs,将checkpoint移到hdfs,发现还是运行一天左右就挂掉,报错如上。

  2. 待续

 

请大虾们指点。

以上是关于filebeat+kafka+SparkStreaming程序报错及解决办法的主要内容,如果未能解决你的问题,请参考以下文章

从 kafka-filebeat 创建动态索引

filebeat 接入kafka

filebeat-收集日志写入到Kafka

filebeat + kafka +ELK集群实验

Suricata通过filebeat将告警事件送往Kafka,ElasticSearch

filebeat到kafka日志无法传输