Spark学习笔记

Posted 準提童子

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark学习笔记相关的知识,希望对你有一定的参考价值。

本文章根据《Spark 快速大数据分析/ Learning Spark: Lightning-fast Data Anakysis》一书整理。这篇文章的主要目标和特点:简要、重点、完成后可用于开发

1、Spark是什么

Spark是一个用来实现快速通用集群计算的平台。其一个主要的特点就是能够在内存中进行计算,因此速度更快。原先需要多种不同的分布式平台的场景,包括批处理、迭代算法、交互式查询、流处理,Spark通过在一个统一的框架下支持这些不同的计算,实现有效的整合,减轻了原先对各种平台分别管理的负担。

Spark提供了基于Python,Java,Scala,SQL的简单易用的API,以及内建的丰富的程序库,其还可以与其他大数据工具密切配合使用,如Spark可以运行在Hadoop集群上。

Spark是一个大一统的软件栈,包含多个组件


Spark Core :实现Spark的基本功能:任务调度、内存管理、错误恢复、与存储系统交互等。Core 还包含了对弹性分布式数据集RDD(resilient distributed dataset)的API定义。

RDD表示分布在多个计算机节点上可以并行操作的元素集合,是Spark的主要编程抽象。

spark SQL:操作结构化数据的程序包。通过 Spark SQL,可以使用SQL或者HQL来查询数据,Spark SQL支持多种数据源,如Hive表、Parquet、以及JSON等,其还支持将SQL与传统的RDD编程的数据操作方式相结合。

Spark Streaming:对实时数据进行流失计算的组件。流数据的定义:只能以事先规定好的顺序被读取一次的数据的一个序列,特点是以非常高的速率到来的输入数据。

MLib:提供常见的机器学习ML功能的程序库,提供了很多机器学习算法,包括分类,回归、聚类、协同过滤等。

GraphX:用来操作图(数学用语,比如朋友关系图)的程序库,可以进行并行的图计算,提供一些常用的图算法。

Spark设计为可以高效的在一个计算节点到数千个计算节点之间伸缩计算,Spark支持在各种集权管理器cluster manager上运行,包括Hadoop YARN,Apache Mesos,以及Spark自带的简易调度器——独立调度器。

Spark的存储层次:Spark不仅可以将任何Hadoop分布式文件系统上的文件读取为分布式数据集,也可以支持其他支持Hadoop接口的系统,如Hive,HBase,Spark支持任何实现了Hadoop接口的存储系统。

2、Spark开发环境搭建

使用spark,可以通过shell,也可以通过搭建开发环境,因个人开发需要,主要面向java搭建Spark开发环境。暂时略过。

开发Spark程序实际上就是通过调用Spark的API,实现Java程序与Spark环境之间的交互:


Java驱动程序将需要计算的数据通过parallelize方法,将数据传给Spark环境,转换为RDD;

驱动程序通过调用各种算子Api,实现Spark环境对数据的计算;

最后,Spark环境将计算结果返回给Java驱动程序;


搭建Spark环境后,先创建Spark的环境对象SparkContext,需要传入两个参数:

集群URL:告诉Spark如何连接到集群上,使用"local"可以使spark运行在单机单线程上而无需连接到集群。

应用名

	SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("My App");
	JavaSparkContext sc = new JavaSparkContext(sparkConf);


3、RDD编程

弹性分布式数据集RDD(Resilient Distributed Dataset)是Spark对数据的核心抽象,在Spark中,对数据的操作不外乎 创建RDD,转化已有的RDD,调用RDD操作进行求值。

Spark中的RDD是一个不可变的分布式对象集合,每个RDD都被分为多个区,这些分区运行在集群中的不同节点上。

创建RDD,可以读取外部数据集,也可以使用驱动器程序的对象集合。

RDD支持两种类型的操作:转化操作transformation和行动操作action。转化操作会由一个RDD生成一个新的RDD。

