Apache Spark基础知识

Posted 终回首

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Spark基础知识相关的知识,希望对你有一定的参考价值。

我的spark学习笔记,基于Spark 2.4.0

目录

一、简介

Apache Spark是一个分布式计算框架,相比上一代计算框架MapReduce速度更快,且提供更多更方便的接口和函数实现。

Spark软件栈示意图

Spark包括Spark Core、Spark Sql、Spark Structured Streaming、Spark Streaming、Spark MLib、Spark GraghX。

Spark Core里包含任务调度、内存管理、错误恢复、与存储系统交互等模块。还包括RDD(Resilient Distributed Dateset 弹性分布式数据集)的定义。

Spark Sql是Spark提供的处理结构化数据的包,通过Spark Sql可以使用SQL或 hive sql来查询数据。

Spark Structured Streaming是Spark提供的以SQL的方式处理流式数据的包。

Spark Streaming是Spark 提供的处理流式数据的包。

Spark MLib是Spark封装好的机器学习包,封装了一些算法。

Spark GraghX是Spark提供的处理图数据的包。

PS:Spark编程 最好使用Scala语言,Scala Api比Java Api调用方便,Scala性能比Python更好

二、RDD编程

1 RDD介绍

Spark1.0开始就有的,弹性分布式数据集。数据的抽象模型,跨集群节点分区的元素集合,可以并行操作,且有容错恢复功能。不应该把RDD看做存放特定数据的数据集,而最好把每个RDD当做我们通过转化操作构建出来的,记录如何计算数据的指令列表。

2 RDD操作

RDD操作分为两类,一类是transformation(转换算子),转换算子会返回一个新的RDD,转换算子都是懒执行;另一类是action(行动算子),行动算子会返回汇总RDD或者输出数据,行动算子是立即执行的。
转化操作可能会多次执行,所以为了避免读取超时和重算消耗,读取时间较长或多次使用的RDD必须持久化。

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
val totoalCount = lineLengths.count()

这个例子里,因为有reduce()、count()两个行动算子,所以textFile()、map()会执行2次。
如果在第2行后插入一行如下代码

lineLengths.persist()

这样的话每个算子都会只运行一次

2.0 读操作

RDD读操作的方法都在SparkContext里,下面是几个常用的

读操作作用
textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]从HDFS或本地文件系统或其他支持Hadoop文件接口的其他数据源读取文件,返回一个String类型的RDD
binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)]读取二进制文件
objectFile[T](path: String, minPartitions: Int = defaultMinPartitions)(implicit arg0: ClassTag[T]): RDD[T]读取序列化后的Sequence文件
sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)]根据给定的key-value格式读取,返回一个String类型的RDD
wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]读取文件,一个文件会生成一个(文件名,文件内容)的k v ,适用于处理多个小文件或处理整个文件

2.1 常用Tramsformation算子

下表列出了 Spark 支持的一些常见转换。有关详细信息,请参阅 RDD API 文档(ScalaJavaPython)和配对 RDD 函数文档(ScalaJava)。

