如何从持续的 RDD 构造 DStream?
Posted
技术标签:
【中文标题】如何从持续的 RDD 构造 DStream?【英文标题】:How to construct DStream from continued RDDs? 【发布时间】:2016-03-15 02:52:20 【问题描述】:我每 5 分钟从 ElasticSearch 读取数据到 Spark。所以每5分钟会有一个RDD。
我希望基于这些 RDD 构建一个 DStream,这样我就可以得到最近 1 天、最近 1 小时、最近 5 分钟等数据的报告。
为了构建DStream,我正在考虑创建自己的接收器,但是spark的官方文档只提供了使用scala或java的信息。我使用python。
那么你知道有什么方法吗?我知道我们可以。毕竟 DStream 是一系列的 RDD,当然我们应该从连续的 RDD 中创建 DStream。我只是不知道怎么做。请给点建议
【问题讨论】:
【参考方案1】:编写自己的接收器将是您提到的一种方式,但似乎开销很大。你可以做的是使用QueueReceiver
来创建QueueInputDStream
,就像在this example 中一样。它是 Scala,但您也应该能够在 Python 中做类似的事情:
val rddQueue = new Queue[RDD[Map[String, Any]]]()
val inputStream = ssc.queueStream(rddQueue)
之后,您只需每隔X sec/min/h/day/whatever
查询一次您的 ES 实例,然后将结果放入该队列。
对于 Python,我猜它会是这样的:
rddQueue = []
rddQueue += es_rdd() // method that returns an RDD from ES
inputStream = ssc.queueStream(rddQueue)
// some kind of loop that adds to rddQueue new RDDS
显然,在queueStream
中使用它之前,您需要在队列中有一些东西(或者至少我在pyspark
中遇到异常,如果它是空的)。
【讨论】:
感谢@Mateusz Dymczyk。 queueStream 将不起作用,因为在您创建 DStream 之后。添加到队列中的任何新 rdd 都不会计算在内 @KramerLi 啊,你是对的,我想知道是否有办法改变这种行为......【参考方案2】:没有必要使用接收器。您可以直接覆盖 InputDStream 类来实现您的 elasticsearch 数据拉取逻辑。当您的数据已经受益于可复制和可重放的存储时,最好不要依赖接收器。
见:http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.InputDStream
不过,我不确定您是否可以直接从 python 轻松创建 InputDStream 类。
【讨论】:
以上是关于如何从持续的 RDD 构造 DStream?的主要内容,如果未能解决你的问题,请参考以下文章