Spark Streaming实践和优化

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark Streaming实践和优化相关的知识,希望对你有一定的参考价值。

发表于:《程序员》杂志2016年2月刊。链接:http://geek.csdn.net/news/detail/54500

作者:徐鑫,董西成

在流式计算领域,Spark Streaming和Storm时下应用最广泛的两个计算引擎。其中,Spark Streaming是Spark生态系统中的重要组成部分,在实现上复用Spark计算引擎。如图1所示,Spark Streaming支持的数据源有很多,如Kafka、Flume、TCP等。Spark Streaming的内部数据表示形式为DStream(Discretized Stream,离散的数据流),其接口设计与RDD非常相似,这使得它对Spark用户非常友好。Spark Streaming的核心思想是把流式处理转化为“微批处理”,即以时间为单位切分数据流,每个切片内的数据对应一个RDD,进而可以采用Spark引擎进行快速计算。由于Spark Streaming采用了微批处理方式,因此严格来说只是一个近实时的处理系统,而不是真正的流式处理系统。

 

图1:Spark Streaming数据流

 

Storm是这个领域另一个著名的开源流式计算引擎,这是一个真正的流式处理系统,它每次从数据源读一条数据,然后单独处理。相比于Spark Streaming,Storm有更快速的响应时间(小于一秒),更适合低延迟的应用场景,比如信用卡欺诈系统,广告系统等。但是对比Storm,Spark Streaming的优势是吞吐量大,响应时间也可以接受(秒级),并且兼容Spark系统中的其他工具库,如MLlib和GraphX。从而,对于时间不敏感且流量很大的系统,Spark Streaming是更优的选择。

 

Spark Streaming在Hulu应用

 

Hulu是美国的专业在线视频网站,每天会有大量用户在线观看视频,进而产生大量用户观看的行为数据。这些数据通过收集系统进入Hulu的大数据平台,存储并做进一步处理。在大数据平台之上,各个团队会根据需要设计相应的算法对数据进行分析和挖掘以便产生商业价值:推荐团队从这些数据里挖掘出用户感兴趣的内容并做精准推荐,广告团队根据用户的历史行为推送最合适的广告,数据团队从数据的各个维度进行分析从而为公司的策略制定提供可靠依据。

Hulu大数据平台的实现依循Lambda架构。Lambda架构是一个通用的大数据处理框架,包含离线的批处理层、在线的加速层和服务层三部分,具体如图2所示。服务层一般使用HTTP服务或自定制的客户端对外提供数据访问,离线的批处理层一般使用批处理计算框架Spark和MapReduce进行数据分析,在线的加速层一般使用流式实时计算框架Spark Streaming和Storm进行数据分析。

 

图2:lambda架构原理图

对于实时计算部分,Hulu内部使用了Kafka、Codis和Spark Streaming。下面按照数据流的过程,介绍我们的项目。

 

  1. 1.     收集数据

 

从服务器日志中收集数据,主要包括两个部分:

q  来自网页、手机App、机顶盒等设备用户产生的视频观看、广告点击等行为,这些行为数据记录在各自的nginx服务的日志中。

q   使用Flume将用户行为数据同时导入HDFS和Kafka,其中HDFS中的数据用于离线分析,而Kafka中数据则用于流式实时分析。

 

图3:Hulu数据收集流程

  1. 2.     存储标签数据

 

Hulu使用HBase存储用户标签数据,包括基本信息如性别、年龄、是否付费,以及其他模型推测出来的偏好属性。这些属性需要作为计算模型的输入,同时HBase随机读取的速度比较慢,需要将数据同步到缓存服务器中以加快数据读取速度。Redis是一个应用广泛的开源缓存服务器,但其本身是个单机系统,不能很好地支持大量数据的缓存。为解决Redis扩展性差的问题,豌豆荚开源了Codis,一个分布式Redis解决方案。Hulu将Codis打成Docker镜像,并实现一键式构建缓存系统,附带自动监控和修复功能。为了更精细的监控,Hulu构建了多个Codis缓存,分别是:

q  codis-profile,同步HBase中的用户属性;

q  codis-action,缓存来自Kafka的用户行为;

q  codis-result,记录计算结果。

  1. 3.     实时处理数据

 

在一切准备就绪,启动Spark Streaming程序:

1)      Spark Streaming启动Kafka Receiver,持续地从Kafka服务器拉取数据;

2)      每隔两秒,Kafka的数据被整理成一个RDD,交给Spark引擎处理;

3)      对一条用户行为,Spark会从codis-action缓存中拿到该用户的行为记录,然后把新的行为追加进去;

4)      Spark从codis-action和codis-profile中获得该用户的所有相关属性,然后执行广告和推荐的计算模型,最后把结果写入codis-result,进而供服务层实时读取这些结果。

 