转化操作和行动操作的区别就在于Spark的计算RDD的方式不同,Spark对RDD是采用惰性计算的方式。只有第一次执行行动操作的时候,才会真正计算。

默认情况下,RDD会在每次对它们进行行动操作的时候重新计算,如果想多个行动操作中重用一个RDD,可使用RDD.persist()让Spark把这个RDD缓存下来。

在实际操作中,经常使用persist()把数据的一部分读取到内存中,并反复查询这部分数据,这样可以避免低效。

		SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("My App");
		JavaSparkContext sc = new JavaSparkContext(sparkConf);
		
		//创建RDD,读取外部文件
		String path = "D://line_event.txt";
		JavaRDD<String> input = sc.textFile(path);
		//持久化RDD
		input.persist(new StorageLevel());
		
		System.out.println(input.count());//6357
		
		//转化操作,filter过滤
		JavaRDD<String> input_filter = input.filter(new Function<String, Boolean>()
			@Override
			public Boolean call(String arg0) throws Exception 
				return arg0.contains("\\"4\\"");
			
		);
		System.out.println(input_filter.count());//237
		
		//行动操作
		String first = input_filter.first();
		System.out.println(first);

1. 创建RDD

两种方式:读取外部数据集、在驱动程序中对一个集合进行并行化。

创建RDD最简单的方式就是把程序中一个已有的集合传给SparkContext的parallelize()方法,但需要注意,除开发原型和测试时,这种方式用的并不多,因为需要把真个数据集放在一台机器内存中。

		JavaRDD<String> input_par = sc.parallelize(Arrays.asList("pandas","spark","yuchen"));
		System.out.println(input_par.count());//3
更常用的方式是从外部存储中读取数据,前边已经讲了读取外部文本文件,后续会涉及更多。

2.RDD操作

RDD支持两种操作:转化操作和行动操作,转化操作返回的是RDD,行动操作是返回的其他数据类型。

转化操作

返回新的RDD,转化操作是惰性求值的,只有行动操作用到这些RDD时才会被计算。另外,转化操作后,旧的RDD还可以继续使用。

目前接触到的转化操作有:

map 返回一个新的分布式数据集,将数据源的每一个元素传递给函数 func 映射组成。

filter 返回一个新的数据集,从数据源中选中一些元素通过函数 func 返回 true。

union 取两个RDD的合集。

		JavaRDD<String> union = input_filter.union(input_par);
		System.out.println(union.count());//240

Spark会使用谱系图来记录这些不同的RDD之间的依赖关系。

虽然转化操作是惰性求值的,可以使用一次行动操作来强制执行转化操作。

行动操作

行动操作会把最终求得的结果返回到驱动程序中,或者写入到外部存储系统。目前接触的行动操作有: count 计算总数 first 取出第一个元素 take 获取指定数量的元素 collect 获取整个RDD中的数据,除非内存放的下才可以使用,因此collect不能用在大规模数据集上
		JavaRDD<String> union = input_par.union(input_filter);
		System.out.println(union.count());//240
		
		List<String> list = union.take(4);
		System.out.println(list);
输出结果 [pandas, spark, yuchen, (107060,([107060]...))] 另外,可以使用saveAsTextFile和saveAsSwquenceFile等方法,将RDD以各种自带的格式保存起来。

向Spark传递函数

Spark中的大部分转化操作和一部分行动操作,都需要依赖用户传递的函数来进行计算。Java中,函数需要是实现了function包中的任意函数接口的对象,根据不同的返回类型,定义了一些不同的接口:
Function<T,  R>    R  call(T)   接收一个输入值,并返回一个输出值,用于类似map()和filter()方法
Function2<T1, T2 , R>   R  call(T1, T2)  接收两个输入值并返回一个输出值,用于类似于aggregate()和fold()等操作
FlatMapFunction<T, R>  Iterable<R>  call(T)  接收一个输入值并返回任意个输出,用于类似flatMap()

