如何解决嵌套地图函数中的 SPARK-5063

Posted

技术标签:

【中文标题】如何解决嵌套地图函数中的 SPARK-5063【英文标题】:How to solve SPARK-5063 in nested map functions 【发布时间】:2015-05-01 23:09:21 【问题描述】:

RDD 转换和动作只能由驱动程序调用,不能在其他转换内部调用;例如,rdd1.map(x => rdd2.values.count() * x) 无效,因为值转换和计数操作无法在 rdd1.map 转换内部执行。有关详细信息,请参阅 SPARK-5063。

正如错误所说,我正在尝试在主映射函数中映射(转换)JavaRDD 对象,Apache Spark 怎么可能?

主要的JavaPairRDD对象(TextFile和Word都是定义类):

JavaPairRDD<TextFile, JavaRDD<Word>> filesWithWords = new...

和地图功能:

filesWithWords.map(textFileJavaRDDTuple2 -> textFileJavaRDDTuple2._2().map(word -> new Word(word.getText(), (long) textFileJavaRDDTuple2._1().getText().split(word.getText()).length)));

我也尝试了 foreach 而不是 map 功能,但不起作用。 (当然也搜索过 SPARK-5063)

【问题讨论】:

另见:***.com/questions/29815878/… 【参考方案1】:

与不支持对 RDD 的嵌套操作一样,Spark 中也不支持嵌套 RDD 类型。 RDD 仅在驱动程序中定义,结合它们的SparkContext,它们可以调度对它们所代表的数据的操作。

所以,在这种情况下,我们需要解决的根本原因是数据类型:

JavaPairRDD<TextFile, JavaRDD<Word>> filesWithWords

在 Spark 中没有可能的有效用途。根据问题中未进一步解释的用例,此类型应成为以下类型之一:

RDD 的集合,以及它们所引用的文本文件:

Map<TextFile,RDD<Word>>

或者由文本文件组成的(textFile,Word)集合:

JavaPairRDD<TextFile, Word>

或者是一组单词及其对应的TextFile:

JavaPairRDD<TextFile, List<Word>>

一旦改正了类型,嵌套RDD操作的问题自然就迎刃而解了。

【讨论】:

非常感谢您的回答。我尝试像这样使用 Map: Map> textMap = filesWithWords.collectAsMap(); textMap.forEach((textFile, wordJavaRDD) -> wordJavaRDD.map(word -> /* 一些转换*/ ));但再次返回相同的错误。 @Alp collectAsMap() 不会给你回Map&lt;TextFile, JavaRDD&lt;Word&gt;&gt; 。顺便说一句,你想做什么?对我来说,看起来你正在尝试的构造是相当做作的。 collectAsMap() 提供地图 (java.util.Map),我已经检查过了。我想你的意思是scala.collection.MapTextFile 类具有任何文件的 PathText 属性,Word 类具有 word及其 count 我正在尝试计算当前文本文件中的每个单词。此外JavaRDD&lt;Word&gt; 包含所有文本文件中使用的所有单词,不仅是当前文件,所以我不能使用简单的 wordCount 示例 @Alp 是字数统计的扩展,而是计数(文件,字)对。 为什么不呢?也许您可以使用您正在尝试的代码发布另一个问题。该方法应该有效,因此可能与您的代码有关。【参考方案2】:

当我在 Spark 的学习曲线中达到完全相同的点时(尝试使用嵌套 RDD 并失败),我切换到 DataFrames 并能够使用连接来完成同样的事情。此外,一般而言,DataFrame 的速度几乎是 RDD 的两倍——至少对于我一直在做的工作而言是这样。

【讨论】:

【参考方案3】:

@maasg 首先,我使用了 JavaPairRDD >,但它并没有像你和@David Griffin 所说的那样工作,现在还不可能。 型号:

TextFile(字符串路径,字符串文本)

单词(字符串单词,整数)

现在使用 JavaRDD 并且模型已更改为:

TextFile(String path, String text, List wordList)

单词(字符串单词,整数)

最后,

List<Word> countDrafts = wordCount.map(v11 -> new Word(v11._1(), (long) 0)).collect();
JavaRDD<TextFile> ft = fileTexts.map(v11 -> new TextFile(v11._1(), v11._2(), countDrafts));
ft.foreach(textFile -> textFile.getWordList().forEach(word -> new  Word(word.getText(), getWordCountFromText(textFile.getText(), word.getText())))); 

getWordCountFromText() 函数计算 TextFile 对象文本中的单词,但不幸的是没有使用 spark reduce 方法,使用经典方法。

顺便说一句,我将在接下来的几天尝试 DataFrames,但我的时间很短。

谢谢大家。

【讨论】:

您可能希望使用上述信息编辑您的帖子并删除此答案,而不是写一个答案来回应 maasg 和 David 的答案。 @MikelUrkia ***.com/questions/29996427/… @maasg 说的是你可能想用你正在尝试的代码写一个新的 question - 而不是你现有问题的答案 - 主要是因为 您在他的回答中发现的问题与此问题中发布的问题不同。这样就更容易回答你的问题了。 这没什么大不了的。对于面临同样问题的开发人员,我只是对我的问题的明确回答 也许你是对的,@Alp。你给你的问题一个答案,这是真的。只是想帮助您解决您的新问题。干杯!

以上是关于如何解决嵌套地图函数中的 SPARK-5063的主要内容,如果未能解决你的问题,请参考以下文章

在嵌套和使用来自 tidyverse 的地图之后,心理包中的所有描述性结果如何取消嵌套?

如何在Scala中访问嵌套映射中的键值

如何使用 reactjs 和 graphql 在 gatsby 中映射嵌套数组

如何在主代理的地图上放置嵌套代理

如何解析嵌套的 JSON 字典(地图)

Scala如何展平嵌套的地图[字符串,任何]