使用 Spark Streaming 时限制 Kafka 批量大小

Posted

技术标签:

【中文标题】使用 Spark Streaming 时限制 Kafka 批量大小【英文标题】:Limit Kafka batches size when using Spark Streaming 【发布时间】:2017-02-20 06:07:03 【问题描述】:

是否可以限制 Kafka 消费者为 Spark Streaming 返回的批次大小?

我之所以问,是因为我得到的第一批记录有数亿条记录,并且需要很长时间来处理和检查它们。

【问题讨论】:

您当前的批处理间隔是多少?如果它更多尝试减少批处理间隔,那么您可以获得更少的数据。 忽略了你的问题,你提到了第一批.. 【参考方案1】:

限制最大批量大小有助于控制处理时间,但会增加消息的处理延迟。

通过属性下面的设置,我们可以控制批量大小 spark.streaming.receiver.maxRate= spark.streaming.kafka.maxRatePerPartition=

您甚至可以通过启用背压来根据处理时间动态设置批处理大小 spark.streaming.backpressure.enabled:真 spark.streaming.backpressure.initialRate:

【讨论】:

【参考方案2】:

除了上面的答案。批量大小是 3 个参数的乘积

    batchDuration:流数据分批的时间间隔(以秒为单位)。 spark.streaming.kafka.maxRatePerPartition:设置每个分区每秒的最大消息数。这与batchDuration 结合使用将控制批量大小。您希望将maxRatePerPartition 设置为很大(否则您实际上是在限制您的工作),而batchDuration 则非常小。 kafka 主题中的分区数

为了更好地解释此产品在启用/禁用背压时如何工作 (set spark.streaming.kafka.maxRatePerPartition for createDirectStream)

【讨论】:

这个答案比公认的答案更准确。 这真的很有意义。谢谢你的澄清。但我的情况略有不同。在我的工作中,我从多个主题中消费。所以,问题是:NumberOfTopics X NumberOfPartitions X MaxRatePerPartition X BatchDuration 我想设置这个最大值。将 Spark 2.4 与 Direct Kafka Stream 结合使用。【参考方案3】:

我认为Spark Streaming Backpressure可以解决您的问题。

检查spark.streaming.backpressure.enabledspark.streaming.backpressure.initialRate

默认情况下spark.streaming.backpressure.initialRate 未设置spark.streaming.backpressure.enabled 默认情况下禁用,所以我想 spark 会尽可能多地使用。

来自Apache Spark Kafka configuration

spark.streaming.backpressure.enabled:

这使 Spark Streaming 能够根据接收速率控制 关于当前的批处理调度延迟和处理时间,以便 系统只能以系统可以处理的速度接收。 在内部,这会动态设置最大接收速率 接收器。这个比率的上限是值 spark.streaming.receiver.maxRatespark.streaming.kafka.maxRatePerPartition 如果已设置(见下文)。

而且由于您想控制第一批,或者更具体地-第一批的消息数量,我认为您需要spark.streaming.backpressure.initialRate

spark.streaming.backpressure.initialRate:

这是每个接收器的初始最大接收速率 在背压机制启动时接收第一批数据 已启用。

当您的 Spark 作业(分别是 Spark 工作人员)能够处理来自 kafka 的 10000 条消息,但 kafka 代理向您的作业提供 100000 条消息时,这个很好。

也许您也有兴趣查看spark.streaming.kafka.maxRatePerPartition 以及Jeroen van Wilgenburg on his blog 对这些属性的真实示例的一些研究和建议。

【讨论】:

这就是我想要的,谢谢。不幸的是,spark.streaming.backpressure.initialRate、spark.streaming.backpressure.enabled、spark.streaming.receiver.maxRate 和 spark.streaming.receiver.initialRate 都没有改变我获得的记录数量(我尝试了许多不同的组合)。唯一有效的配置是“spark.streaming.kafka.maxRatePerPartition”。这总比没有好,但是启用背压以进行自动缩放会很有用。你知道为什么背压不起作用吗?如何调试这个? 也许 spark.streaming.backpressure.initialRate 可以工作,但正如 Jeroen van Wilgenburg 在他的博客中所注意到的那样“设置最大值是个好主意,因为背压算法不是即时的(这是不可能的)......麻烦当 Kafka 决定在最初几秒钟内为我们提供 50.000 条记录/秒时,使用 Kafka 输入的作业可以处理大约 1000 个事件/秒。” ..但我很困惑,因为没有工作。spark.streaming.backpressure.enabled 应该“在内部,动态设置接收器的最大接收速率” 在最新的流媒体文档中,它提到设置spark.streaming.backpressure.enabled 会动态处理费率。 “在 Spark 1.5 中,我们引入了一个称为背压的功能,它消除了设置此速率限制的需要,因为 Spark Streaming 会自动计算速率限制并在处理条件发生变化时动态调整它们。”,这可以解释为什么速率没有'如果该属性设置为 true,则不起作用。 我正在使用带有 createStream API 的 spark 1.6.1 我也无法利用 spark.streaming.backpressure.enabled=true 这对我也不起作用,唯一的设置对我有用是 spark.streaming.receiver.maxRate 这种方法适用于 Spark 结构化流吗?

以上是关于使用 Spark Streaming 时限制 Kafka 批量大小的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL + Window + Streaming 问题 - 使用 Spark Streaming 运行时,Spark SQL 查询需要很长时间才能执行

Spark Streaming性能优化系列-如何获得和持续使用足够的集群计算资源?

flink和spark Streaming中的Back Pressure

Spark Streaming性能优化系列-如何获得和持续使用足够的集群计算资源?

Spark Streaming性能优化系列-如何获得和持续使用足够的集群计算资源?

删除由 spark-structured-streaming 写入的损坏的 parquet 文件时,我会丢失数据吗?