可以通过匿名内部类,也可以使用具名类的方式 不过,顶级具名类在组织大型程序时显得比较清晰,另一个好处就是,可以给构造函数添加参数
		// 匿名类
		JavaRDD<String> input_filter = input.filter(new Function<String, Boolean>()
			@Override
			public Boolean call(String arg0) throws Exception 
				return arg0.contains("\\"4\\"");
			
		);
		System.out.println(input_filter.count());//237
		
		
		//具名类
		class Contains implements Function<String, Boolean>
			String filterStr;
			public Contains(String str)
				filterStr = str;
			
			@Override
			public Boolean call(String arg0) throws Exception 
				return arg0.contains(filterStr);
			
		
		
		JavaRDD<String> filters = input.filter(new Contains("\\"9\\""));
		System.out.println(filters.count());//637
在Java 8 中,也可以使用lambda表达式进行函数传递
RDD<String> str = lines.filter(s -> s.contains("error"));

常见的转化操作和行动操作

先讲受任意数据类型的RDD支持的转化操作和行动操作

转化操作

针对各个元素的转化操作
两个最常用的的转化操作是map()和filter(),函数应用于RDD中的每个元素,例如使用map,我们可以把URL集合中的每个url对应的主机名提取出来。
		//map
		JavaRDD<Integer> nums = sc.parallelize(Arrays.asList(1,3,4));
		JavaRDD<Integer> nums_2 = nums.map(new Function<Integer, Integer>()
			@Override
			public Integer call(Integer x) 
				return x*x;
			
		);
		System.out.println(nums_2.collect());//[1, 9, 16]
flatMap()对每个元素生成多个元素,注意看map()与flatMap()的区别:
		//注意flatMap与map的区别
		//flatMap 根据空格拆分
		JavaRDD<String> str = sc.parallelize(Arrays.asList("hello yuchen","hi","nice to meet you"));
		JavaRDD<String> str_split = str.flatMap(new FlatMapFunction<String, String>() 
			
			//注意使用FlatMapFunction时,call返回的是Iterator类型
			@Override
			public Iterator<String> call(String arg0) throws Exception 
				return Arrays.asList(arg0.split(" ")).iterator();
			

		);
		System.out.println(str_split.collect());//[hello, yuchen, hi, nice, to, meet, you]
		
		//map 根据空格拆分
		JavaRDD<List<String>> str_map = str.map(new Function<String, List<String>>()
			@Override
			public List<String> call(String arg0) throws Exception 
				
				return Arrays.asList(arg0.split("\\\\s+"));
			
		);
		System.out.println(str_map.collect());//[[hello, yuchen], [hi], [nice, to, meet, you]]
flatMap相当于将返回的迭代器”拍扁“
伪集合操作
RDD支持很多集合操作,比如合并,相交等,下边讲四种操作。 distinct()方法生成只包含不同元素的新RDD,不过distinct()开销很大,因为它需要将所有数据通过网络进行混洗shuffle。 union() 返回一个包含两个RDD中所有元素的RDD,如果输入的RDD有重复的数据,union也会包含这些重复的数据 intersection() 只返回两个RDD都有的元素,该方法会去掉所有重复元素,因为要混洗数据,所以效率比较低 cartesian() 返回所有可能的(a,b)对
		JavaRDD<Integer> nums = sc.parallelize(Arrays.asList(1,3,3,4));
		//distinct
		JavaRDD<Integer> nums_dis = nums.distinct();
		System.out.println(nums_dis.collect());//[4, 1, 3]
		
		//sample
		JavaRDD<Integer> nums_sam = nums.sample(false, 0.5);
		System.out.println(nums_sam.collect());//[3, 4]不确定,随机取值
		
		JavaRDD<Integer> nums2 = sc.parallelize(Arrays.asList(1,4, 6, 8));
		//union
		JavaRDD<Integer> nums_union = nums.union(nums2);
		System.out.println(nums_union.collect());//[1, 3, 3, 4, 1, 4, 6, 8]
		
		//intersection
		JavaRDD<Integer> num_inter = nums.intersection(nums2);
		System.out.println(num_inter.collect());//[4, 1]
		
		//subtract
		JavaRDD<Integer> num_sub = nums.subtract(nums2);
		System.out.println(num_sub.collect());//[3, 3]
		
		//cartesian笛卡尔积
		JavaPairRDD<Integer, Integer> num_carte = nums.cartesian(nums2);
		System.out.println(num_carte.collect());
		//[(1,1), (1,4), (1,6), (1,8), (3,1), (3,4), (3,6), (3,8), (3,1), (3,4), (3,6), (3,8), (4,1), (4,4), (4,6), (4,8)]

