[DB] Spark--Spark Core

Posted cxc1357

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[DB] Spark--Spark Core相关的知识,希望对你有一定的参考价值。

生态

  • Spark Core:最重要,其中最重要的是RDD(弹性分布式数据集)
  • Spark SQL
  • Spark Streaming
  • Spark MLLib:机器学习算法
  • Spark Graphx:图计算

特点

  • 针对大规模数据处理的快速通用引擎
  • 基于内存计算
  • 速度快,易用,兼容性强

体系架构

  • 主节点:Cluster Manager(Standalone时叫Master)
  • 从节点:Worker(占用节点上所有资源,耗内存,没用内存管理机制,易OOM)

安装部署

  • 安装jdk,配置主机名,配置免密码登录
  • 伪分布(Standalone):一台机器上模拟分布式环境(Master+Worker)
    • 核心配置文件:conf/spark-env.sh
      • cp spark-env.sh.template spark-env.sh
      • export JAVA_HOME=/root/training/jdk1.8.0_144
      • export SPARK_MASTER_HOST=bigdata111
      • export SPARK_MASTER_PORT=7077
    • 启动:sbin/start-all.sh
    • Web Console:http://192.168.174.111:8080/
  • 全分布:先在主节点上安装,再把装好的目录复制到从节点上 
    • scp -r spark-2.1.0-bin-hadoop2.7/ root@bigdata114:/root/training
    • 在主节点上启动集群

HA

  • 基于文件目录
    • 本质还是只有一个主节点
    • 创建恢复目录保存状态信息
    • 主要用于开发和测试
    • mkdir /root/training/spark-2.1.0-bin-hadoop2.7/recovery
    • spark-env.sh
    • export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/root/training/spark-2.1.0-bin-hadoop2.7/recovery"

  • 基于zookeeper
    • 用于生产环境
    • 相当于数据库
    • 数据同步,选举功能,分布式锁(秒杀)
    • 步骤
      • 设置时间同步
      • date -s 2020-06-03
      • 启动zk
      • 配置spark-env.sh,注释掉最后两行,添加:
      • export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=bigdata112:2181,bigdata113:2181,bigdata114:2181 -Dspark.deploy.zookeeper.dir=/spark"
      • bigdata112上启动spark集群后,在bigdata114上启动Master

  