转换算子作用
map(func)返回使用传入函数转换每个元素后的RDD
filter(func)过滤操作,保留传入函数判断为true的数据
flatMap(func)类似map,不同的是,flatMap输入一个可迭代的列表,输出0到多个,这就要求func返回值必须是可迭代的数据结构
mapPartitions(func)类似map,不同的是mapPartitions是针对一个分区做一次操作,输入和输出都是可迭代的数据结构
mapPartitionsWithIndex ( func )通过对这个 RDD 的每个分区应用一个函数来返回一个新的 RDD,同时可以得到原始分区的索引。
sample(withReplacement, fraction, seed)使用给定的随机数生成器种子对数据的一小部分进行采样,无论是否有替换。
union(otherRDD)返回两个RDD合并后的RDD
intersection(otherRDD)返回两个RDD交集的RDD
distinct([numPartitions]))返回去除重复元素的RDD
groupByKey([numPartitions])在 (K, V) 对的数据集上调用时,返回 (K, Iterable) 对的数据集。如果分组是为了对每个键执行聚合(例如求和或平均值),使用reduceByKey或aggregateByKey将产生更好的性能。默认情况下,输出中的并行级别取决于父 RDD 的分区数。可以传递一个可选numPartitions参数来设置不同数量的任务。
reduceByKey(func, [numPartitions])当在 (K, V) 对的数据集上调用时,返回 (K, V) 对的数据集,其中每个键的值使用给定的 reduce 函数func聚合,该函数必须是 (V,V) => V. 与groupByKey一样,reduce 任务的数量可通过可选的第二个参数进行配置。
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])PairRDD才能调用,使用给定的聚合函数和中性的“零值”聚合每个键的值。这个函数可以返回与这个RDD V中的值类型不同的结果类型U。同样可以传入一个分区数来设置分区的数量
sortByKey([ascending], [numPartitions])PairRDD才能调用,根据key排序,特殊点在于会产生shuffle
join(otherDataset, [numPartitions])当调用类型为 (K, V) 和 (K, W) 的数据集时,返回一个 (K, (V, W)) 对的数据集,其中包含每个键的所有元素对。外连接通过leftOuterJoin,rightOuterJoin和fullOuterJoin实现。
cogroup(otherDataset, [numPartitions])当调用类型为 (K, V) 和 (K, W) 的数据集时,返回一个包含 (K, (Iterable, Iterable)) 元组的数据集。此操作和groupWith相同。
pipe(command, [envVars])通过 shell 命令(例如 Perl 或 bash 脚本)管理 RDD 的每个分区。RDD 元素被写入进程的标准输入,输出到标准输出的行作为字符串的 RDD 返回。
coalesce(numPartitions)将 RDD 中的分区数减少到 numPartitions。对过滤大型数据集后更有效地运行操作很有用。
repartition(numPartitions)随机重组 RDD 中的数据以创建更多或更少的分区并在它们之间进行平衡,会产生shuffle。repartion是coalesce的shuffle为true的调用
repartitionAndSortWithinPartitions(partitioner)根据给定的分区器对 RDD 进行重新分区,并在每个结果分区内,按键对记录进行排序。这比repartition在每个分区内调用然后排序更有效,因为它可以将排序下推到 shuffle 机器中。

2.2 常用Action算子

下表列出了 Spark 支持的一些常见操作。请参阅 RDD API 文档(ScalaJavaPython、 R)和Pair RDD 函数文档(ScalaJava )了解详细信息。

action算子作用
reduce(func)使用函数func(它接受两个参数并返回一个)聚合数据集的元素。该函数应该是可交换的和关联的,以便它可以被正确地并行计算。
collect()在驱动程序中将数据集的所有元素作为数组返回。这通常在过滤器或其他返回足够小的数据子集的操作之后很有用。
count()返回数据集中元素的数量。
first()返回数据集的第一个元素(类似于 take(1))。
take(n)返回一个包含数据集前n 个元素的数组。
takeSample(withReplacement, num, [seed])返回一个数组,其中包含数据集的num 个元素的随机样本,有或没有替换,可选择预先指定随机数生成器种子。
takeOrdered(n, [ordering])使用自然顺序或自定义比较器返回RDD的前n 个元素。
saveAsTextFile(path)将数据集的元素作为文本文件(或一组文本文件)写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统中的给定目录中。Spark 将对每个元素调用 toString 以将其转换为文件中的一行文本。
saveAsSequenceFile(path)将数据集的元素作为 Hadoop SequenceFile 写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统中的给定路径中。这在实现 Hadoop 的 Writable 接口的键值对的 RDD 上可用。在 Scala 中,它也可用于隐式转换为 Writable 的类型(Spark 包括对 Int、Double、String 等基本类型的转换)。
saveAsObjectFile(path)使用 Java 序列化以简单格式编写数据集的元素,然后可以使用 SparkContext.objectFile()读取
countByKey ()仅适用于 (K, V) 类型的 RDD。返回 (K, Int) 对的HashMap,其中包含每个键的计数。
foreach(func)对数据集的每个元素运行函数func。

2.3 传递方法、对象、变量

scala传递方法k可以传递方法的引用或静态方法传递给Spark,引用类型必须实现序列化,尽量避免使用引用类型。变量尽量使用临时变量。

2.4 Shuffle操作

Spark 中的某些操作会触发一个称为 shuffle 的事件。shuffle 是 Spark 用于重新分配数据的机制,以便它跨分区进行不同的分组。这通常涉及在执行器和机器之间复制数据,从而使 shuffle 成为一项复杂且成本高昂的操作。

