示例 Spark 程序
Posted
技术标签:
【中文标题】示例 Spark 程序【英文标题】:Sample Spark Program 【发布时间】:2016-06-09 11:12:30 【问题描述】:您好,我正在学习 Spark 和 Scala,我有一个场景需要编写 Sparkscala 代码
输入文件
Name attr1 attr2 attr3
John Y N N
Smith N Y N
预期输出
John attr1 Y
John attr2 N
John attr3 N
Smith attr1 N
...
...
我知道如何在 Map-Reduce 中做到这一点
对于每一行,分别获取名称并遍历 attr 值并将输出作为(Name, attrX Y/N)
发送,但在 scala 和 Spark 中有点混乱,谁能帮助我?
【问题讨论】:
在 Spark 之外,阅读第一行,提取 attrN 的列表,现在使用 Spark,将其设置为广播变量以使其可供工作人员访问。从文件的其余部分制作一个 RDD。 flatMap 在 RDD 上生成给定行的条目。你就完成了。 另外,我认为你的例子是错误的。 "Smith attr2 N" 应该是 "Smith attr1 N" 或 Smith attr2 Y" 【参考方案1】:假设你已经知道输入属性的个数,输入属性之间用\t
分隔,那么你可以这样做:
在 Java 中
// load data file
JavaRDD<String> file = jsc.textFile(path);
// build header rdd
JavaRDD<String> header = jsc.parallelize(Arrays.asList(file.first()));
// subtract header to have real data
JavaRDD<String> data = file.subtract(header);
// create row rdd
JavaRDD<Row> rowRDD = data.flatMap(new FlatMapFunction<String,Row>()
private static final long serialVersionUID = 1L;
@Override
public Iterable<Row> call(String line) throws Exception
String[] strs = line.split("\t");
Row r1 = RowFactory.create(strs[0], "Attr1", strs[1]);
Row r2 = RowFactory.create(strs[0], "Attr2", strs[2]);
Row r3 = RowFactory.create(strs[0], "Attr3", strs[3]);
return Arrays.asList(r1,r2,r3);
);
// schema for df
StructType schema = new StructType().add("Name", DataTypes.StringType)
.add("Attr", DataTypes.StringType)
.add("Value", DataTypes.StringType);
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
df.show();
这是输出:
+-----+-----+-----+
| Name| Attr|Value|
+-----+-----+-----+
|Smith|Attr1| N|
|Smith|Attr2| Y|
|Smith|Attr3| N|
| John|Attr1| Y|
| John|Attr2| N|
| John|Attr3| N|
+-----+-----+-----+
Scala 和 Java 很相似,您可以轻松地将它们翻译成 Scala。
【讨论】:
以上是关于示例 Spark 程序的主要内容,如果未能解决你的问题,请参考以下文章
来自示例 Java 程序的 Spark UDF 反序列化错误