工具

  • spark-submit:用于提交Spark任务(jar包) 
    • bin/spark-submit --master spark://bigdata111:7077 --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.1.0.jar 100
  • spark-shell:相当于REPL,命令行工具 
    • 本地模式
      • bin/spark-shell
      • 不需连接到Spark集群上,在本地(Eclipse)直接运行,用于开发和测试
    • 集群模式
      • bin/spark-shell --master spark://bigdata111:7077
      • WordCount
        • sc.textFile("/root/temp/data.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

        •  sc.textFile("hdfs://bigdata111:9000/input/data.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)saveAsTextFile("hdfs://bigdata111:9000/output/1025")

        • val rdd1 = sc.textFile("/root/temp/input/data.txt") 
        • val rdd2 = rdd1.flatMap(_.split(" ")) 
        • val rdd3 = rdd2.map((_,1)) 【完整:val rdd3 = rdd2.map((word:String)=>(word,1) )】
        • val rdd4 = rdd3.reduceByKey(_+_)【完整:val rdd4 = rdd3.reduceByKey((a:Int,b:Int)=> a+b)】
        • rdd4.collect

IDE开发WordCount

  • Scala版本
    • 本地模式
 1 package day0605
 2 
 3 import org.apache.spark.SparkConf
 4 import org.apache.spark.SparkContext
 5 
 6 object MyWordCount {
 7   def main(args:Array[String]):Unit = {
 8     //创建一个任务的配置信息
 9     //设置Master=local,表示运行在本地模式上
10     //集群模式不需设置Master
11     val conf = new SparkConf().setAppName("MyWordCount").setMaster("local")
12     
13     //创建一个SparkContext对象
14     val sc = new SparkContext(conf)
15     
16     //执行WordCount
17     val result = sc.textFile("hdfs://192.168.174.111:9000/input/data.txt")
18     .flatMap(_.split(" ")).map((_,1))
19     .reduceByKey(_+_).collect
20     
21     //打印结果
22     result.foreach(println)
23     
24     //停止SparkContext
25     sc.stop()
26   }
27 }
View Code

    • 集群模式
      • bin/spark-submit --master spark://bigdata111:7077 --class day0605.MyWordCount /root/temp/demo1.jar hdfs://bigdata111:9000/input/data.txt hdfs://bigdata111:9000/output/0605/demo1
 1 package day0605
 2 
 3 import org.apache.spark.SparkConf
 4 import org.apache.spark.SparkContext
 5 
 6 //通过spark-submit提交
 7 
 8 object MyWordCount {
 9   def main(args:Array[String]):Unit = {
10     //创建一个任务的配置信息
11     //设置Master=local,表示运行在本地模式上
12     //集群模式不需设置Master
13     val conf = new SparkConf().setAppName("MyWordCount")
14     
15     //创建一个SparkContext对象
16     val sc = new SparkContext(conf)
17     
18     //执行WordCount
19     val result = sc.textFile(args(0))
20     .flatMap(_.split(" "))
21     .map((_,1))
22     .reduceByKey(_+_)
23     
24     //输出到hdfs
25     result.saveAsTextFile(args(1))
26     
27     //停止SparkContext
28     sc.stop()
29   }
30 }
View Code

  • Java版本
 1 package demo;
 2 
 3 import java.util.Arrays;
 4 import java.util.Iterator;
 5 import java.util.List;
 6 
 7 import org.apache.spark.SparkConf;
 8 import org.apache.spark.api.java.JavaPairRDD;
 9 import org.apache.spark.api.java.JavaRDD;
10 import org.apache.spark.api.java.JavaSparkContext;
11 import org.apache.spark.api.java.function.FlatMapFunction;
12 import org.apache.spark.api.java.function.Function2;
13 import org.apache.spark.api.java.function.PairFunction;
14 
15 import scala.Tuple2;
16 
17 /*
18  * 使用spark submit提交
19  * bin/spark-submit --master spark://bigdata111:7077 --class demo.JavaWordCount /root/temp/demo2.jar hdfs://bigdata111:9000/input/data.txt
20  */
21 
22 public class JavaWordCount {
23 
24     public static void main(String[] args) {
25         //运行在本地模式,可以设置断点
26         SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
27         
28         //运行在集群模式
29         //SparkConf conf = new SparkConf().setAppName("JavaWordCount");
30         
31         //创建一个SparkContext对象: JavaSparkContext对象
32         JavaSparkContext sc = new JavaSparkContext(conf);
33         
34         //读入HDFS的数据
35         JavaRDD<String> rdd1 = sc.textFile(args[0]);
36         
37         /*
38          * 分词
39          * FlatMapFunction:接口,用于处理分词的操作
40          * 泛型:String 读入的每一句话
41          *     U:      返回值 ---> String 单词
42          */
43         JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
44 
45             @Override
46             public Iterator<String> call(String input) throws Exception {
47                 //数据: I love Beijing
48                 //分词
49                 return Arrays.asList(input.split(" ")).iterator();
50             }
51         });
52         
53         /*
54          * 每个单词记一次数  (k2  v2)
55          * Beijing ---> (Beijing,1)
56          * 参数:
57          * String:单词
58          * k2 v2不解释
59          */
60         JavaPairRDD<String, Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() {
61 
62             @Override
63             public Tuple2<String, Integer> call(String word) throws Exception {
64                 return new Tuple2<String, Integer>(word, 1);
65             }
66             
67         });
68         
69         //执行Reduce的操作
70         JavaPairRDD<String, Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() {
71             
72             @Override
73             public Integer call(Integer a, Integer b) throws Exception {
74                 //累加
75                 return a+b;
76             }
77         });
78         
79         //执行计算(Action),把结果打印在屏幕上
80         List<Tuple2<String,Integer>> result = rdd4.collect();
81         
82         for(Tuple2<String,Integer> tuple:result){
83             System.out.println(tuple._1+"\\t"+tuple._2);
84         }
85         
86         //停止JavaSparkContext对象
87         sc.stop();
88     }
89 }
View Code

 

参考

spark.apache.org

spark任务提交两种方式

https://www.cnblogs.com/LHWorldBlog/p/8414342.html

以上是关于[DB] Spark--Spark Core的主要内容,如果未能解决你的问题,请参考以下文章

大数据之Spark:Spark Core 调优之数据倾斜调优

sbt.librarymanagement.ResolveException:下载 org.apache.spark 时出错:spark-core:3.0.1

Spark-core:Spark RDD的高级算子

大数据之Spark:Spark Core

sbt 在运行 Spark hello world 代码时出错?

spark执行例子eclipse maven打包jar