Spark Streaming优化经验

 

实践中,业务逻辑首先保证完成,使得在Kafka输入数据量较小的情况下系统稳定运行,且输入输出满足项目需求。然后开始调优,修改Spark Streaming的参数,比如Executor的数量,Core的数量,Receiver的流量等。最后发现仅调参数无法完全满足本项目的业务场景,所以有更进一步的优化方案,总结如下:

 

  1. Executor初始化

 

很多机器学习的模型在第一次运行时,需要执行初始化方法,还会连接外部的数据库,常常需要5-10分钟,这会成为潜在的不稳定因素。在Spark Streaming应用中,当Receiver完成初始化,它就开始源源不断地接收数据,并且由Driver定期调度任务消耗这些数据。如果刚启动时Executor需要几分钟做准备,会导致第一个作业一直没有完成,这段时间内 Driver不会调度新的作业。这时候在Kafka Receiver端会有数据积压,随着积压的数据量越来越大,大部分数据会撑过新生代进入老年代,进而给Java GC带来严重的压力,容易引发应用程序崩溃。

 

本项目的解决方案是,修改Spark内核,在每个Executor接收任务之前先执行一个用户自定义的初始化函数,初始化函数中可以执行一些独立的用户逻辑。示例代码如下:

  // sc:是SparkContext, setupEnvironment是Hulu扩展的API

  sc.setupEnvironment(() => {

    application.initialize() // 用户应用程序初始化,需执行几分钟

})

 

该方案需要更改Spark的任务调度器,首先将每个Executor设置为未初始化状态。此时,调度器只会给未初始化状态的Executor分配初始化任务(执行前面提到的初始化函数)。等初始化任务完毕,调度器更新Executor的状态为已初始化,这样的Executor才可以分配正常的计算任务。

 

  1. 异步处理Task中的业务逻辑

 

本项目中,模型的输入参数均来自Codis,甚至模型内部也可能访问外部存储,直接导致模型计算时长不稳定,很多时间消耗在网络等待上。

 

为提高系统吞吐量,增大并行度是常用的优化方案,但在本项目的场景中并不适用。Spark作业的调度策略是,等待上一个作业的所有Task执行完毕,然后调度下一个作业。如果单个Task的运行时间不稳定,易发生个别Task拖慢整个作业的情况,以至于资源利用率不高;甚至并行度越大,该问题越严重。一种常用解决Task不稳定的方案是增大Spark Streaming的micro batch的时间间隔,该方案会使整个实时系统的延迟变长,并不推荐。

 

因此这里通过异步处理Task中的业务逻辑来解决。如下文的代码所示,同步方案中,Task内执行业务逻辑,处理时间不定;异步方案中,Task把业务逻辑嵌入线程,交给线程池执行,Task立刻结束, Executor向Driver报告执行完毕,异步处理的时间非常短,在100ms以内。另外,当线程池中积压的线程数量太大时(代码中qsize>100的情况),会暂时使用同步处理,配合反压机制(见下文的参数spark.streaming.backpressure.enabled),可以保证不会因为数据积压过多而导致系统崩溃。经实验验证,该方案大大提高了系统的吞吐量。

  // 同步处理

  // 函数 runBusinessLogic是 Task 中的业务逻辑,执行时间不定

  rdd.foreachPartition(partition => runBusinessLogic (partition))

 

  // 异步处理,threadPool是线程池

  rdd.foreachPartition(partition => {

    val qsize = threadPool.getQueue.size // 线程池中积压的线程数

    if (qsize > 100) {

      runBusinessLogic(partition) // 暂时同步处理

    }

    threadPool.execute(new Runnable {

      override def run() = runBusinessLogic(partition)

    })

  })

 

异步化Task也存在缺点:如果Executor发生异常,存放在线程池中的业务逻辑无法重新计算,会导致部分数据丢失。经实验验证,仅当Executor异常崩溃时有数据丢失,且不常见,在本项目的场景中可以接受。

 

  1. Kafka Receiver的稳定性

 

本项目使用了Spark Streaming中的Kafka Receiver,本质上调用Kafka官方的客户端ZookeeperConsumerConnector。其策略是每个客户端在Zookeeper的固定路径下把自己注册为临时节点,于是所有客户端都知道其他客户端的存在,然后自动协调和分配Kafka的数据资源。该策略存在一个弊端,当一个客户端与Zookeeper的连接状态发生改变(断开或者连上),所有的客户端都会通过Zookeeper协调, 重新分配Kafka的数据资源;在此期间所有客户端都断开与Kafka的连接,系统接收不到Kafka的数据,直到重新分配成功。如果网络质量不佳,并且Receiver的个数较多,这种策略会造成数据输入不稳定,很多Spark Streaming用户遇到这样的问题。在我们的系统中,该策略并没有产生明显的负面影响。值得注意的是,Kafka 客户端与Zookeeper有个默认的参数zookeeper.session.timeout.ms=6000,表示客户端与Zookeeper连接的session有效时间为6秒,我们的客户端多次出现因为Full GC超过6秒而与Zookeeper断开连接,之后再次连接上,期间所有客户端都受到影响,系统表现不稳定。所以项目中设置参数zookeeper.session.timeout.ms=30000。

 

  1. YARN资源抢占问题

     

