如何一次处理多个 JavaRDD?

Posted

技术标签:

【中文标题】如何一次处理多个 JavaRDD?【英文标题】:How to process several JavaRDDs all at once? 【发布时间】:2016-12-20 06:53:25 【问题描述】:

我有一个格式为 csv 的大型数据集,我需要在不使用任何 DataFrames/Dataset API 和 SparkSQL 的情况下对该数据集执行一些 RDD 操作。通过实现这一点,我将每列数据加载到单独的 JavaRDD 中。

这是我的示例数据集:

id    name    address   rank
1001  john    NY        68
1002  kevin   NZ        72
1003  steve   WA        64

这是我目前尝试的代码:

JavaRDD<String> diskfile = sc.textFile("/Users/hadoop/Downloads/a.csv");
JavaRDD<String> idRDD=diskfile.flatMap(line -> Arrays.asList(line.split(",")[0]));
JavaRDD<String> nameRDD=diskfile.flatMap(line -> Arrays.asList(line.split(",")[1]));
JavaRDD<String> addressRDD=diskfile.flatMap(line -> Arrays.asList(line.split(",")[2]));

在此之后,我在 addressRDDnameRDD 上都应用了 reduceByKey,如下所示:

JavaPairRDD<String,Integer> addresspair=address.mapToPair( t -> new Tuple2 <String,Integer>(t,1)).reduceByKey((x, y) -> x + y);
JavaPairRDD<String,Integer> namepair=nameRDD.mapToPair( t -> new Tuple2 <String,Integer>(t,1)).reduceByKey((x, y) -> x + y);

问题:

我在地址对上应用了 soryByVale(交换键值)并得到一个地址值(result),它出现的次数最多。现在我需要返回包含地址字段为result的csv文件的所有必需列。

【问题讨论】:

【参考方案1】:

您可以使用filter,如下所示。

JavaRDD<String> filteredData = diskfile.filter(add -> add.contains(result));
filteredData.foreach(data -> 
            System.out.println(data);
        );

【讨论】:

感谢您的回复。它打印整行,即此处的数据。但我需要只打印选定的列。 您要选择哪些列? id、地址和排名列 您实际上应该使用 Dataframe 围绕列进行操作。对于这种情况,要从行中删除 addrss 字符串,您可以使用 JavaRDD filteredData = diskfile.filter(add -> add.contains(result)).map(line -> line.replace(","+result, ""));

以上是关于如何一次处理多个 JavaRDD?的主要内容,如果未能解决你的问题,请参考以下文章

Spark学习之JavaRdd

从文字值创建 DataFrame 和 JavaRDD

数据集 javaRDD() 性能

根据索引 id 的 ArrayList 过滤 JavaRDD

如何将多个文本文件读入单个 RDD?

将 JavaDStream<String> 转换为 JavaRDD<String>