行动操作

行动操作一般都是针对于单个RDD的,先学习三种行动操作:reduce, fold , aggregate reduce:针对RDD的所有元素进行迭代操作,例如累加求和、累乘求积 fold:功能与reduce类似,也是对RDD的元素进行迭代操作,但必须提供一个初始值,用于计算中。 通过代码看reduce与fold的区别,注:不明白为什么fold操作的初始值在迭代操作的开始和结束都要计算一次
		JavaRDD<Integer> nums_1 = jsc.parallelize(Arrays.asList(1,3,5,6));
		//reduce
		Integer sum =nums_1.reduce(new Function2<Integer, Integer, Integer>() 
			@Override
			public Integer call(Integer arg0, Integer arg1) throws Exception 
				return arg0+arg1;
			
		);
		/**
		 * 1: 1+3;
		 * 2: 4+5;
		 * 3: 9+6;
		 * 结果15
		 */
		System.out.println(sum);
		JavaRDD<Integer> nums_2 = jsc.parallelize(Arrays.asList(1,5,7,9));
		//fold 与reduce操作类似,可以提供一个计算中使用到的初始值,初始值可以不使用,等同于reduce
		Integer sum2 = nums_2.fold(10, new Function2<Integer, Integer, Integer>() 
			@Override
			public Integer call(Integer arg0, Integer arg1) throws Exception 
				return arg0+arg1;
			
		);
		/**
		 * fold的计算过程:
		 * 1: 10+1;
		 * 2: 11+5;
		 * 3: 16+7;
		 * 4: 23+9;
		 * 5: 10+32
		 * 结果42
		 */
		System.out.println(sum2);
fold和reduce操作的返回值类型必须和RDD中元素的类型相同,如果需要得到不同类型返回结果的操作,就需要使用aggregate 与fold类似,aggregate也需要提供返回类型的初始值,“考虑到每个节点是在本地进行累加的,最终还需要带二个函数将累加器两两合并”,对于这句话不是很理解,难道是指的在集群环境下? fold可以看做是aggregate的简化:
		JavaRDD<Integer> nums_2 = jsc.parallelize(Arrays.asList(1,5,7,9));

		Function2<Double, Integer, Double> add = new Function2<Double, Integer, Double>() 
			@Override
			public Double call(Double arg0, Integer arg1) throws Exception 
				return arg0+arg1;
			
		;
		Function2<Double, Double, Double> combin = new Function2<Double, Double, Double>() 
			@Override
			public Double call(Double arg0, Double arg1) throws Exception 
				return arg0+arg1;
			
		;
		double res = nums_2.aggregate(10.0, add, combin);
		System.out.println(res);
