[DB] Spark--Spark Core
Posted cxc1357
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[DB] Spark--Spark Core相关的知识,希望对你有一定的参考价值。
生态
- Spark Core:最重要,其中最重要的是RDD(弹性分布式数据集)
- Spark SQL
- Spark Streaming
- Spark MLLib:机器学习算法
- Spark Graphx:图计算
特点
- 针对大规模数据处理的快速通用引擎
- 基于内存计算
- 速度快,易用,兼容性强
体系架构
- 主节点:Cluster Manager(Standalone时叫Master)
- 从节点:Worker(占用节点上所有资源,耗内存,没用内存管理机制,易OOM)
安装部署
- 安装jdk,配置主机名,配置免密码登录
- 伪分布(Standalone):一台机器上模拟分布式环境(Master+Worker)
- 核心配置文件:conf/spark-env.sh
- cp spark-env.sh.template spark-env.sh
- export JAVA_HOME=/root/training/jdk1.8.0_144
- export SPARK_MASTER_HOST=bigdata111
- export SPARK_MASTER_PORT=7077
- 启动:sbin/start-all.sh
- Web Console:http://192.168.174.111:8080/
- 核心配置文件:conf/spark-env.sh
- 全分布:先在主节点上安装,再把装好的目录复制到从节点上
- scp -r spark-2.1.0-bin-hadoop2.7/ root@bigdata114:/root/training
- 在主节点上启动集群
HA
- 基于文件目录
- 本质还是只有一个主节点
- 创建恢复目录保存状态信息
- 主要用于开发和测试
- mkdir /root/training/spark-2.1.0-bin-hadoop2.7/recovery
- spark-env.sh
- export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/root/training/spark-2.1.0-bin-hadoop2.7/recovery"
- 基于zookeeper
- 用于生产环境
- 相当于数据库
- 数据同步,选举功能,分布式锁(秒杀)
- 步骤
- 设置时间同步
- date -s 2020-06-03
- 启动zk
- 配置spark-env.sh,注释掉最后两行,添加:
- export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=bigdata112:2181,bigdata113:2181,bigdata114:2181 -Dspark.deploy.zookeeper.dir=/spark"
- bigdata112上启动spark集群后,在bigdata114上启动Master
工具
- spark-submit:用于提交Spark任务(jar包)
- bin/spark-submit --master spark://bigdata111:7077 --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.1.0.jar 100
- spark-shell:相当于REPL,命令行工具
- 本地模式
- bin/spark-shell
- 不需连接到Spark集群上,在本地(Eclipse)直接运行,用于开发和测试
- 集群模式
- bin/spark-shell --master spark://bigdata111:7077
- WordCount
- sc.textFile("/root/temp/data.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
- 本地模式
-
-
-
- sc.textFile("hdfs://bigdata111:9000/input/data.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)saveAsTextFile("hdfs://bigdata111:9000/output/1025")
-
-
-
-
-
- val rdd1 = sc.textFile("/root/temp/input/data.txt")
- val rdd2 = rdd1.flatMap(_.split(" "))
- val rdd3 = rdd2.map((_,1)) 【完整:val rdd3 = rdd2.map((word:String)=>(word,1) )】
- val rdd4 = rdd3.reduceByKey(_+_)【完整:val rdd4 = rdd3.reduceByKey((a:Int,b:Int)=> a+b)】
- rdd4.collect
-
-
IDE开发WordCount
- Scala版本
- 本地模式
1 package day0605 2 3 import org.apache.spark.SparkConf 4 import org.apache.spark.SparkContext 5 6 object MyWordCount { 7 def main(args:Array[String]):Unit = { 8 //创建一个任务的配置信息 9 //设置Master=local,表示运行在本地模式上 10 //集群模式不需设置Master 11 val conf = new SparkConf().setAppName("MyWordCount").setMaster("local") 12 13 //创建一个SparkContext对象 14 val sc = new SparkContext(conf) 15 16 //执行WordCount 17 val result = sc.textFile("hdfs://192.168.174.111:9000/input/data.txt") 18 .flatMap(_.split(" ")).map((_,1)) 19 .reduceByKey(_+_).collect 20 21 //打印结果 22 result.foreach(println) 23 24 //停止SparkContext 25 sc.stop() 26 } 27 }
-
- 集群模式
- bin/spark-submit --master spark://bigdata111:7077 --class day0605.MyWordCount /root/temp/demo1.jar hdfs://bigdata111:9000/input/data.txt hdfs://bigdata111:9000/output/0605/demo1
- 集群模式
1 package day0605 2 3 import org.apache.spark.SparkConf 4 import org.apache.spark.SparkContext 5 6 //通过spark-submit提交 7 8 object MyWordCount { 9 def main(args:Array[String]):Unit = { 10 //创建一个任务的配置信息 11 //设置Master=local,表示运行在本地模式上 12 //集群模式不需设置Master 13 val conf = new SparkConf().setAppName("MyWordCount") 14 15 //创建一个SparkContext对象 16 val sc = new SparkContext(conf) 17 18 //执行WordCount 19 val result = sc.textFile(args(0)) 20 .flatMap(_.split(" ")) 21 .map((_,1)) 22 .reduceByKey(_+_) 23 24 //输出到hdfs 25 result.saveAsTextFile(args(1)) 26 27 //停止SparkContext 28 sc.stop() 29 } 30 }
- Java版本
1 package demo; 2 3 import java.util.Arrays; 4 import java.util.Iterator; 5 import java.util.List; 6 7 import org.apache.spark.SparkConf; 8 import org.apache.spark.api.java.JavaPairRDD; 9 import org.apache.spark.api.java.JavaRDD; 10 import org.apache.spark.api.java.JavaSparkContext; 11 import org.apache.spark.api.java.function.FlatMapFunction; 12 import org.apache.spark.api.java.function.Function2; 13 import org.apache.spark.api.java.function.PairFunction; 14 15 import scala.Tuple2; 16 17 /* 18 * 使用spark submit提交 19 * bin/spark-submit --master spark://bigdata111:7077 --class demo.JavaWordCount /root/temp/demo2.jar hdfs://bigdata111:9000/input/data.txt 20 */ 21 22 public class JavaWordCount { 23 24 public static void main(String[] args) { 25 //运行在本地模式,可以设置断点 26 SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local"); 27 28 //运行在集群模式 29 //SparkConf conf = new SparkConf().setAppName("JavaWordCount"); 30 31 //创建一个SparkContext对象: JavaSparkContext对象 32 JavaSparkContext sc = new JavaSparkContext(conf); 33 34 //读入HDFS的数据 35 JavaRDD<String> rdd1 = sc.textFile(args[0]); 36 37 /* 38 * 分词 39 * FlatMapFunction:接口,用于处理分词的操作 40 * 泛型:String 读入的每一句话 41 * U: 返回值 ---> String 单词 42 */ 43 JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() { 44 45 @Override 46 public Iterator<String> call(String input) throws Exception { 47 //数据: I love Beijing 48 //分词 49 return Arrays.asList(input.split(" ")).iterator(); 50 } 51 }); 52 53 /* 54 * 每个单词记一次数 (k2 v2) 55 * Beijing ---> (Beijing,1) 56 * 参数: 57 * String:单词 58 * k2 v2不解释 59 */ 60 JavaPairRDD<String, Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() { 61 62 @Override 63 public Tuple2<String, Integer> call(String word) throws Exception { 64 return new Tuple2<String, Integer>(word, 1); 65 } 66 67 }); 68 69 //执行Reduce的操作 70 JavaPairRDD<String, Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() { 71 72 @Override 73 public Integer call(Integer a, Integer b) throws Exception { 74 //累加 75 return a+b; 76 } 77 }); 78 79 //执行计算(Action),把结果打印在屏幕上 80 List<Tuple2<String,Integer>> result = rdd4.collect(); 81 82 for(Tuple2<String,Integer> tuple:result){ 83 System.out.println(tuple._1+"\\t"+tuple._2); 84 } 85 86 //停止JavaSparkContext对象 87 sc.stop(); 88 } 89 }
参考
spark.apache.org
spark任务提交两种方式
https://www.cnblogs.com/LHWorldBlog/p/8414342.html
以上是关于[DB] Spark--Spark Core的主要内容,如果未能解决你的问题,请参考以下文章
大数据之Spark:Spark Core 调优之数据倾斜调优
sbt.librarymanagement.ResolveException:下载 org.apache.spark 时出错:spark-core:3.0.1