要了解在 shuffle 期间发生了什么,可以考虑reduceByKey操作示例 。该reduceByKey操作生成一个新的 RDD,其中单个键的所有值都组合成一个元组 - 键和针对与该键关联的所有值执行 reduce 函数的结果。挑战在于,单个键的所有值不一定位于同一分区,甚至同一台机器上,但它们必须位于同一位置以计算结果。

在 Spark 中,数据通常不会跨分区分布在特定操作的必要位置。在计算过程中,单个任务将在单个分区上运行 - 因此,为了组织单个reduceByKeyreduce 任务执行的所有数据,Spark 需要执行一个 all-to-all 操作。它必须从所有分区中读取以找到所有键的所有值,然后将跨分区的值组合在一起以计算每个键的最终结果 - 这称为shuffle。

编码中要尽量避免shuffle

2.5 RDD持久化

Spark 中最重要的功能之一是跨操作在内存中持久化(或缓存)数据集。当你持久化一个 RDD 时,每个节点都会存储它在内存中计算的任何分区,并在该数据集(或从它派生的数据集)的其他操作中重用它们。这使得未来的动作可以更快(通常超过 10 倍)。缓存是迭代算法和快速交互使用的关键工具。

可以使用persist()、cache()、checkpoint()。其中cache()调用了persist(StorageLevel.MEMORY_ONLY)。

每个持久化的 RDD 都可以使用不同的存储级别来存储,例如,允许您将数据集持久化在磁盘上,将其持久化在内存中,但作为序列化的 Java 对象(以节省空间),跨节点复制它。这些级别是通过将 StorageLevel对象(Scala、 Java、 Python)传递给persist()


持久化级别的选择

  • 如果硬件资源充足建议使用内存
  • 如果硬件资源不足,建议使用内存和磁盘
  • 硬件资源最不理想可以使用磁盘

Spark 还会在 shuffle 操作(例如reduceByKey)中自动保留一些中间数据,即使没有调用persist.。这样做是为了避免在 shuffle 期间节点失败时重新计算整个输入。

2.6 共享变量

通常,当传递给 Spark 操作(例如map或reduce)的函数在远程集群节点上执行时,它会处理函数中使用的所有变量的单独副本。这些变量被复制到每台机器上,并且对远程机器上的变量的更新不会传播回驱动程序。支持跨任务的通用读写共享变量将是低效的。但是,Spark 确实为两种常见的使用模式提供了两种有限类型的共享变量:广播变量和累加器。

2.6.1 广播变量

一般情况下,传递一个变量给map()等方法时,会在每个task基本保存一个变量,如果变量很大时就会导致占用很大空间。

广播变量允许我们在每台机器上缓存一个只读变量,而不是随任务一起传送它的副本。例如,它们可用于以有效的方式为每个节点提供大型输入数据集的副本。Spark 还尝试使用高效的广播算法来分发广播变量以降低通信成本。

广播变量是通过调用SparkContext.broadcast(v)从变量创建的。广播变量是围绕变量的包装器,可以通过调用value 方法访问其值。下面的代码显示了这一点:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

需要注意的是广播变量不应该在广播后修改。

2.6.2 累加器

累加器是可以在分布式执行的一个计数器。支持一些默认的累加器,也支持自定义累加器

累加器的例子

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))

scala> accum.value
res2: Long = 10

3 性能优化

3.1 RDD复用

相同算子相同计算逻辑的,一定要重用,不要重复计算。

3.2 尽可以提前filter

提前将不需要的数据过滤掉,可以减少后面计算的时间。如果先计算后过滤,则浪费了部分计算时间。

3.3 读取多个小文件

读取多个小文件或处理单个文件要用wholeTextFiles读取

3.4 map和mapPartition

普通的map算子每个元素做一次操作,如果是写数据库之类的成本比较高的操作成本就太高了。此时可以用mapPartition,mapPartition算子是一个分区的数据做一次操作。
数据量大可能会OOM,所以要适当调整内存。

其他的foreach和foreachPartition也是类似的。

3.5 filter+coalesce/repartition

filter后,有一些分区的数据可能会减少很多,一些分区减少不多,此时就会造成分区间数据分布不均匀。此时重分区可以适当提升性能。