示例,通过aggregate计算平均值:
		JavaRDD<Integer> nums_2 = jsc.parallelize(Arrays.asList(1,5,7,9));
		
		class AvgCount implements Serializable
			public int total;
			public int num;
			public AvgCount(int total,int num)
				this.total = total;
				this.num = num;
			
			public double avg()
				return total/(double)num;
			
		
		
		AvgCount initial = new AvgCount(0, 0);
		Function2<AvgCount,Integer,AvgCount> addAndCount = 
				new Function2<AvgCount, Integer, AvgCount>() 
					@Override
					public AvgCount call(AvgCount arg0, Integer arg1)
							throws Exception 
						arg0.total+=arg1;
						arg0.num++;
						return arg0;
					
				;
		Function2<AvgCount,AvgCount,AvgCount> combine=
				new Function2<AvgCount,AvgCount,AvgCount>()
					@Override
					public AvgCount call(AvgCount arg0, AvgCount arg1)
							throws Exception 
						arg0.total+=arg1.total;
						arg0.num+=arg1.num;
						return arg0;
					
		;
		AvgCount result = nums_2.aggregate(initial, addAndCount, combine);
		/**
		 * 1. initial 在addAndCount中与 RDD的元素进行计算,返回初始值类型的结果res1
		 *    所以Function2<AvgCount(初始值),Integer(RDD元素),AvgCount(返回值)> addAndCount;
		 * 2. initial 与在addAndCount中返回的结果res1进行计算,返回初始值类型的结果res2
		 *    所以Function2<AvgCount(初始值),AvgCount(res1),AvgCount(返回值)> combine;
		 *    可以看出aggregate的返回结果可以与RDD元素的类型不同
		 */
		System.out.println(result.avg());
还有其他行动操作,会以普通集合或者值的形式将RDD的部分或全部数据返回到驱动程序中:collect,take,top take(n) 尝试以只访问尽量少的分区的原则,获取数据到驱动程序中,排序顺序与期望会存在差异 top()可以使用默认或者自己定义比较函数的方式返回排序的数据
		JavaRDD<Integer> nums_1 = jsc.parallelize(Arrays.asList(2,1,3,5,6,4,10,1,-2));
		//take
		List<Integer> list_take = nums_1.take(3);
		System.out.println(list_take);//[2, 1, 3]集群下,原则是尝试只访问尽量少的分区
		//top 默认
		List<Integer> list_top_def = nums_1.top(3);
		System.out.println(list_top_def);//[10, 6, 5]默认是按照降序
		//top 指定比较函数
		class SortASC implements Comparator<Integer> , Serializable//java.util.Comparator
			@Override
			public int compare(Integer o1, Integer o2) 
				return o1<o2?1:-1;
			
		
		List<Integer> list_top_asc = nums_1.top(3, new SortASC());
		System.out.println(list_top_asc);//[-2, 1, 1]
有时候会使用一些行动操作,对RDD中的每个元素进行操作,单=但不把任何结果返回到驱动程序中,例如以JSON形式向网络服务器上发送数据,或者把数据存储到数据库中。可以使用foreach()行动操作来对RDD中的每个元素进行操作,而不需要把RDD发回本地。(集群的优势,不需要把所有数据集合到一台机子上,再进行存储操作)
		//foreach
		nums_1.foreach(new VoidFunction<Integer>() 
			@Override
			public void call(Integer arg0) throws Exception 
				System.out.println(arg0);
			
		);
也可以使用count返回RDD元素的个数,countByValue()返回每个元素对应的个数计数,返回为Map
		System.out.println(nums_1.count());
		System.out.println(nums_1.countByValue());//返回Map 值:计数
		//5=1, 10=1, 1=2, 6=1, 2=1, 3=1, 4=1, -2=1
		jsc.stop();













以上是关于Spark学习笔记的主要内容,如果未能解决你的问题,请参考以下文章

为啥每个线程多次初始化惰性变量

理解 pyspark 中的惰性求值行为

学习笔记TF023:下载缓存属性字典惰性属性覆盖数据流图资源

Spark基础学习笔记14:Scala数据结构

Spark学习笔记:索引分区映射

无法在属性初始化程序中使用实例成员,将变量设置为惰性不起作用