在Hulu内部,Spark Streaming这样的长时服务与MapRedue、Spark、Hive等批处理应用共享YARN集群资源。在共享环境中,经常因一个批处理应用占用大量网络资源或者CPU资源导致Spark Streaming服务不稳定(尽管我们采用了CGroup进行资源隔离,但效果不佳)。更严重的问题是,如果个别Container崩溃Driver需要向YARN申请新的Container,或者如果整个应用崩溃需要重启,Spark Streaming不能保证很快申请到足够的资源,也就无法保证线上服务的质量。为解决该问题,Hulu使用label-based scheduling的调度策略,从YARN集群中隔离出若干节点专门运行Spark Streaming和其他长时服务,避免与批处理程序竞争资源。

 

  1. 完善监控信息

 

监控反映系统运行的性能状态,也是一切优化的基础。 Hulu使用Graphite和Grafana作为第三方监控系统,本项目把系统中关键的性能参数(如计算时长和次数)发送给Graphite服务器,就能够在Grafana网页上看到直观的统计图。

 

 

图4:Graphite监控信息,展示了Kafka中日志的剩余数量,一条线对应于一个partition的历史余量

 

图4是统计Kafka中日志的剩余数量,一条线对应于一个partition的历史余量,大部分情况下余量接近零,符合预期。图中09:55左右日志余量开始出现很明显的尖峰,之后又迅速逼近零。事后经过多种数据核对,证实Kafka的数据一直稳定,而当时Spark Streaming执行作业突然变慢,反压机制生效,于是Kafka Receiver减小读取日志的速率,造成Kafka数据积压;一段时间之后Spark Streaming又恢复正常,快速消耗了Kafka中的数据余量。

 

直观的监控系统能有效地暴露问题,进而理解和强化系统。 在我们的实践中,主要的监控指标有:

q   Kafka的剩余数据量

q  Spark的作业运行时间和调度时间

q  每个Task的计算时间

q  Codis的访问次数、时间、命中率

另外,有脚本定期分析这些统计数据,出现异常则发邮件报警。比如图4中 Kafka 的日志余量过大时,会有连续的报警邮件。我们的经验是,监控越细致,之后的优化工作越轻松。

  1. 参数优化

下表列出本项目中比较关键的几个参数:

spark.yarn.max.executor.failures

Executor允许的失败上限;如果超过该上限,整个Spark Streaming会失败,需要设置比较大

spark.yarn.executor.memoryOverhead

Executor中JVM的开销,与堆内存不一样,设置太小会导致内存溢出异常

spark.receivers.num

Kafka Receiver的个数

spark.streaming.receiver.maxRate

每个Receiver能够接受数据的最大速率;这个值超过峰值约50%

spark.streaming.backpressure.enabled

反压机制;如果目前系统的延迟较长,Receiver端会自动减小接受数据的速率,避免系统因数据积压过多而崩溃

spark.locality.wait

系统调度Task会尽量考虑数据的局部性,如果超过spark.locality.wait设置时间的上限,就放弃局部性;该参数直接影响Task的调度时间

spark.cleaner.ttl

Spark系统内部的元信息的超时时间;Streaming长期运行,元信息累积太多会影响性能

 

总结

 

Spark Streaming的产品上线运行一年多,期间进行了多次Spark版本升级,从最早期的0.8版本到最近的 1.5.x版本。总体上Spark Streaming是一款优秀的实时计算框架,可以在线上使用 。但仍然存在一些不足,包括:Spark同时使用堆内和堆外的内存,缺乏有效的监控,遇到OOM时分析和调试比较困难;缺少Executor初始化接口; 新版本的Spark有一些异常,如Shuffle过程中Block丢失、内存溢出。

以上是关于Spark Streaming实践和优化的主要内容,如果未能解决你的问题,请参考以下文章

原创 Hadoop&Spark 动手实践 11Spark Streaming 应用与动手实践

Spark Streaming实时处理应用

Spark Streaming 基于 Direct API 优化与 Kafka 集成

Spark Streaming 基于 Direct API 优化与 Kafka 集成

Spark 以及 spark streaming 核心原理及实践

Spark 实践——基于 Spark Streaming 的实时日志分析系统