Spark Streaming Kafka hangs at JavaStreamingContext.start, no Spark job is created
【Original title】: Spark Stream Kafka hang at JavaStreamingContext.start, no spark job create
【Posted】: 2017-10-20 04:10:19
【Question】:
OS: Red Hat Enterprise Linux Server release 6.5
JRE: Oracle 1.8.0.144-b01
spark-streaming_2.11: 2.1.0
spark-streaming-kafka-0-10_2.11: 2.1.0
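We submitted the Spark Streaming Kafka jar to a standalone Spark cluster via spark-submit, and it ran for several days. Recently, however, we found that no new jobs were being generated for the stream. We tried restarting the job and then restarting the cluster, but the stream just stays stuck in JavaStreamingContext.start, in state WAITING (on object monitor). The thread dump is below; there are no error logs from Spark or Kafka. I wonder what Spark Streaming is waiting for...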
"shuffle-server-3-4" #35 daemon prio=5 os_prio=0 tid=0x00007f76a0041800 nid=0x3d34 runnable [0x00007f76911e5000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000f8ea3be8> (a io.netty.channel.nio.SelectedSelectionKeySet)
- locked <0x00000000f8ee3600> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000f8ea3ae0> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:760)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:401)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- None
"shuffle-server-3-3" #34 daemon prio=5 os_prio=0 tid=0x00007f76a0040800 nid=0x3d33 runnable [0x00007f76912e6000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000fc2747c0> (a io.netty.channel.nio.SelectedSelectionKeySet)
- locked <0x00000000fc2874c0> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000fc2746c8> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:760)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:401)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- None
"shuffle-server-3-2" #33 daemon prio=5 os_prio=0 tid=0x00007f76a003e800 nid=0x3d32 runnable [0x00007f76913e7000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000fb227370> (a io.netty.channel.nio.SelectedSelectionKeySet)
- locked <0x00000000fb2296a0> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000fb227278> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:760)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:401)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- None
"ForkJoinPool-1-worker-5" #80 daemon prio=5 os_prio=0 tid=0x00007f76a0034800 nid=0x3d31 runnable [0x00007f76916e7000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000f8e8ed98> (a sun.nio.ch.Util$3)
- locked <0x00000000f8e8ed88> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000f8e7d008> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:454)
at org.apache.kafka.common.network.Selector.poll(Selector.java:277)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
at org.apache.kafka.clients.consumer.internals.Fetcher.listOffset(Fetcher.java:324)
at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:298)
at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:174)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1409)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:983)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:168)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:244)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Locked ownable synchronizers:
- None
"JobGenerator" #79 daemon prio=5 os_prio=0 tid=0x00007f76a0007800 nid=0x3d30 waiting on condition [0x00007f76917e9000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000fe48b8d8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:492)
at java.util.concurrent.LinkedBlockingDeque.take(LinkedBlockingDeque.java:680)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:46)
Locked ownable synchronizers:
- None
"JobScheduler" #78 daemon prio=5 os_prio=0 tid=0x00007f76a0004800 nid=0x3d2f waiting on condition [0x00007f76918ea000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000fe48cb98> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:492)
at java.util.concurrent.LinkedBlockingDeque.take(LinkedBlockingDeque.java:680)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:46)
Locked ownable synchronizers:
- None
"streaming-start" #77 daemon prio=5 os_prio=0 tid=0x00007f77323a1000 nid=0x3d2e in Object.wait() [0x00007f76919ea000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at scala.concurrent.forkjoin.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:295)
- locked <0x00000000fa037d50> (a scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask)
at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:341)
at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:673)
at scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:378)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:443)
at scala.collection.parallel.ForkJoinTasks$class.executeAndWaitResult(Tasks.scala:426)
at scala.collection.parallel.ForkJoinTaskSupport.executeAndWaitResult(TaskSupport.scala:56)
at scala.collection.parallel.ExecutionContextTasks$class.executeAndWaitResult(Tasks.scala:558)
at scala.collection.parallel.ExecutionContextTaskSupport.executeAndWaitResult(TaskSupport.scala:80)
at scala.collection.parallel.ParIterableLike$class.foreach(ParIterableLike.scala:463)
at scala.collection.parallel.mutable.ParArray.foreach(ParArray.scala:56)
at org.apache.spark.streaming.DStreamGraph.start(DStreamGraph.scala:49)
- locked <0x00000000fa0380d0> (a org.apache.spark.streaming.DStreamGraph)
at org.apache.spark.streaming.scheduler.JobGenerator.startFirstTime(JobGenerator.scala:194)
at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:100)
- locked <0x00000000fe48b4d0> (a org.apache.spark.streaming.scheduler.JobGenerator)
at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:102)
- locked <0x00000000fe48b170> (a org.apache.spark.streaming.scheduler.JobScheduler)
at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:583)
at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
at org.apache.spark.util.ThreadUtils$$anon$2.run(ThreadUtils.scala:126)
Locked ownable synchronizers:
- None
"SparkListenerBus" #21 daemon prio=5 os_prio=0 tid=0x00007f7732291800 nid=0x3d2d waiting on condition [0x00007f7691cec000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000081c9be70> (a java.util.concurrent.Semaphore$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:80)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1245)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77)
Locked ownable synchronizers:
- None
"Spark Context Cleaner" #74 daemon prio=5 os_prio=0 tid=0x00007f773228a000 nid=0x3d2b in Object.wait() [0x00007f7691eee000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
- locked <0x00000000fe4675d0> (a java.lang.ref.ReferenceQueue$Lock)
at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:175)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1245)
at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:172)
at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:67)
Locked ownable synchronizers:
- None
"shuffle-server-6-1" #70 daemon prio=5 os_prio=0 tid=0x00007f77321b5800 nid=0x3d2a runnable [0x00007f7691fef000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000fa182e28> (a io.netty.channel.nio.SelectedSelectionKeySet)
- locked <0x00000000fa1b3938> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000fa182d90> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:760)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:401)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- None
"threadDeathWatcher-4-1" #65 daemon prio=1 os_prio=0 tid=0x00007f7704019800 nid=0x3d29 waiting on condition [0x00007f76932f1000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at io.netty.util.ThreadDeathWatcher$Watcher.run(ThreadDeathWatcher.java:150)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- None
"shuffle-client-1-3" #30 daemon prio=5 os_prio=0 tid=0x00007f76fc006800 nid=0x3d28 runnable [0x00007f76933f2000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000fd326748> (a io.netty.channel.nio.SelectedSelectionKeySet)
- locked <0x00000000fd328838> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000fd3266a0> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:760)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:401)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- None
"shuffle-client-1-2" #29 daemon prio=5 os_prio=0 tid=0x00007f770801a800 nid=0x3d27 runnable [0x00007f76934f3000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000fe49d458> (a io.netty.channel.nio.SelectedSelectionKeySet)
- locked <0x00000000fe4b02d0> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000fe49d360> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:760)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:401)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- None
"shuffle-client-1-1" #28 daemon prio=5 os_prio=0 tid=0x00007f7700005000 nid=0x3d26 runnable [0x00007f76935f4000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000fe4b24a0> (a io.netty.channel.nio.SelectedSelectionKeySet)
- locked <0x00000000fe4b4570> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000fe4b23a8> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:760)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:401)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- None
"appclient-registration-retry-thread" #61 daemon prio=5 os_prio=0 tid=0x00007f76a800f000 nid=0x3d22 waiting on condition [0x00007f76939f8000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000fe4e0e28> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- None
"driver-revive-thread" #57 daemon prio=5 os_prio=0 tid=0x00007f76b0004000 nid=0x3d1e waiting on condition [0x00007f7693dfc000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000fe4e0450> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- None
"dag-scheduler-event-loop" #56 daemon prio=5 os_prio=0 tid=0x00007f77321a5800 nid=0x3d1d waiting on condition [0x00007f7693efd000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000fe38ff08> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:492)
at java.util.concurrent.LinkedBlockingDeque.take(LinkedBlockingDeque.java:680)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:46)
Locked ownable synchronizers:
- None
"shuffle-server-3-1" #32 daemon prio=5 os_prio=0 tid=0x00007f7731fea800 nid=0x3d07 runnable [0x00007f76f4863000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000fc26eeb8> (a io.netty.channel.nio.SelectedSelectionKeySet)
- locked <0x00000000fc271470> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000fc26edb0> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:760)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:401)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- None
"kafka-producer-network-thread | producer-1" #18 daemon prio=5 os_prio=0 tid=0x00007f773187a800 nid=0x3ce5 runnable [0x00007f76f5d6b000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0000000081bb6f80> (a sun.nio.ch.Util$3)
- locked <0x0000000081bb6f70> (a java.util.Collections$UnmodifiableSet)
- locked <0x0000000081bb6e48> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:454)
at org.apache.kafka.common.network.Selector.poll(Selector.java:277)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)
at java.lang.Thread.run(Thread.java:748)
"main" #1 prio=5 os_prio=0 tid=0x00007f773000d800 nid=0x3ca7 in Object.wait() [0x00007f7736da7000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1252)
- locked <0x00000000fe463ad0> (a org.apache.spark.util.ThreadUtils$$anon$2)
at java.lang.Thread.join(Thread.java:1326)
at org.apache.spark.util.ThreadUtils$.runInNewThread(ThreadUtils.scala:135)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
- locked <0x0000000081c9ea00> (a java.lang.Object)
- locked <0x0000000081f19c80> (a org.apache.spark.streaming.StreamingContext)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
at com.ccb.iomp.appmon.analysis.statistic.processor.StatisticProcessor.start(StatisticProcessor.java:780)
at com.ccb.iomp.appmon.analysis.statistic.TranLogStatisticApp.main(TranLogStatisticApp.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Locked ownable synchronizers:
- None
【Answer 1】: According to the following stack trace:
"ForkJoinPool-1-worker-5" #80 daemon prio=5 os_prio=0 tid=0x00007f76a0034800 nid=0x3d31 runnable [0x00007f76916e7000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000f8e8ed98> (a sun.nio.ch.Util$3)
- locked <0x00000000f8e8ed88> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000f8e7d008> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:454)
at org.apache.kafka.common.network.Selector.poll(Selector.java:277)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
at org.apache.kafka.clients.consumer.internals.Fetcher.listOffset(Fetcher.java:324)
at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:298)
at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:174)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1409)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:983)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:168)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:244)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
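It is stuck fetching offsets from Kafka: KafkaConsumer.poll, called from DirectKafkaInputDStream.start, is blocked in Fetcher.listOffset waiting for a metadata update. You should check your Kafka cluster.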
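As a first step in checking the cluster, it can help to confirm that every broker in bootstrap.servers is reachable from the driver host at all. The following is a minimal sketch using only the JDK; the class name BrokerProbe, the default address localhost:9092, and the 3-second timeout are illustrative assumptions, not part of the original job. A successful TCP connect only shows that the port is open. It does not prove that the broker can serve offset requests, so a healthy result here does not rule out a broker-side problem.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Spark or Kafka): checks TCP reachability
// of each entry in a bootstrap.servers-style string.
public class BrokerProbe {

    /** Splits "host1:9092,host2:9092" into unresolved socket addresses. */
    public static List<InetSocketAddress> parse(String bootstrapServers) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException("expected host:port, got " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    /** Returns true if a TCP connection can be opened within the timeout. */
    public static boolean reachable(InetSocketAddress address, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // Resolve the host name here so DNS failures are reported as "unreachable".
            socket.connect(
                new InetSocketAddress(address.getHostString(), address.getPort()),
                timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed default; pass the job's real bootstrap.servers value as the first argument.
        String servers = args.length > 0 ? args[0] : "localhost:9092";
        for (InetSocketAddress address : parse(servers)) {
            System.out.println(address.getHostString() + ":" + address.getPort()
                + " reachable=" + reachable(address, 3000));
        }
    }
}
```

If all brokers are reachable and the stream still hangs in listOffset, the next things to look at are the broker logs and the state of the topic's partition leaders.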
【Comments】:
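Thank you very much! We restarted the Kafka cluster and the stream works! The strange thing is that logstash and the Spark stream consume the same topic, and logstash was still working fine before the restart~
【Answer 2】: I ran into this problem recently as well. In my case, only one Kafka broker was running in the development environment, but offsets.topic.replication.factor was set to 2. Try setting it to 1. That is not a good idea in production, but it may solve the problem if you only need one broker for testing.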
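The mismatch described in this answer can be checked before the job is started. The sketch below assumes a broker version that strictly enforces offsets.topic.replication.factor when it creates the internal offsets topic; the class name OffsetsTopicCheck and the sample configuration string are hypothetical. It reads the setting from the text of a server.properties file, falls back to Kafka's documented default of 3, and compares it with the number of live brokers.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: compares offsets.topic.replication.factor with the broker count.
public class OffsetsTopicCheck {

    // Kafka's documented default for offsets.topic.replication.factor.
    static final String DEFAULT_FACTOR = "3";

    /** Reads the replication factor from the text of a server.properties file. */
    public static int replicationFactor(String serverProperties) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(serverProperties));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String value = props.getProperty("offsets.topic.replication.factor", DEFAULT_FACTOR);
        return Integer.parseInt(value.trim());
    }

    /** The offsets topic needs at least as many live brokers as its replication factor. */
    public static boolean canCreateOffsetsTopic(int liveBrokers, int factor) {
        return liveBrokers >= factor;
    }

    public static void main(String[] args) {
        // Illustrative dev-environment config: one broker, factor set to 2.
        String devConfig = "broker.id=0\noffsets.topic.replication.factor=2\n";
        int factor = replicationFactor(devConfig);
        System.out.println("factor=" + factor
            + ", one broker is enough: " + canCreateOffsetsTopic(1, factor));
    }
}
```

With the sample configuration, the check reports that a single broker is not enough, which matches the situation in this answer. Setting the factor to 1 in the single-broker development environment makes the check pass.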