Spark Performance Tuning (性能调优)
Posted gyhuminyan
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark Performance Tuning (性能调优)相关的知识,希望对你有一定的参考价值。
在集群上的 Spark Streaming application 中获得最佳性能需要一些调整.本节介绍了可调整的多个 parameters (参数)和 configurations (配置)提高你的应用程序性能.在高层次上, 你需要考虑两件事情:
-
通过有效利用集群资源, Reducing the processing time of each batch of data (减少每批数据的处理时间).
-
设置正确的 batch size (批量大小), 以便 batches of data (批量的数据)可以像 received (被接收)处理一样快(即 data processing (数据处理)与 data ingestion (数据摄取)保持一致).
Reducing the Batch Processing Times (减少批处理时间)
在 Spark 中可以进行一些优化, 以 minimize the processing time of each batch (最小化每批处理时间).这些已在 Tuning Guide (调优指南) 中详细讨论过.本节突出了一些最重要的.
Level of Parallelism in Data Receiving (数据接收中的并行级别)
通过网络接收数据(如Kafka, Flume, socket 等)需要 deserialized (反序列化)数据并存储在 Spark 中.如果数据接收成为系统的瓶颈, 那么考虑一下 parallelizing the data receiving (并行化数据接收).注意每个 input DStream 创建接收 single stream of data (单个数据流)的 single receiver (单个接收器)(在 work machine 上运行). 因此, 可以通过创建多个 input DStreams 来实现 Receiving multiple data streams (接收多个数据流)并配置它们以从 source(s) 接收 data stream (数据流)的 different partitions (不同分区).例如, 接收 two topics of data (两个数据主题)的单个Kafka input DStream 可以分为两个 Kafka input streams (输入流), 每个只接收一个 topic (主题).这将运行两个 receivers (接收器), 允许 in parallel (并行)接收数据, 从而提高 overall throughput (总体吞吐量).这些 multiple DStreams 可以 unioned (联合起来)创建一个 single DStream .然后 transformations (转化)为应用于 single input DStream 可以应用于 unified stream .如下这样做.
1 val numStreams = 5 2 val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) } 3 val unifiedStream = streamingContext.union(kafkaStreams) 4 unifiedStream.print()
应考虑的另一个参数是 receiver’s block interval (接收器的块间隔), 这由configuration parameter (配置参数) 的 spark.streaming.blockInterval
决定.对于大多数 receivers (接收器), 接收到的数据 coalesced (合并)在一起存储在 Spark 内存之前的 blocks of data (数据块).每个 batch (批次)中的 blocks (块)数确定将用于处理接收到的数据以 map-like (类似与 map 形式的) transformation (转换)的 task (任务)的数量.每个 receiver (接收器)每 batch (批次)的任务数量将是大约( batch interval (批间隔)/ block interval (块间隔)).例如, 200 ms的 block interval (块间隔)每 2 秒 batches (批次)创建 10 个 tasks (任务).如果 tasks (任务)数量太少(即少于每个机器的内核数量), 那么它将无效, 因为所有可用的内核都不会被使用处理数据.要增加 given batch interval (给定批间隔)的 tasks (任务)数量, 请减少 block interval (块间??隔).但是, 推荐的 block interval (块间隔)最小值约为 50ms , 低于此任务启动开销可能是一个问题.
使用 multiple input streams (多个输入流)/ receivers (接收器)接收数据的替代方法是明确 repartition (重新分配) input data stream (输入数据流)(使用 inputStream.repartition(<number of partitions>)
). 这会在 further processing (进一步处理)之前将 received batches of data (收到的批次数据) distributes (分发)到集群中指定数量的计算机.
Level of Parallelism in Data Processing (数据处理中的并行度水平)
如果在任何 computation (计算)阶段中使用 number of parallel tasks (并行任务的数量), 则 Cluster resources (集群资源)可能未得到充分利用. 例如, 对于 distributed reduce (分布式 reduce)操作, 如 reduceByKey
和 reduceByKeyAndWindow
, 默认并行任务的数量由 spark.default.parallelism
configuration property 控制. 您 可以通过 parallelism (并行度)作为参数(见 PairDStreamFunctions
文档 ), 或设置 spark.default.parallelism
configuration property 更改默认值.
Data Serialization (数据序列化)
可以通过调优 serialization formats (序列化格式)来减少数据 serialization (序列化)的开销.在 streaming 的情况下, 有两种类型的数据被 serialized (序列化).
-
Input data (输入数据): 默认情况下, 通过 Receivers 接收的 input data (输入数据)通过 StorageLevel.MEMORY_AND_DISK_SER_2 存储在 executors 的内存中.也就是说, 将数据 serialized (序列化)为 bytes (字节)以减少 GC 开销, 并复制以容忍 executor failures (执行器故障).此外, 数据首先保留在内存中, 并且只有在内存不足以容纳 streaming computation (流计算)所需的所有输入数据时才会 spilled over (溢出)到磁盘.这个 serialization (序列化)显然具有开销 - receiver (接收器)必须使接收的数据 deserialize (反序列化), 并使用 Spark 的 serialization format (序列化格式)重新序列化它.
-
Persisted RDDs generated by Streaming Operations (流式操作生成的持久 RDDs): 通过 streaming computations (流式计算)生成的 RDD 可能会持久存储在内存中.例如, window operations (窗口操作)会将数据保留在内存中, 因为它们将被处理多次.但是, 与 StorageLevel.MEMORY_ONLY 的 Spark Core 默认情况不同, 通过流式计算生成的持久化 RDD 将以 StorageLevel.MEMORY_ONLY_SER (即序列化), 以最小化 GC 开销.
在这两种情况下, 使用 Kryo serialization (Kryo 序列化)可以减少 CPU 和内存开销.有关详细信息, 请参阅 Spark Tuning Guide .对于 Kryo , 请考虑 registering custom classes , 并禁用对象引用跟踪(请参阅 Configuration Guide 中的 Kryo 相关配置).
在 streaming application 需要保留的数据量不大的特定情况下, 可以将数据(两种类型)作为 deserialized objects (反序列化对象)持久化, 而不会导致过多的 GC 开销.例如, 如果您使用几秒钟的 batch intervals (批次间隔)并且没有 window operations (窗口操作), 那么可以通过明确地相应地设置 storage level (存储级别)来尝试禁用 serialization in persisted data (持久化数据中的序列化).这将减少由于序列化造成的 CPU 开销, 潜在地提高性能, 而不需要太多的 GC 开销.
Task Launching Overheads (任务启动开销)
如果每秒启动的任务数量很高(比如每秒 50 个或更多), 那么这个开销向 slaves 发送任务可能是重要的, 并且将难以实现 sub-second latencies (次要的延迟).可以通过以下更改减少开销:
- Execution mode (执行模式): 以 Standalone mode (独立模式)或 coarse-grained Mesos 模式运行 Spark 比 fine-grained Mesos 模式更好的任务启动时间.有关详细信息, 请参阅 Running on Mesos guide .
这些更改可能会将 batch processing time (批处理时间)缩短 100 毫秒, 从而允许 sub-second batch size (次秒批次大小)是可行的.
Setting the Right Batch Interval (设置正确的批次间隔)
对于在集群上稳定地运行的 Spark Streaming application, 该系统应该能够处理数据尽可能快地被接收.换句话说, 应该处理批次的数据就像生成它们一样快.这是否适用于 application 可以在 monitoring streaming web UI 中的 processing times 中被找到, processing time (批处理处理时间)应小于 batch interval (批间隔).
取决于 streaming computation (流式计算)的性质, 使用的 batch interval (批次间隔)可能对处理由应用程序持续一组固定的 cluster resources (集群资源)的数据速率有重大的影响.例如, 让我们考虑早期的 WordCountNetwork 示例.对于特定的 data rate (数据速率), 系统可能能够跟踪每 2 秒报告 word counts (即 2 秒的 batch interval (批次间隔)), 但不能每 500 毫秒.因此, 需要设置 batch interval (批次间隔), 使预期的数据速率在生产可以持续.
为您的应用程序找出正确的 batch size (批量大小)的一个好方法是使用进行测试 conservative batch interval (保守的批次间隔)(例如 5-10 秒)和 low data rate (低数据速率).验证是否系统能够跟上 data rate (数据速率), 可以检查遇到的 end-to-end delay (端到端延迟)的值通过每个 processed batch (处理的批次)(在 Spark driver log4j 日志中查找 “Total delay” , 或使用 StreamingListener 接口). 如果 delay (延迟)保持与 batch size (批量大小)相当, 那么系统是稳定的.除此以外, 如果延迟不断增加, 则意味着系统无法跟上, 因此不稳定.一旦你有一个 stable configuration (稳定的配置)的想法, 你可以尝试增加 data rate and/or 减少 batch size .请注意, momentary increase (瞬时增加)由于延迟暂时增加只要延迟降低到 low value (低值), 临时数据速率增加就可以很好(即, 小于 batch size (批量大小)).
以上是关于Spark Performance Tuning (性能调优)的主要内容,如果未能解决你的问题,请参考以下文章
[转]Columnstore Performance Tuning
Oracle Performance Tuning Tools