使用Spark

Posted 董世森BigData

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用Spark相关的知识,希望对你有一定的参考价值。

使用Spark


之前介绍了流式构建,它是一种围绕实时数据更新需求的解决方案,确切地说是一种准实时的方案,因为Kylin的核心部分是预计算,所以只需关心预计算和查询的优化等核心问题即可。说到预计算的优化,就离不开计算引擎的优化,我们知道目前无论是全量构建、增量构建还是流式构建,默认的计算引擎都是MapReduce,但是针对MapReduce进行的优化是有限的,我们需要寻找更高效的计算引擎Apache Spark作为新一代的分布式计算框架近年来快速发展壮大,性能日趋稳定,已经基本上可以完全取代MapReduce,因此Kylin从v2.0版本开始引入Spark作为Cube的计算引擎。


为什么要引入Apache Spark


在Kylin v2.0之前的版本,Apache Kylin使用MapReduce作为在庞大的数据集上构建Cube的计算框架,因为MapReduce框架相对简单、稳定,可以满足Kylin的需求。不过由于MapReduce框架的设计具有局限性,一轮MapReduce任务只能处理一个Map任务和一个Reduce任务,中间结果需要保存到磁盘以供下一轮任务使用,如果有多轮任务的话,必然需要消耗大量的网络传输和磁盘I/O,而这些都是非常耗时的操作,导致MapReduce构建Cube的性能不佳。因此为了获得更好的性能,Kylin在v1.5版本中引入了“Fast Cubing”算法,尝试在Map端的内存中进行尽可能多的聚合,以减少磁盘和网络I/O,但并非所有数据模型都能从中受益


在Kylin v2.0版本中,我们引入Apache Spark作为Cube的计算引擎。ApacheSpark是一个开源的分布式计算框架,它提供了一个集群的分布式内存抽象(RDD),以及基于RDD的一系列灵活的应用程序编程接口。Spark是基于内存的迭代计算框架,不依赖Hadoop MapReduce的两阶段范式,由RDD组成有向无环图(DAG),RDD的转换操作会生成新的RDD,新的RDD的数据依赖父RDD保存在内存中的数据,这使得对于在重复访问相同数据的场景下,重复访问的次数越多、访问的数据量越大,引入Apache Spark作为Cube计算引擎的收益也就越大。由于这种在内存中迭代计算的设计非常符合Cube分层构建算法,加上受益于Kylin的可插拔架构,我们扩展Spark作为Kylin构建Cube的计算引擎。


 Spark构建原理


在介绍Spark构建之前,先来看一下Kylin是怎么使用MapReduce进行构建的。

如图所示,


我们使用分层构建算法建一个包含4个维度的Cube,

第一轮MR任务从源数据聚合得到4维的Cuboid,也就是Base Cuboid,

第二轮MR任务由4维的Cuboid聚合得到3维的Cuboid,

以此类推,在经过N+1轮MR任务后,所有的Cuboid都被计算出来。

使用Spark

使用了“分层”算法进行构建,分层构建算法的思想就是把一个大的构建任务分成多个步骤来完成,并且每个步骤都是在上一步骤输出的基础上进行的,这样不但可以重用上一步骤的计算结果,而且如果当前步骤计算出错的话,整个计算任务不用从头开始,只需要基于上一步骤开始就可以了因此可以看出分层构建的算法是可“依赖”的算法,由于Spark中的RDD也是可依赖的,新的RDD的数据依赖于父RDD保存在内存中的数据,因此我们在Spark中构建Cube时依然使用分层构建算法。

如图所示

使用Spark


,第N层N维的Cuboid可以看作一个RDD,那么一个有N个维度的Cube就会生成N+1个RDD,这些RDD是父子关系,N维的RDD由N-1维的RDD生成,并且父RDD是缓存在内存中的RDD.persist(StorageLevel),这使得Spark构建会比MR构建更加高效,因为进行MR构建的时候,父层数据是存储在磁盘上的,为了最大化地利用Spark的内存,父RDD在生成子RDD后需要从内存中释放RDD.unpersist(),而且每一层的RDD都会通过Spark提供的API持久化保存到HDFS上。


这样经过N+1层的迭代,就完成了所有Cuboid的计算。

使用Spark


图Spark构建4维Cube

如图所示是在用Spark构建Cube时的DAG(有向无环图)

使用Spark

图中详细地说明了Spark构建Cube的过程:


在“Stage 5”中,Kylin使用HiveContext读取中间临时表,然后执行map操作,这是一对一的映射,将原始值编码为KV类型,key 是维度值经编码后组成的 rowkey,value是度量编码后的二进制数据,这步操作完成后,Kylin将获得一个编码后的中间RDD。


在“Stage 6”中,对中间RDD进行reduceByKey操作,聚合后得到RDD-1,也就是Base Cuboid。接下来,在RDD-1的基础上执行“flatMap”算子,这是一对多的操作,因为 Base Cuboid 中有 N 个子 Cuboid 。然后循环执行这两个操作,直到把所有层的RDD全部计算完成。


使用Spark构建Cube


