示例 Spark 程序

Posted

技术标签:

【中文标题】示例 Spark 程序【英文标题】:Sample Spark Program 【发布时间】:2016-06-09 11:12:30 【问题描述】:

您好,我正在学习 Spark 和 Scala,我有一个场景需要编写 Sparkscala 代码

输入文件

Name  attr1 attr2 attr3  
John    Y     N    N  
Smith   N     Y    N

预期输出

John  attr1 Y  
John  attr2 N  
John  attr3 N  
Smith attr1 N  
...  
...

我知道如何在 Map-Reduce 中做到这一点

对于每一行,分别获取名称并遍历 attr 值并将输出作为(Name, attrX Y/N) 发送,但在 scala 和 Spark 中有点混乱,谁能帮助我?

【问题讨论】:

在 Spark 之外,阅读第一行,提取 attrN 的列表,现在使用 Spark,将其设置为广播变量以使其可供工作人员访问。从文件的其余部分制作一个 RDD。 flatMap 在 RDD 上生成给定行的条目。你就完成了。 另外,我认为你的例子是错误的。 "Smith attr2 N" 应该是 "Smith attr1 N" 或 Smith attr2 Y" 【参考方案1】:

假设你已经知道输入属性的个数,输入属性之间用\t分隔,那么你可以这样做:

在 Java 中

// load data file
JavaRDD<String> file = jsc.textFile(path);

// build header rdd
JavaRDD<String> header = jsc.parallelize(Arrays.asList(file.first()));

// subtract header to have real data
JavaRDD<String> data = file.subtract(header);

// create row rdd
JavaRDD<Row> rowRDD = data.flatMap(new FlatMapFunction<String,Row>()
    private static final long serialVersionUID = 1L;

    @Override
    public Iterable<Row> call(String line) throws Exception 
        String[] strs = line.split("\t");
        Row r1 = RowFactory.create(strs[0], "Attr1", strs[1]);
        Row r2 = RowFactory.create(strs[0], "Attr2", strs[2]);
        Row r3 = RowFactory.create(strs[0], "Attr3", strs[3]);
        return Arrays.asList(r1,r2,r3);
    
);

// schema for df
StructType schema = new StructType().add("Name", DataTypes.StringType)
                                    .add("Attr", DataTypes.StringType)
                                    .add("Value", DataTypes.StringType);

DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
df.show();

这是输出:

+-----+-----+-----+
| Name| Attr|Value|
+-----+-----+-----+
|Smith|Attr1|    N|
|Smith|Attr2|    Y|
|Smith|Attr3|    N|
| John|Attr1|    Y|
| John|Attr2|    N|
| John|Attr3|    N|
+-----+-----+-----+

Scala 和 Java 很相似,您可以轻松地将它们翻译成 Scala。

【讨论】:

以上是关于示例 Spark 程序的主要内容,如果未能解决你的问题,请参考以下文章

来自示例 Java 程序的 Spark UDF 反序列化错误

使用 Apache Spark 提交 Python 应用程序

配置 spark 应用程序参数的最佳策略是啥?

Hadoop 之 Spark 安装配置与示例

Spark程序进行单元测试-使用scala

Spark 驱动程序堆内存问题