使用Spark
Posted 董世森BigData
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用Spark相关的知识,希望对你有一定的参考价值。
使用Spark
之前介绍了流式构建,它是一种围绕实时数据更新需求的解决方案,确切地说是一种准实时的方案,因为Kylin的核心部分是预计算,所以只需关心预计算和查询的优化等核心问题即可。说到预计算的优化,就离不开计算引擎的优化,我们知道目前无论是全量构建、增量构建还是流式构建,默认的计算引擎都是MapReduce,但是针对MapReduce进行的优化是有限的,我们需要寻找更高效的计算引擎。Apache Spark作为新一代的分布式计算框架近年来快速发展壮大,性能日趋稳定,已经基本上可以完全取代MapReduce,因此Kylin从v2.0版本开始引入Spark作为Cube的计算引擎。
为什么要引入Apache Spark
在Kylin v2.0之前的版本,Apache Kylin使用MapReduce作为在庞大的数据集上构建Cube的计算框架,因为MapReduce框架相对简单、稳定,可以满足Kylin的需求。不过由于MapReduce框架的设计具有局限性,一轮MapReduce任务只能处理一个Map任务和一个Reduce任务,中间结果需要保存到磁盘以供下一轮任务使用,如果有多轮任务的话,必然需要消耗大量的网络传输和磁盘I/O,而这些都是非常耗时的操作,导致MapReduce构建Cube的性能不佳。因此为了获得更好的性能,Kylin在v1.5版本中引入了“Fast Cubing”算法,尝试在Map端的内存中进行尽可能多的聚合,以减少磁盘和网络I/O,但并非所有数据模型都能从中受益。
在Kylin v2.0版本中,我们引入Apache Spark作为Cube的计算引擎。ApacheSpark是一个开源的分布式计算框架,它提供了一个集群的分布式内存抽象(RDD),以及基于RDD的一系列灵活的应用程序编程接口。Spark是基于内存的迭代计算框架,不依赖Hadoop MapReduce的两阶段范式,由RDD组成有向无环图(DAG),RDD的转换操作会生成新的RDD,新的RDD的数据依赖父RDD保存在内存中的数据,这使得对于在重复访问相同数据的场景下,重复访问的次数越多、访问的数据量越大,引入Apache Spark作为Cube计算引擎的收益也就越大。由于这种在内存中迭代计算的设计非常符合Cube分层构建算法,加上受益于Kylin的可插拔架构,我们扩展Spark作为Kylin构建Cube的计算引擎。
Spark构建原理
在介绍Spark构建之前,先来看一下Kylin是怎么使用MapReduce进行构建的。
如图所示,
我们使用分层构建算法构建一个包含4个维度的Cube,
第一轮MR任务从源数据聚合得到4维的Cuboid,也就是Base Cuboid,
第二轮MR任务由4维的Cuboid聚合得到3维的Cuboid,
以此类推,在经过N+1轮MR任务后,所有的Cuboid都被计算出来。
使用了“分层”算法进行构建,分层构建算法的思想就是把一个大的构建任务分成多个步骤来完成,并且每个步骤都是在上一步骤输出的基础上进行的,这样不但可以重用上一步骤的计算结果,而且如果当前步骤计算出错的话,整个计算任务不用从头开始,只需要基于上一步骤开始就可以了。因此可以看出分层构建的算法是可“依赖”的算法,由于Spark中的RDD也是可依赖的,新的RDD的数据依赖于父RDD保存在内存中的数据,因此我们在Spark中构建Cube时依然使用分层构建算法。
如图所示
,第N层N维的Cuboid可以看作一个RDD,那么一个有N个维度的Cube就会生成N+1个RDD,这些RDD是父子关系,N维的RDD由N-1维的RDD生成,并且父RDD是缓存在内存中的RDD.persist(StorageLevel),这使得Spark构建会比MR构建更加高效,因为进行MR构建的时候,父层数据是存储在磁盘上的,为了最大化地利用Spark的内存,父RDD在生成子RDD后需要从内存中释放RDD.unpersist(),而且每一层的RDD都会通过Spark提供的API持久化保存到HDFS上。
这样经过N+1层的迭代,就完成了所有Cuboid的计算。
图Spark构建4维Cube
如图所示是在用Spark构建Cube时的DAG(有向无环图)
图中详细地说明了Spark构建Cube的过程:
在“Stage 5”中,Kylin使用HiveContext读取中间临时表,然后执行map操作,这是一对一的映射,将原始值编码为KV类型,key 是维度值经编码后组成的 rowkey,value是度量编码后的二进制数据,这步操作完成后,Kylin将获得一个编码后的中间RDD。
在“Stage 6”中,对中间RDD进行reduceByKey操作,聚合后得到RDD-1,也就是Base Cuboid。接下来,在RDD-1的基础上执行“flatMap”算子,这是一对多的操作,因为 Base Cuboid 中有 N 个子 Cuboid 。然后循环执行这两个操作,直到把所有层的RDD全部计算完成。
使用Spark构建Cube
将介绍如何配置Spark引擎,并在HDP 2.4 Sandbox VM环境中使用samplecube演示如何使用新的构建引擎;然后会介绍如何开启Spark动态资源分配,使Spark根据Kylin的负载情况动态地增加或减少executor的数量,以最大限度地节省资源;最后介绍在使用Spark构建引擎的过程中的错误处理方法和常见问题的排查方法。
配置Spark引擎
要在YARN上运行Spark,需要指定HADOOP_CONF_DIR环境变量,该变量是包含Hadoop(客户端)配置文件的目录。在许多Hadoop发行版中,该目录是“/etc/hadoop/conf”,Kylin可以自动从Hadoop配置中检测到此文件夹,因此默认情况下不需要设置此属性,如果配置文件不在默认文件夹中,请明确设置此属性。
Kylin内嵌了一个解压后的Spark二进制包在“$KYLIN_HOME/spark”目录下,并且默认把这个目录作为SPARK_HOME,用户也可以使用自己环境中的SPARK_HOME环境变量,但是有可能出现版本不兼容的问题,所以建议使用Kylin目录下的Spark作为SPARK_HOME。所有的Spark相关配置都可以在“$KYLIN_HOME/conf/kylin.properties”文件中使用“kylin.engine.spark-conf.”前缀来进行管理,Kylin在提交Spark任务的时候,会提取这些配置属性并将其应用于提交的任务。
例如,在kylin.properties中配置了“kylin.engine.spark-conf.spark.executor.memory=4G”,那么Kylin在提交Spark任务的时候就会把“-conf.spark.executor.memory=4G”作为参数传送给“spark-submit”脚本。在运行Spark构建引擎之前,建议先检查Spark的相关配置,然后根据集群资源情况进行调整,以下是推荐的配置:
如果您的Kylin是运行在Hortonworks平台上,那么需要为YARN容器指定hdp.version作为JVM选项参数,因此需要在kylin.properties中打开后三行的注释,并将其中的HDP版本号替换为您自己的相应版本。此外,为了避免重复将Spark JAR上传到YARN,可以手动配置JAR在HDFS中的位置,注意HDFS的位置需要用完全限定的名称。示例如下:
这样,kylin.properties中的配置将变成以下格式:
并且所有的kylin.engine.spark-conf.参数均可在Cube级别或项目级别被替换,用户可以灵活、方便地使用。接下来,以sample cube为例,
演示如何使用Spark构建引擎。
首先运行sample.sh创建sample cube,然后启动Kylin。
示例如下
Kylin成功启动之后,进入Kylin的web界面,编辑“kylin_sales”这个cube,在“Advanced Setting”页面中把“Cube Engine”由“MapReduce”更改为“Spark”,
如图所示,
这样就完成了把构建引擎指定为Spark的操作。
设置Cube Engine点击“Next”按钮进入“Configuration Overwrites”页面,在这个页面中我们可以添加一些参数覆盖,通过点击“+Property”按钮来进行参数覆盖配置
然后点击“Next”→“Save”按钮完成Spark构建引擎配置
返回Cube页面,点击“Build”按钮,然后选择想要构建的时间段,这样就启动了一次新的构建任务,
在“Monitor”页面点击这个任务查看它的完成进度和状态,可以看到“Build Cube with Spark”的步骤
(如图所示),
该步骤使用Spark进行Cube的构建。不仅是这个步骤使用了Spark,在Kylin v2.5版本中实现了“Allin Spark”,也就是把构建过程中所有MapReduce的步骤都用Spark来完成,包括“Extract Fact Table Distinct Columns”“Convert Cuboid Data to HFile”等,这样可以极大地提高构建的速度。
查看构建任务的完成进度和状态当所有步骤都完成以后,这个Cube就变成了“Ready”状态,我们就可以对这个Cube进行正常的查询了。
开启Spark动态资源分配
在Spark中,所谓资源单位一般指的是executor,和YARN中的容器一样,在SparkOn YARN模式下,通常使用“num-executors”来指定应用程序使用的executor的数量,而executor-memory和executor-cores分别用于指定每个executor所使用的内存和虚拟CPU核数。以Spark Cubing这一步骤为例,如果使用的是固定的资源分配策略,在提交任务的时候指定“num-executors”为“3”,那么YARN会为这个Spark任务分配4个容器(一个固定用于application master,3个用于executor),这就出现了一个问题:不管构建的数据量多大,都会使用相同的资源,这样在数据量较小的时候就会出现资源浪费的情况,除非在每次构建前就知道这次构建的数据量大小,然后在Cube级别重写Spark任务相关的配置,这会很麻烦并且很多情况下用户并不知道数据量的大小。但是,如果把资源分配策略设置成动态分配的话,那么,Spark可以根据任务的负载情况动态地增加和减少executor的数量以减少资源的浪费。配置 Spark 动态资源分配需要在Spark的配置文件中添加一些配置项以开启该服务。由于在Kylin中可以通过在kylin.properties中进行配置来直接覆盖Spark中的配置,因此只需要在kylin.properties中进行以下配置:
以上配置只是一个示例,具体配置的值可以根据实际情况进行调整。更多的配置项可以参考Spark相关文档。
出错处理和问题排查
如果在使用Spark构建引擎的时候遇到构建失败的情况,可以在“$KYLIN_HOEM/logs/kylin.long”文件里找到Kylin提交Spark任务时的完整命令
。例如:
可以手动复制这个命令到命令行里,并且可以调整一下参数,然后运行这个命令,在执行过程中,你可以进入YARN resource manager检查运行任务的状态,如果这个任务执行完成,也可以进入Spark history server中检查这个任务的日志和信息。默认情况下,Kylin会把Spark history信息输出到“hdfs:///kylin/spark-history”目录下,你需要在该目录中启动“Spark history server”,或者使用已经启动好了的Spark history server,然后在“$KYLIN_HOME/conf/kylin.properties”中把参数“kylin.engine.spark-conf.spark.eventLog.dir”和“kylin.engine.spark-conf.spark.history.fs.logDirectory”的值指定为这个history server监听的目录。你可以运行以下命令来启动一个Spark history server,并且指定输出目录。在运行之前请确保停止了已经存在的Spark history server任务。
启动Spark history server之后在浏览器中访问“http://<hostname>:18080”即可查看Spark的历史任务信息,如图所示。
点击具体的任务名称即可查看任务运行时的详细信息,这些信息在进行问题排查和性能调优时是非常有帮助的。比如,在某些Hadoop的发行版本上,你可能会在“Convert Cuboid Data toHFile”这个步骤遇到类似以下错误:
该错误是由于Spark任务在运行时使用的HBase相关的jar包不正确导致的,
解决这个问题的方法是复制hbase-hadoop2-compat-*.jar和hbase-hadoop-compat-*.jar到“$KYLIN_HOME/spark/jars”目录下,这两个jar包可以在HBase的lib目录下找到,然后重新提交运行失败的任务,这个任务最终应该会执行成功。此问题相关的issue是KYLIN-3607,已经被修复。可见,Spark任务运行时的信息非常重要,如果在使用Spark构建引擎过程中出现错误,这些信息可以帮助你找到出现问题的根本原因。
使用Spark SQL创建中间平表
Kylin默认的数据源是Hive,构建Cube的第一步是创建Hive中间平表,也就是把事实表和维度表连接成一张平表,这个步骤是通过调用数据源的接口来完成的。以前使用Hive来执行构建平表的SQL,现在可以用Spark-SQL来执行。
接下来,介绍如何在Kylin里使用Spark SQL来创建平表。要启用这个功能需要以下几个步骤。
1)首先要确保以下配置已经存在于hive-site.xml中:
2)然后把Hive的执行引擎(hive.execution.engine)改成MapReduce。
3)hive-site.xml复制到“$SPARK_HOME/conf”目录下,并且确保环境变量“HADOOP_CONF_DIR”已经设置完毕。
4)使用“sbin/start-thriftserver.sh--master spark://<sparkmaster-host>:<port>”命令来启动thriftserver,通常端口是7077。
5)通过修改“$KYLIN_HOME/conf/kylin.properties”设置参数:
配置完成之后,重启Kylin使配置生效,这样Kylin在创建中间平表的时候就会使用Spark SQL来完成。
如果想要关闭这个功能,只需要把“kylin.source.hive.enable-sparksql-for-table-ops”重新设为“false”就可以了。
小结
Kylin v2.0版本中引入了Spark作为Cube的构建引擎,并且在Kylin v2.5版本中实现了完全使用Spark,显著提高了Kylin的构建速度,也为Kylin完整地运行于非Hadoop环境(如Spark on Kubernetes)提供了可能。
Spark的构建原理和如何配置Spark构建引擎,以及如何进行出错处理和问题排查等。推荐开启Spark的动态资源分配,让Spark根据负载自动增加或减少构建资源,因为Spark的性能主要依赖集群的Memory和CPU资源,假设要构建一个数据量很大又有复杂数据模型的Cube,如果没有给这个构建任务足够的资源,那么Sparkexecutors很可能会出现“OutOfMemorry”异常。对于那些有UHC维度,并且有很多维度组合,同时又包含非常占用内存的度量(如Count Distinct、Top_N)的Cube,建议使用MapReduce引擎进行构建,这会相对比较稳定。对于简单的Cube模型,如所有的度量都是“SUM/MIN/MAX/COUNT”且拥有中等规模的数据量,那么使用Spark构建引擎是一个很好的选择。
以上是关于使用Spark的主要内容,如果未能解决你的问题,请参考以下文章
手把手带你玩转Spark机器学习-使用Spark进行文本处理
使用 Java 的 Spark 和 Spark SQL 新手