对 RDD 的每个元素使用 sparkcontext 函数
Posted
技术标签:
【中文标题】对 RDD 的每个元素使用 sparkcontext 函数【英文标题】:Using a sparkcontext function for each element of the RDD 【发布时间】:2016-07-10 18:47:07 【问题描述】:我需要使用 Pyspark 从 Elasticsearch 读取数据。我正在尝试按如下方式在 pyspark 中设置流程 -
i) 创建rdd1 ii) foreach 出现在 rdd1 conf = 基于 rdd1 元素的动态值 rdd2 = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\ "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
我意识到“foreach”将在工作人员之间分配工作并尝试调用 sc.newAPIHadoopRDD 从而导致 sc 在工作人员上不可用的错误。
是否有其他方法可以实现上述目标? 注意 - 我需要使用“newAPIHadoopRDD”,因为其余的处理取决于它。
【问题讨论】:
是的。使用map()
来输出RDD 中每条记录所需的内容,然后在生成的RDD 上使用newAPIHadoopRDD()
方法。
感谢您的建议 Avihoo。不幸的是,newAPIHadoopRDD API 仅适用于 sc,即我们需要使用 sc.newAPIHadoopRDD() 并将在驱动程序上运行。此外,我们希望它在集群上运行,并希望有某种方法可以将 newAPIHadoopRDD() 序列化给工作人员,以便并行处理。
【参考方案1】:
你不能嵌套 RDD。如果你想循环rdd1
的结果,你必须先collect
给驱动程序。
val rdd1Result = rdd1.collect()
rdd1Result.foreach v =>
val conf = ...
sc.newAPIHadoopRDD...
【讨论】:
谢谢迪凯。感谢您的投入。有几点是问题 - i)“rdd1.collect”返回一个列表,而“.foreach”适用于 RDD ii)我们希望它在工作节点上运行以实现可扩展性。但是,我们怀疑整个“rdd1Result”只会在驱动节点上处理。有什么建议让它在工作节点上并行运行? 一个scala列表也有foreach
方法。无法访问工作节点中的 SparkContext。
是的迪凯。我们已经意识到 SparkContext 不能在工作节点中访问。感谢您的投入。【参考方案2】:
您不能在 foreach 中发送 RDD,也不应该尝试这样做。 除非您正在执行 join :在这种情况下 spark 可以处理两个 rdd,如果您考虑一下,这就是您所需要的。
看起来你正在做一个 theta-join。 根据您的数据,您可能可以使用具有近似值的精确连接来规避循环。
【讨论】:
谢谢 Marmouset。我们意识到我们需要改变我们的方法。结果,我们创建了一个函数,它的作用类似于 newAPIHadoopRDD() 的作用。我们的基本要求是查询我们以替代方式执行的弹性搜索。我们为此使用了 Hadoop API。我们使用 .map 将这个新函数传递给 RDD,以便它在工作节点上执行。这帮助我们实现了我们想要的,即 i) 查询弹性搜索 ii) 以可扩展的方式进行。以上是关于对 RDD 的每个元素使用 sparkcontext 函数的主要内容,如果未能解决你的问题,请参考以下文章