将介绍如何配置Spark引擎,并在HDP 2.4 Sandbox VM环境中使用samplecube演示如何使用新的构建引擎;然后会介绍如何开启Spark动态资源分配,使Spark根据Kylin的负载情况动态地增加或减少executor的数量,以最大限度地节省资源;最后介绍在使用Spark构建引擎的过程中的错误处理方法和常见问题的排查方法。


 配置Spark引擎


要在YARN上运行Spark,需要指定HADOOP_CONF_DIR环境变量,该变量是包含Hadoop(客户端)配置文件的目录。在许多Hadoop发行版中,该目录是“/etc/hadoop/conf”,Kylin可以自动从Hadoop配置中检测到此文件夹,因此默认情况下不需要设置此属性,如果配置文件不在默认文件夹中,请明确设置此属性。


Kylin内嵌了一个解压后的Spark二进制包在“$KYLIN_HOME/spark”目录下,并且默认把这个目录作为SPARK_HOME,用户也可以使用自己环境中的SPARK_HOME环境变量,但是有可能出现版本不兼容的问题,所以建议使用Kylin目录下的Spark作为SPARK_HOME所有的Spark相关配置都可以在“$KYLIN_HOME/conf/kylin.properties”文件中使用“kylin.engine.spark-conf.”前缀来进行管理,Kylin在提交Spark任务的时候,会提取这些配置属性并将其应用于提交的任务。

例如,在kylin.properties中配置了“kylin.engine.spark-conf.spark.executor.memory=4G”,那么Kylin在提交Spark任务的时候就会把“-conf.spark.executor.memory=4G”作为参数传送给“spark-submit”脚本。在运行Spark构建引擎之前,建议先检查Spark的相关配置,然后根据集群资源情况进行调整,以下是推荐的配置:

使用Spark

如果您的Kylin是运行在Hortonworks平台上,那么需要为YARN容器指定hdp.version作为JVM选项参数,因此需要在kylin.properties中打开后三行的注释,并将其中的HDP版本号替换为您自己的相应版本。此外,为了避免重复将Spark JAR上传到YARN,可以手动配置JAR在HDFS中的位置,注意HDFS的位置需要用完全限定的名称。示例如下:

使用Spark

这样,kylin.properties中的配置将变成以下格式:

使用Spark

并且所有的kylin.engine.spark-conf.参数均可在Cube级别或项目级别被替换,用户可以灵活、方便地使用。接下来,以sample cube为例,

演示如何使用Spark构建引擎

首先运行sample.sh创建sample cube,然后启动Kylin。

示例如下

使用Spark

Kylin成功启动之后,进入Kylin的web界面,编辑“kylin_sales”这个cube,在“Advanced Setting”页面中把“Cube Engine”由“MapReduce”更改为“Spark”,

如图所示,

使用Spark

这样就完成了把构建引擎指定为Spark的操作。

使用Spark

设置Cube Engine点击“Next”按钮进入“Configuration Overwrites”页面,在这个页面中我们可以添加一些参数覆盖,通过点击“+Property”按钮来进行参数覆盖配置

使用Spark

然后点击“Next”→“Save”按钮完成Spark构建引擎配置

返回Cube页面,点击“Build”按钮,然后选择想要构建的时间段,这样就启动了一次新的构建任务,

在“Monitor”页面点击这个任务查看它的完成进度和状态,可以看到“Build Cube with Spark”的步骤

(如图所示),

使用Spark

该步骤使用Spark进行Cube的构建。不仅是这个步骤使用了Spark,在Kylin v2.5版本中实现了“Allin Spark”,也就是把构建过程中所有MapReduce的步骤都用Spark来完成,包括“Extract Fact Table Distinct Columns”“Convert Cuboid Data to HFile”等,这样可以极大地提高构建的速度。

使用Spark

查看构建任务的完成进度和状态当所有步骤都完成以后,这个Cube就变成了“Ready”状态,我们就可以对这个Cube进行正常的查询了。



开启Spark动态资源分配


在Spark中,所谓资源单位一般指的是executor,和YARN中的容器一样,在SparkOn YARN模式下,通常使用“num-executors”来指定应用程序使用的executor的数量,而executor-memory和executor-cores分别用于指定每个executor所使用的内存和虚拟CPU核数。以Spark Cubing这一步骤为例,如果使用的是固定的资源分配策略,在提交任务的时候指定“num-executors”为“3”,那么YARN会为这个Spark任务分配4个容器(一个固定用于application master,3个用于executor),这就出现了一个问题:不管构建的数据量多大,都会使用相同的资源,这样在数据量较小的时候就会出现资源浪费的情况,除非在每次构建前就知道这次构建的数据量大小,然后在Cube级别重写Spark任务相关的配置,这会很麻烦并且很多情况下用户并不知道数据量的大小。但是,如果把资源分配策略设置成动态分配的话,那么,Spark可以根据任务的负载情况动态地增加和减少executor的数量以减少资源的浪费。配置 Spark 动态资源分配需要在Spark的配置文件中添加一些配置项以开启该服务。由于在Kylin中可以通过在kylin.properties中进行配置来直接覆盖Spark中的配置,因此只需要在kylin.properties中进行以下配置:

