数据源用完时如何停止火花流

Posted

技术标签:

【中文标题】数据源用完时如何停止火花流【英文标题】:How to stop spark streaming when the data source has run out 【发布时间】:2016-05-07 17:00:21 【问题描述】:

我有一个 spark 流作业,它每 5 秒从 Kafka 读取一次,对传入的数据进行一些转换,然后写入文件系统。

这实际上并不需要是一个流式作业,实际上,我只想每天运行一次以将消息排入文件系统。不过我不确定如何停止这项工作。

如果我将超时传递给streamingContext.awaitTermination,它不会停止进程,它所做的只是导致进程在迭代流时产生错误(参见下面的错误)

完成我想做的事情的最佳方法是什么

这适用于 Python 上的 Spark 1.6

编辑:

感谢@marios,解决方案是这样的:

ssc.start()
ssc.awaitTermination(10)
ssc.stop()

在停止之前运行脚本十秒钟。

简化代码:

conf = SparkConf().setAppName("Vehicle Data Consolidator").set('spark.files.overwrite','true')
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 5)
stream = KafkaUtils.createStream(
    ssc,
    kafkaParams["zookeeper.connect"],
    "vehicle-data-importer",
    topicPartitions,
    kafkaParams)

stream.saveAsTextFiles('stream-output/kafka-vehicle-data')

ssc.start()
ssc.awaitTermination(10)

错误:

16/01/29 15:05:44 INFO BlockManagerInfo: Added input-0-1454097944200 in memory on localhost:58960 (size: 3.0 MB, free: 48.1 MB)
16/01/29 15:05:44 WARN BlockManager: Block input-0-1454097944200 replicated to only 0 peer(s) instead of 1 peers
16/01/29 15:05:44 INFO BlockGenerator: Pushed block input-0-1454097944200
16/01/29 15:05:45 ERROR JobScheduler: Error generating jobs for time 1454097945000 ms
py4j.Py4JException: Cannot obtain a new communication channel
    at py4j.CallbackClient.sendCommand(CallbackClient.java:232)
    at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:111)
    at com.sun.proxy.$Proxy14.call(Unknown Source)
    at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
    at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
    at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:230)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
16/01/29 15:05:45 INFO MemoryStore: Block input-0-1454097944800 stored as bytes in memory (estimated size 3.0 MB, free 466.1 MB)
16/01/29 15:05:45 INFO BlockManagerInfo: Added input-0-1454097944800 in memory on localhost:58960 (size: 3.0 MB, free: 45.1 MB)

【问题讨论】:

你怎么知道数据源已经出局了?顺便说一句,您可以在ssc.awaitTermination 之后调用ssc.stop() 来停止Streaming 应用程序。 恕我直言,如果您只需要每天读取一次数据,请创建一个 Spark Ba​​tch 作业来读取和处理数据,并进一步使用诸如 cron 或 Quartz 之类的调度程序来安排您的作业。跨度> 执行单个批处理(使用 createRDD)的问题是没有简单的方法来跟踪 zookeeper 中的偏移量。这是我想在这里实现的目标之一 我考虑过调用ssc.stop,但我无法弄清楚如何异步调用它 【参考方案1】:

似乎正确的调用方法是awaitTerminationOrTimeout(self, timeout)。

我不确定它是否也会停止流式传输上下文。因此,也许您可​​以在超时结束后立即调用 ssc.stop()。

ssc.start()
ssc.awaitTerminationOrTimeout(10)
ssc.stop()

注意:查看here 是否有类似问题。

【讨论】:

如果你发现一个骗子标记它。 这不是完全重复的(但可能与某人有关)。那里的问题是如何停止流式套接字而不是关闭整个火花上下文。问题还在于在 Scala 中运行的 twitter 上下文,而不是在 pyspark 上运行的 Kafka。 如果这不是答案,也许评论更合适 这是正确答案。 ssc.awaitTermination(10) 阻止脚本执行,但实际上并没有停止流上下文。【参考方案2】:

试一试Kafka“consumer.timeout.ms”参数,它将优雅地结束KafkaReceiver。(来自kafka 0.8 configuration)

如果没有可用消息,则向消费者抛出超时异常 指定时间间隔后消费

HDF = KafkaUtils.createStream(ssc, topics=strLoc : 1, kafkaParams="consumer.timeout.ms":"20000" , zkQuorum='xxx:2181', groupId='xxx-consumer-group')

您将无法在当前流式执行中接收任何新的 kafka 消息,并且总是得到空 RDD。 并检查 DSteam.foreachRDD(func) 中空 RDD 的计数。如果您不断获得空 RDD,则终止流式执行。

【讨论】:

试过这个:kafkaParams = "metadata.broker.list": str.join(',',brokers), "zookeeper.connect": str.join(',',zkNodes), "consumer.id": "vehicle-data-importer", "group.id":"importers", "auto.offset.reset": "smallest", "consumer.timeout.ms": "10000" 没有效果 "consumer.timeout.ms" 不会停止流式执行,但会终止 kafka 接收器以等待更多消息,即使新消息稍后到达。您可以简单地使用“ssc.awaitTerminationOrTimeout(10)”,但这并不安全。【参考方案3】:

这里的问题从 1.6 开始,在与 Dstream 处理线程相同的线程中调用 ssc.stop 会创建死锁,因为 stop 会等待轮询线程完成创建 deadlock.sp 从另一个线程调用停止

【讨论】:

以上是关于数据源用完时如何停止火花流的主要内容,如果未能解决你的问题,请参考以下文章

6. Redis在内存用完时会怎么办?以及Redis如何处理已过期的数据?

当自动递增列用完时会发生啥?

如何使用火花流处理实时流数据/日志?

如何将火花流数据帧存储到 Mysql 表。?

如何在火花流中刷新加载的数据帧内容?

如何将火花流输出转换为数据帧或存储在表中