使用filter后,可以用coalesce或者repartition调整分区数。

  1. 减少分区
    使用coalesce,shuffle设置为false
  2. 增多分区
    使用repartition,或者coalesce的shuffle设置为true

3.6 并行度设置

Spark官方推荐,task数量应该设置为Spark作业总CPU core数量的2~3倍。这样可以更充分地利用硬件资源。
并行度可以通过SparkConf设置

val conf = new SparkConf().set("spark.default.parallelism", "300")

3.7 聚合算子尽量使用reduceByKey

聚合算子尽可能少用groupByKey,尽可能多的用reduceByKey。
reduceByKey会在map端预聚合,每个key预先计算出一个值,到reduce再进行最终聚合。所以会节约IO,网络传输等时间。

groupByKey则是将所有数据都拉取到reduce端才会执行聚合,相比reduceByKey而言浪费了很多时间。

3.8 使用持久化+checkpoint

以下情况应该将RDD缓存起来

  1. RDD在后续会多次使用
  2. RDD来自数据库,可能会遇到超时问题
  3. RDD计算成本很高,遇到节点故障等问题,重算成本高

checkpoint可将数据缓存到HDFS等文件系统,如果缓存数据丢失可以读取checkpoint数据。缺点是与文件系统交互,io速度慢。

遇到这三种情况必须将RDD缓存

sc.setCheckpointDir('HDFS')
rdd.cache/persist(memory_and_disk)
rdd.checkpoint

3.9 使用广播变量

默认情况下RDD中使用外部变量,会在每个task中生成一个副本,如果变量数据很大会占用很多内存。此时要使用广播变量,广播变量会在每个executor保存一个副本,这个executor的所有task都会引用这个副本,task很多的时候可以节约很多内存。

3.10 使用Kryo序列化

从Spark 2.0.0版本开始,简单类型、简单类型数组、字符串类型的Shuffling RDDs已经默认是Kryo序列化了。

自定义的类需要实现

public class MyKryoRegistrator implements KryoRegistrator
  @Override
  public void registerClasses(Kryo kryo)
    kryo.register(StartupReportLogs.class);
  

//创建SparkConf对象
val conf = new SparkConf().setMaster().setAppName()
//使用Kryo序列化库
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");  
//在Kryo序列化库中注册自定义的类集合
conf.set("spark.kryo.registrator", "bigdata.com.MyKryoRegistrator"); 

3.11 join前先给两个PairRDD指定相同分区器

join前给两个RDD指定相同的分区器,可以避免昂贵的shuffle操作。
如果不指定就会产生shuffle。

3.12 数据倾斜

某个分区的数据显著多于其他分区的数据导致单个task执行时间远远超出平均执行时间。

表现为

  • 绝大多数 task 执行得都非常快,但个别 task 执行极慢,整体任务卡在某个阶段不能结束。
  • 原本能够正常执行的 Spark 作业,某天突然报出 OOM(内存溢出)异常,观察异常栈,是写的业务代码造成的。

3.12.1 数据源倾斜

例如某市不同街道上的过车记录,一定是十几条主要街道过车记录远远大于其他街道。
这种数据源本身就是倾斜的。

像这种数据可以针对性的采集时就将主要街道的记录针对性的打散,比如加1~8的前缀,聚合计算后,再次汇总聚合。
缺点:需要预处理数据,计算步骤增多

3.12.2 shuffle数据倾斜

场景:大量不同的Key被分配到了相同的Task造成该Task数据量过大。

  1. 调整shuffle算子并行度

例如原本有10个task,则可以尝试repartition(20),将多个key分不到其他分区,缓解shuffle。
也可以尝试减少并行度,coalesce(5,true)减少分区,使数据更加均匀,缓解shuffle。具体减少还是增加,需要测试对比。
劣势:适用场景少,只能将分配到同一Task的不同Key分散开,但对于同一Key倾斜严重的情况该方法并不适用。并且该方法一般只能缓解数据倾斜,没有彻底消除问题。从实践经验来看,其效果一般。

  1. 换分区器

默认情况下Spark使用HashPartitioner,HashPartitioner是对key求取hash值再对partitions 取余数的方法,因此如果大部分key是相同的话将会导致,各partition之间存在数据倾斜的问题,极端情况下,RDD的所有row被分配到了同一个partition中。