使用Spark

以上配置只是一个示例,具体配置的值可以根据实际情况进行调整。更多的配置项可以参考Spark相关文档。



出错处理和问题排查


如果在使用Spark构建引擎的时候遇到构建失败的情况,可以在“$KYLIN_HOEM/logs/kylin.long”文件里找到Kylin提交Spark任务时的完整命令

。例如:

使用Spark

可以手动复制这个命令到命令行里,并且可以调整一下参数,然后运行这个命令,在执行过程中,你可以进入YARN resource manager检查运行任务的状态,如果这个任务执行完成,也可以进入Spark history server中检查这个任务的日志和信息。默认情况下,Kylin会把Spark history信息输出到“hdfs:///kylin/spark-history”目录下,你需要在该目录中启动“Spark history server”,或者使用已经启动好了的Spark history server,然后在“$KYLIN_HOME/conf/kylin.properties”中把参数“kylin.engine.spark-conf.spark.eventLog.dir”和“kylin.engine.spark-conf.spark.history.fs.logDirectory”的值指定为这个history server监听的目录。你可以运行以下命令来启动一个Spark history server,并且指定输出目录。在运行之前请确保停止了已经存在的Spark history server任务。

使用Spark

启动Spark history server之后在浏览器中访问“http://<hostname>:18080”即可查看Spark的历史任务信息,如图所示。

使用Spark

点击具体的任务名称即可查看任务运行时的详细信息,这些信息在进行问题排查和性能调优时是非常有帮助的。比如,在某些Hadoop的发行版本上,你可能会在“Convert Cuboid Data toHFile”这个步骤遇到类似以下错误:

使用Spark

该错误是由于Spark任务在运行时使用的HBase相关的jar包不正确导致的,

解决这个问题的方法是复制hbase-hadoop2-compat-*.jar和hbase-hadoop-compat-*.jar到“$KYLIN_HOME/spark/jars”目录下,这两个jar包可以在HBase的lib目录下找到,然后重新提交运行失败的任务,这个任务最终应该会执行成功。此问题相关的issue是KYLIN-3607,已经被修复。可见,Spark任务运行时的信息非常重要,如果在使用Spark构建引擎过程中出现错误,这些信息可以帮助你找到出现问题的根本原因。



使用Spark SQL创建中间平表


Kylin默认的数据源是Hive,构建Cube的第一步是创建Hive中间平表,也就是把事实表和维度表连接成一张平表,这个步骤是通过调用数据源的接口来完成的。以前使用Hive来执行构建平表的SQL,现在可以用Spark-SQL来执行。

接下来,介绍如何在Kylin里使用Spark SQL来创建平表。要启用这个功能需要以下几个步骤。

1)首先要确保以下配置已经存在于hive-site.xml中

2)然后把Hive的执行引擎(hive.execution.engine)改成MapReduce


3)hive-site.xml复制到“$SPARK_HOME/conf”目录下,并且确保环境变量“HADOOP_CONF_DIR”已经设置完毕。


4)使用“sbin/start-thriftserver.sh--master spark://<sparkmaster-host>:<port>”命令来启动thriftserver,通常端口是7077


5)通过修改“$KYLIN_HOME/conf/kylin.properties”设置参数


配置完成之后,重启Kylin使配置生效,这样Kylin在创建中间平表的时候就会使用Spark SQL来完成。

如果想要关闭这个功能,只需要把“kylin.source.hive.enable-sparksql-for-table-ops”重新设为“false”就可以了。


小结

Kylin v2.0版本中引入了Spark作为Cube的构建引擎,并且在Kylin v2.5版本中实现了完全使用Spark,显著提高了Kylin的构建速度,也为Kylin完整地运行于非Hadoop环境(如Spark on Kubernetes)提供了可能。

Spark的构建原理和如何配置Spark构建引擎,以及如何进行出错处理和问题排查等。推荐开启Spark的动态资源分配,让Spark根据负载自动增加或减少构建资源,因为Spark的性能主要依赖集群的Memory和CPU资源,假设要构建一个数据量很大又有复杂数据模型的Cube,如果没有给这个构建任务足够的资源,那么Sparkexecutors很可能会出现“OutOfMemorry”异常。对于那些有UHC维度,并且有很多维度组合,同时又包含非常占用内存的度量(如Count Distinct、Top_N)的Cube,建议使用MapReduce引擎进行构建,这会相对比较稳定。对于简单的Cube模型,如所有的度量都是“SUM/MIN/MAX/COUNT”且拥有中等规模的数据量,那么使用Spark构建引擎是一个很好的选择。

以上是关于使用Spark的主要内容,如果未能解决你的问题,请参考以下文章

手把手带你玩转Spark机器学习-使用Spark进行文本处理

科普Spark,Spark是什么,如何使用Spark

Spark 支持使用 Windows 功能

使用 Java 的 Spark 和 Spark SQL 新手

手把手带你玩转Spark机器学习-使用Spark构建分类模型

Spark机器学习实战-使用Spark进行数据处理和数据转换