Spark Multi-Language Development
Posted by 赵广陆
1 Multi-Language Development - Overview
2 Java-Spark (Essential)
Official Java examples: https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples
2.1 SparkCore
package cn.itcast.hello;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
/**
 * Author itcast
 * Desc Demonstrates WordCount with SparkCore in Java
 */
public class JavaSparkDemo01 {
public static void main(String[] args) {
//0.TODO Prepare the environment
SparkConf sparkConf = new SparkConf().setAppName("JavaSparkDemo").setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
jsc.setLogLevel("WARN");
//1.TODO Load the data
JavaRDD<String> fileRDD = jsc.textFile("data/input/words.txt");
//2.TODO Process the data - WordCount
//Split each line into words
/*
@FunctionalInterface
public interface FlatMapFunction<T, R> extends Serializable {
Iterator<R> call(T t) throws Exception;
}
*/
//Note: Java lambda expression syntax:
//  (parameter list) -> { body }
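//For comparison, the same flatMap written with a pre-Java-8 anonymous inner class
//(exactly equivalent to the lambda below) would be:
//fileRDD.flatMap(new FlatMapFunction<String, String>() {
//    @Override
//    public Iterator<String> call(String line) throws Exception {
//        return Arrays.asList(line.split(" ")).iterator();
//    }
//});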
JavaRDD<String> wordsRDD = fileRDD.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
//Map each word to a tuple of (word, 1)
JavaPairRDD<String, Integer> wordAndOneRDD = wordsRDD.mapToPair(word -> new Tuple2<>(word, 1));
//Group by key and sum the counts
JavaPairRDD<String, Integer> wordAndCountRDD = wordAndOneRDD.reduceByKey((a, b) -> a + b);
//3.TODO Output the result
List<Tuple2<String, Integer>> result = wordAndCountRDD.collect();
//result.forEach(t-> System.out.println(t));
result.forEach(System.out::println);//method reference: the println method is used as the function
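//Note: collect() pulls the whole result back to the driver, which is fine for a demo but can
//exhaust driver memory on large data; an alternative is to write out in parallel, e.g.
//wordAndCountRDD.saveAsTextFile("data/output/wordcount"); (illustrative path)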
//4.TODO Release resources
jsc.stop();
}
}
2.2 SparkStreaming
package cn.itcast.hello;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.Arrays;
/**
 * Author itcast
 * Desc Demonstrates WordCount with SparkStreaming in Java
 */
public class JavaSparkDemo02 {
public static void main(String[] args) throws InterruptedException {
//0.TODO Prepare the environment
SparkConf sparkConf = new SparkConf().setAppName("JavaSparkDemo").setMaster("local[*]");
//JavaSparkContext jsc = new JavaSparkContext(sparkConf);
//jsc.setLogLevel("WARN");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));
jssc.sparkContext().setLogLevel("WARN");
//1.TODO Load the data
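//(to test locally: run `nc -lk 9999` on node1 and type lines of words into it)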
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("node1", 9999);
//2.TODO Process the data - WordCount
JavaPairDStream<String, Integer> result = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
.mapToPair(word -> new Tuple2<>(word, 1))
.reduceByKey((a, b) -> a + b);
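//Note: reduceByKey aggregates within each 5-second batch independently; a running total across
//batches would need a stateful operator such as updateStateByKey or mapWithState.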
//3.TODO Output the result
result.print();
//4.TODO Start the computation and await termination
jssc.start();
jssc.awaitTermination();
//5.TODO Release resources
jssc.stop();
}
}
2.3 SparkSQL
package cn.itcast.hello;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import java.util.Arrays;
import static org.apache.spark.sql.functions.col;
/**
 * Author itcast
 * Desc Demonstrates WordCount with SparkSQL in Java
 */
public class JavaSparkDemo03 {
public static void main(String[] args) {
//0.TODO Prepare the environment
SparkSession spark = SparkSession.builder().appName("JavaSparkDemo").master("local[*]").getOrCreate();
spark.sparkContext().setLogLevel("WARN");
//1.TODO Load the data
Dataset<String> ds = spark.read().textFile("data/input/words.txt");
//2.TODO Process the data - WordCount
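//Unlike Scala, Java has no implicit Encoders, so flatMap must be given an explicit Encoder (Encoders.STRING()) for its result type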
Dataset<String> wordsDS = ds.flatMap((String line) -> Arrays.asList(line.split(" ")).iterator(), Encoders.STRING());
//TODO ====SQL
wordsDS.createOrReplaceTempView("t_word");
String sql = "select value, count(*) as counts " +
"from t_word " +
"group by value " +
"order by counts desc";
spark.sql(sql).show();
//TODO ====DSL
/*Dataset<Row> temp = wordsDS.groupBy("value")
.count();
temp.orderBy(temp.col("count").desc()).show();*/
wordsDS.groupBy("value")
.count()
//.orderBy($"count".desc()).show();
.orderBy(col("count").desc()).show();
//3.TODO Output the result (already printed above via show())
//4.TODO Release resources
spark.stop();
}
}
2.4 StructuredStreaming
package cn.itcast.hello;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQueryException;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;
import static org.apache.spark.sql.functions.col;
/**
 * Author itcast
 * Desc Demonstrates WordCount with StructuredStreaming in Java
 */
public class JavaSparkDemo04 {
public static void main(String[] args) throws TimeoutException, StreamingQueryException {
//0.TODO Prepare the environment
SparkSession spark = SparkSession.builder().appName("JavaSparkDemo").master("local[*]")
.config("spark.sql.shuffle.partitions", "4")
.getOrCreate();
spark.sparkContext().setLogLevel("WARN");
//1.TODO Load the data
Dataset<Row> lines = spark.readStream()
.format("socket")
.option("host", "node1")
.option("port", 9999)
.load();
//2.TODO Process the data - WordCount
Dataset<String> ds = lines.as(Encoders.STRING());
Dataset<String> wordsDS = ds.flatMap((String line) -> Arrays.asList(line.split(" ")).iterator(), Encoders.STRING());
//TODO ====SQL
wordsDS.createOrReplaceTempView("t_word");
String sql = "select value, count(*) as counts " +
"from t_word " +
"group by value " +
"order by counts desc";
Dataset<Row> result1 = spark.sql(sql);
//TODO ====DSL
Dataset<Row> result2 = wordsDS.groupBy("value")
.count()
.orderBy(col("count").desc());
//3.TODO Output the result
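//Complete mode re-emits the full result table on every trigger; it is required here because
//sorting (orderBy) on a streaming aggregation is only supported in Complete output mode.
//With two queries running, spark.streams().awaitAnyTermination() is an alternative to
//blocking on a single query's awaitTermination().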
result1.writeStream()
.format("console")
.outputMode(OutputMode.Complete())
.start();
/*.awaitTermination()*/
result2.writeStream()
.format("console")
.outputMode(OutputMode.Complete())
.start()
.awaitTermination();
//4.TODO Release resources
spark.stop();
}
}
2.5 Linear Regression - House Price Prediction Case
2.5.1 Requirements
Feature columns:
|mlsNum (listing number)|city|sqFt (square feet)|bedrooms|bathrooms|garage|age|acres (lot size)|
Label column:
price (house price)
2.5.2 Code Implementation
0. Prepare the environment
1. Load the data
2. Feature processing
3. Split the dataset: 80% training set / 20% test set
4. Train a linear regression model on the training set
5. Test the model on the test set
6. Compute the error: RMSE (root mean squared error)
7. Save the model (save) for later reuse (load)
8. Release resources
package cn.itcast.hello;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.regression.LinearRegression;
import org.apache.spark.ml.regression.LinearRegressionModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import java.io.IOException;
/**
 * Author itcast
 * Desc Demonstrates Spark MLlib linear regression for house price prediction in Java
 */
public class JavaSparkDemo05 {
public static void main(String[] args) throws IOException {
//0.TODO Prepare the environment
SparkSession spark = SparkSession.builder().appName("JavaSparkDemo").master("local[*]")
.config("spark.sql.shuffle.partitions", "4")
.getOrCreate();
spark.sparkContext().setLogLevel("WARN");
//TODO 1. Load the data
Dataset<Row> homeDataDF = spark.read()
.format("csv")
.option("sep", "|")//指定分隔符
.option("header", "true")//是否有表头
.option("inferSchema", "true")//是否自动推断约束
.load("data/input/homeprice.data");
homeDataDF.printSchema();
homeDataDF.show();
/*
root
|-- mlsNum: integer (nullable = true)
|-- city: string (nullable = true)
|-- sqFt: double (nullable = true)
|-- bedrooms: integer (nullable = true)
|-- bathrooms: integer (nullable = true)
|-- garage: integer (nullable = true)
|-- age: integer (nullable = true)
|-- acres: double (nullable = true)
|-- price: double (nullable = true)
//|listing number|city|square feet|bedrooms|bathrooms|garage|age|lot acres|price
+-------+------------+-------+--------+---------+------+---+-----+---------+
| mlsNum| city| sqFt|bedrooms|bathrooms|garage|age|acres| price|
+-------+------------+-------+--------+---------+------+---+-----+---------+
|4424109|Apple Valley| 1634.0| 2| 2| 2| 33| 0.04| 119900.0|
|4404211| Rosemount|13837.0| 4| 6| 4| 17|14.46|3500000.0|
|4339082| Burnsville| 9040.0| 4| 6| 8| 12| 0.74|2690000.0|
*/
//TODO 2. Feature processing
//Feature selection
Dataset<Row> featuredDF = homeDataDF.select("sqFt", "age", "acres", "price");
//Assemble the features into a vector
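//VectorAssembler merges the chosen numeric columns into the single vector column that Spark ML estimators expect as input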
Dataset<Row> vectorDF = new VectorAssembler()
.setInputCols(new String[]{"sqFt", "age", "acres"})//columns to assemble into the feature vector
.setOutputCol("features")//name of the resulting vector column
.transform(featuredDF);
vectorDF.printSchema();
vectorDF.show();
/*
root
|-- sqFt: double (nullable = true)
|-- age: integer (nullable = true)
|-- acres: double (nullable = true)
|-- price: double (nullable = true)
|-- features: vector (nullable = true)
+-------+---+-----+---------+--------------------+
| sqFt|age|acres| price| features|
+-------+---+-----+---------+--------------------+
| 1634.0| 33| 0.04| 119900.0| [1634.0,33.0,0.04]|
|13837.0| 17|14.46|3500000.0|[13837.0,17.0,14.46]|
| 9040.0| 12| 0.74|2690000.0| [9040.0,12.0,0.74]|
*/
//TODO 3. Split the dataset: 80% training set / 20% test set
Dataset<Row>[] arr = vectorDF.randomSplit(new double[]{0.8, 0.2}, 100);
Dataset<Row> trainSet = arr[0];
Dataset<RowWPF中的多语言[关闭]