spark学习总结
Posted 花心土豆
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark学习总结相关的知识,希望对你有一定的参考价值。
Spark总结
Spark Engine
RDD
弹性分布式数据集
partitons组成的,partition一定是一个具体的概念,就是一段连续的数据在某个物理节点
1,由一组partitions组成
2,应用在RDD上面的算子,会被应用到每一个partitions上面去
3,每一个RDD需要有依赖
4,如果RDD是k,v键值对的,就可以有一些重新partition的功能,比如说有些算子,groupByKey,reduceByKey,countByKey
5,有些RDD有最佳计算位置,比如HadoopRDD,反例比如是本地集合演化过来的RDD,那没有最佳计算位置(数据本地性)
算子操作
Transformations
map、mapPartition、flatMap、reduceByKey、groupByKey、filter、sortByKey、mapValues、sample ...
本质就是生成新的RDD,new MapPartitionsRDD()
Actions
collect慎用、reduce、count、take、foreach、foreachPartition...
本质会提交一个JOB去集群里面去运算,sc.runJob()
RDD容错
1,血统,Lineage,重算!! 重算会找依赖的RDD,如果一直都没有过持久化,重新从数据源来读取数据
2,cache() persist() 默认的持久化策略 MEMORY_ONLY,存不下就不存了,下次重新算 _2 _SER
着重要区别开的就是MEMORY_AND_DISK,这个东西是存不下就存在本地磁盘
OFF_HEAP,默认会去找Tachyon
3,checkpoint
做checkpoint需要首先在sc.setCheckpointDir("hdfs://") 存在分布式文件系统里面!
Spark Cluster --> Worker Nodes --> Executors --> Threads
Yarn Cluster --> Node Managers --> Containers --> Threads
ApplicationMaster是这个Driver驱动程序和ResourceManager沟通的中间人或者桥梁
Application(Driver DAGScheduler TaskScheduler) --> Jobs(Action操作) --> Stages(宽窄依赖/Shuffle) -->
Tasks(Pipeline/看一个Stage里面最后一个finalRDD上面有几个Partitions,其实就有Tasks被划分出来)
DAGScheduler会划分JOB到Tasks,DAGScheduler会计算每一个Task的最佳计算位置,它是倒着往前来推的,也就是推到
pipeline一条线最前面一个RDD,如果这条线上没有做过持久化,最前面一个RDD如果譬如是HadoopRDD,
那么最近位置就是由Block所在的位置决定,如果做过持久化,那么最近计算位置,就是做persist的位置!
最后如果没有持久化,如果没有Block位置,那么就没有最佳位置,那么Task就会扔到资源列表里面的一个空闲的Executor里面,
数据就是走网络传输!
TaskScheduler在初始化的时候会申请到一堆Executors/Containers,TaskScheduler就接收DAGScheduler发送过来的TaskSet(对应一个Stage)
说白了,就是发送Stage的顺序是DAGScheduler来决定的,TaskScheduler会把TaskSet里面的task抽出来一个个的发送到从节点里面去执行,
真正到从节点里面,才会开始读取数据!!!!
Tasks在从节点运行完了之后会把Results返回给Driver,所以这个地方也就是Driver在哪里,去哪里看结果
Standalone --deploy client cluster的区别
Yarn --master yarn-client yarn-cluster的区别
Spark Core
new SparkContext(conf)
算子操作
TopN
GroupTopN (Collection.sort(list) 插入排序)
二次排序 (构建自定义的key)
PageRank (需要注意的是,迭代次数多了,DAG很复杂,可以对每次迭代的RDD进行checkpoint)
SparkPi
Spark SQL
new SQLContext/HiveContext()
DataFrame 里面除了有数据还有schema
RDD 转成 DataFrame
1,反射 JavaBean
2,动态的方式, 需要去构建StructFiled StructType
几种数据源
JSON
mysql spark-default.conf
Hive (注意的就是运行代码的时候,如果是yarn-cluster模式,需要--jars 3个jar包)
开窗函数里面的row_number()打行号
可以通过 row_number() OVER (PARTITION BY ... ORDER BY ... DESC) rank where rank <=3
来做到分组去Top3
自定义UDF 和 UDAF
udf定义 sqlconext.udf.register("", => )
UDF是多个元素进来,多个元素出去
UDAF是多个元素进来,一个元素出去
Spark Streaming
val ssc = new StreamingContext(conf)
ssc.start()
ssc.awaitTermination()
本质是微批处理,其实就是每隔一个间隔切割一个RDD,然后一个RDD提交一个JOB,本质还是用咱们的Spark Engine来处理
读取端口数据,socketTextStream(),用到Reciver的机制,会占用额外的线程
读取HDFS数据,textFileStream(),没有用Reciver,本质就是每隔一个间隔去读取Block数据
读取Kafka数据,
1,基于Receiver,KafkaUtils.createStream()
2,Direct方式,没有Receiver,KafkaUtils.createDirectStream()
Direct方式好处
1,one to one mapping,说白了就是 kafka里面partitons有几个,这边到Spark里面的RDD就对应几个
2,高效,这样的话就不许WAL,预习日志,就不需要额外的Disk IO
3,Exactly-once,计算一次尽计算一次,不多不少
SparkStreaming里面和SparkCore里面相比,比较有特点的三个transform算子操作
1,updateStateByKey()
注意的是,需要ssc.checkpoint("hdfs://"),会不断的往之前的状态上更新
2,transform()
特点是交给你一个个的RDD,咱们可以直接用Spark Core里面的所有算子操作!最后给它返回一个个RDD再往下游传递就可以
3,基于window的操作
注意要设置三个时间,第一个还是每隔多少时间切割一个RDD,第二个是每隔多少时间计算一次(slideDuration),第三个是每次计算多少的数据量(windowDuration)
我们举了一个例子就是reduceByKeyAndWindow()
这个地方有两个api,
reduceByKeyAndWindow(_+_, windowDuration,slideDuration)
有一个是优化的,这个需要首先设置checkpoint
reduceByKeyAndWindow(_+_,_-_, windowDuration,slideDuration)
并行度的计算
一开始读数据
sc.parallize(指定参数--多少partition)
sc.textFile(指定参数--至少多少partiton)
计算过程中,我们可以使用repartition或者colease算子来改变数据
计算过程中,我们可以通过譬如groupByKey([numTasks])、reduceByKey([numTasks])指定这次shuffle的reduce端reduce tasks的数据
还有在conf.set("spark.default.parallism", 100)
优先级
groupByKey([numTasks]) > conf.set("spark.default.parallism", 100)
如果么有设置conf.set("spark.default.parallism", 100),就根据上一次RDD里面的并行度来
*spark memory的使用
以上是关于spark学习总结的主要内容,如果未能解决你的问题,请参考以下文章