如何从持续的 RDD 构造 DStream?

Posted

技术标签:

【中文标题】如何从持续的 RDD 构造 DStream?【英文标题】:How to construct DStream from continued RDDs? 【发布时间】:2016-03-15 02:52:20 【问题描述】:

我每 5 分钟从 ElasticSearch 读取数据到 Spark。所以每5分钟会有一个RDD。

我希望基于这些 RDD 构建一个 DStream,这样我就可以得到最近 1 天、最近 1 小时、最近 5 分钟等数据的报告。

为了构建DStream,我正在考虑创建自己的接收器,但是spark的官方文档只提供了使用scala或java的信息。我使用python。

那么你知道有什么方法吗?我知道我们可以。毕竟 DStream 是一系列的 RDD,当然我们应该从连续的 RDD 中创建 DStream。我只是不知道怎么做。请给点建议

【问题讨论】:

【参考方案1】:

编写自己的接收器将是您提到的一种方式,但似乎开销很大。你可以做的是使用QueueReceiver 来创建QueueInputDStream,就像在this example 中一样。它是 Scala,但您也应该能够在 Python 中做类似的事情:

val rddQueue = new Queue[RDD[Map[String, Any]]]()
val inputStream = ssc.queueStream(rddQueue)

之后,您只需每隔X sec/min/h/day/whatever 查询一次您的 ES 实例,然后将结果放入该队列。

对于 Python,我猜它会是这样的:

rddQueue = []
rddQueue += es_rdd() // method that returns an RDD from ES
inputStream = ssc.queueStream(rddQueue)

// some kind of loop that adds to rddQueue new RDDS

显然,在queueStream 中使用它之前,您需要在队列中有一些东西(或者至少我在pyspark 中遇到异常,如果它是空的)。

【讨论】:

感谢@Mateusz Dymczyk。 queueStream 将不起作用,因为在您创建 DStream 之后。添加到队列中的任何新 rdd 都不会计算在内 @KramerLi 啊,你是对的,我想知道是否有办法改变这种行为......【参考方案2】:

没有必要使用接收器。您可以直接覆盖 InputDStream 类来实现您的 elasticsearch 数据拉取逻辑。当您的数据已经受益于可复制和可重放的存储时,最好不要依赖接收器。

见:http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.InputDStream

不过,我不确定您是否可以直接从 python 轻松创建 InputDStream 类。

【讨论】:

以上是关于如何从持续的 RDD 构造 DStream?的主要内容,如果未能解决你的问题,请参考以下文章

如何找到每个 DStream 的 RDD 中所有值的总和?

如何使用 Pyspark 组合两个 Dstream(类似于普通 RDD 上的 .zip)

如何保存 Spark Java Dstream RDD

深入理解Spark Streaming

DStream中的列表处理

Spark流DStream RDD以获取文件名