RangePartitioner分区则尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,也就是说一个分区中的元素肯定都是比另一个分区内的元素小或者大;但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。

因此可以尝试使用RangePartitioner来缓解数据倾斜。

  1. 自定义分区器

将多个key分布到不同分区

完全随机分区

class CustomerPartition(partitions: Int) extends Partitioner 

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = 

    (key.toString.charAt(0) + scala.util.Random.nextInt(10)) % numPartitions
  

只针对部分key随机分区

package org.apache.spark.examples
import org.apache.spark.HashPartitioner, Partitioner

class CustomerPartition(partitions: Int) extends Partitioner 
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match 
    case null => 0
    case _ => nonNegativeMod(key, numPartitions)
  

  override def equals(other: Any): Boolean = other match 
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  

  override def hashCode: Int = numPartitions

  def nonNegativeMod(x: Any, mod: Int): Int = 
    val skewKeys = Set("key1","key2","key3","key4","key5")

    val keyHashCode = x.hashCode
    val rawMod = keyHashCode % mod
    if(skewKeys.contains(x.toString))
      val sourcePartitionNum = rawMod + (if (rawMod < 0) mod else 0)
      sourcePartitionNum + (scala.util.Random.nextInt(mod + skewKeys.size) % (mod + skewKeys.size))
    else
      rawMod + (if (rawMod < 0) mod else 0)
    
  

3.12.3 join数据倾斜

  1. 使用自定义分区器
    join前必须两个RDD设置相同的分区器(可以使用3.12.2 shuffle数据倾斜里的自定义分区器或其他自定义分区器)重分区

  2. 倾斜RDD加n个随机前缀,不倾斜RDD膨胀n倍
    将有数据倾斜的RDD中倾斜Key对应的数据集单独抽取出来加上随机前缀,另外一个RDD每条数据分别与随机前缀结合形成新的RDD(相当于将其数据增到到原来的N倍,N即为随机前缀的总个数),然后将二者Join并去掉前缀。再做需要的聚合操作。

案例

package org.apache.spark.examples

import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.sql.SparkSession
import org.apache.spark.HashPartitioner, Partitioner, SparkConf
import scala.util.Random

class DataSkew 
  def main(args: Array[String]): Unit = 
    val sparkConf = new SparkConf()
    sparkConf.setAppName("ResolveDataSkewWithNAndRandom")
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    val sc = sparkSession.sparkContext
    val leftRdd = sc.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/").map(row => (row.split(",")(0),row.split(",")(1)))
    val rightRdd = sc.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/").map(row => (row.split(",")(0),row.split(",")(1)))
    var addList: List[String] = List()
    var a:Int = 0
    for(a <- 1 to 20)
      addList = addList :+ a.toString
    
    val addListKeys = sc.broadcast(addList)
    val newRandomPrefixLeftRDD = leftRdd.map(t2 => (Random.nextInt(20)+","+t2._1,t2._2)).partitionBy(new CustomerPartition(20))
    val rightPrefixRdd = rightRdd.flatMap(t2 => (addListKeys.value.toStream.map((s => (s + "," +t2._1,t2._2))).toList)).partitionBy(new CustomerPartition(20))
    val joinRDD = leftRdd.join(rightRdd).map(t2 => (t2._1.split(",")(1),t2._2._2))
    joinRDD.foreachPartition(iterable => 
      val atomicInteger = new AtomicInteger()
      iterable.toStream.foreach(t2 => atomicInteger.incrementAndGet())
    )
    sc.stop()
    sparkSession.stop()
  


三、Spark SQL、DataFrame、Dataset

DataFrame是Dataset的一个子集,从源码来看DataFrame就是Row类型的Dataset。Dataset类似关系数据库的一张表。Dataset提供了编译时类型检查。

1 入门

1.1 SparkSession

sparksql的功能入口是SparkSession,SparkSession创建案例

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option"以上是关于Apache Spark基础知识的主要内容,如果未能解决你的问题,请参考以下文章

Apache Spark Python 到 Scala 的翻译

将 Spark SQL 批处理源转换为结构化流接收器

2020寒假学习记录——Spark及其生态圈的了解

Spark SQL 会完全取代 Apache Impala 或 Apache Hive 吗?

spark

Apache spark - JDBC 写入,中间提交