使用 Spark Streaming 时限制 Kafka 批量大小
Posted
技术标签:
【中文标题】使用 Spark Streaming 时限制 Kafka 批量大小【英文标题】:Limit Kafka batches size when using Spark Streaming 【发布时间】:2017-02-20 06:07:03 【问题描述】:是否可以限制 Kafka 消费者为 Spark Streaming 返回的批次大小?
我之所以问,是因为我得到的第一批记录有数亿条记录,并且需要很长时间来处理和检查它们。
【问题讨论】:
您当前的批处理间隔是多少?如果它更多尝试减少批处理间隔,那么您可以获得更少的数据。 忽略了你的问题,你提到了第一批.. 【参考方案1】:限制最大批量大小有助于控制处理时间,但会增加消息的处理延迟。
通过属性下面的设置,我们可以控制批量大小 spark.streaming.receiver.maxRate= spark.streaming.kafka.maxRatePerPartition=
您甚至可以通过启用背压来根据处理时间动态设置批处理大小 spark.streaming.backpressure.enabled:真 spark.streaming.backpressure.initialRate:
【讨论】:
【参考方案2】:除了上面的答案。批量大小是 3 个参数的乘积
batchDuration
:流数据分批的时间间隔(以秒为单位)。
spark.streaming.kafka.maxRatePerPartition
:设置每个分区每秒的最大消息数。这与batchDuration
结合使用将控制批量大小。您希望将maxRatePerPartition
设置为很大(否则您实际上是在限制您的工作),而batchDuration
则非常小。
kafka 主题中的分区数
为了更好地解释此产品在启用/禁用背压时如何工作 (set spark.streaming.kafka.maxRatePerPartition for createDirectStream)
【讨论】:
这个答案比公认的答案更准确。 这真的很有意义。谢谢你的澄清。但我的情况略有不同。在我的工作中,我从多个主题中消费。所以,问题是:NumberOfTopics X NumberOfPartitions X MaxRatePerPartition X BatchDuration 我想设置这个最大值。将 Spark 2.4 与 Direct Kafka Stream 结合使用。【参考方案3】:我认为Spark Streaming Backpressure可以解决您的问题。
检查spark.streaming.backpressure.enabled
和spark.streaming.backpressure.initialRate
。
默认情况下spark.streaming.backpressure.initialRate
未设置,spark.streaming.backpressure.enabled
默认情况下禁用,所以我想 spark 会尽可能多地使用。
来自Apache Spark Kafka configuration
spark.streaming.backpressure.enabled
:
这使 Spark Streaming 能够根据接收速率控制 关于当前的批处理调度延迟和处理时间,以便 系统只能以系统可以处理的速度接收。 在内部,这会动态设置最大接收速率 接收器。这个比率的上限是值
spark.streaming.receiver.maxRate
和spark.streaming.kafka.maxRatePerPartition
如果已设置(见下文)。
而且由于您想控制第一批,或者更具体地-第一批的消息数量,我认为您需要spark.streaming.backpressure.initialRate
spark.streaming.backpressure.initialRate
:
这是每个接收器的初始最大接收速率 在背压机制启动时接收第一批数据 已启用。
当您的 Spark 作业(分别是 Spark 工作人员)能够处理来自 kafka 的 10000 条消息,但 kafka 代理向您的作业提供 100000 条消息时,这个很好。
也许您也有兴趣查看spark.streaming.kafka.maxRatePerPartition
以及Jeroen van Wilgenburg on his blog 对这些属性的真实示例的一些研究和建议。
【讨论】:
这就是我想要的,谢谢。不幸的是,spark.streaming.backpressure.initialRate、spark.streaming.backpressure.enabled、spark.streaming.receiver.maxRate 和 spark.streaming.receiver.initialRate 都没有改变我获得的记录数量(我尝试了许多不同的组合)。唯一有效的配置是“spark.streaming.kafka.maxRatePerPartition”。这总比没有好,但是启用背压以进行自动缩放会很有用。你知道为什么背压不起作用吗?如何调试这个? 也许spark.streaming.backpressure.initialRate
可以工作,但正如 Jeroen van Wilgenburg 在他的博客中所注意到的那样“设置最大值是个好主意,因为背压算法不是即时的(这是不可能的)......麻烦当 Kafka 决定在最初几秒钟内为我们提供 50.000 条记录/秒时,使用 Kafka 输入的作业可以处理大约 1000 个事件/秒。” ..但我很困惑,因为没有工作。spark.streaming.backpressure.enabled
应该“在内部,动态设置接收器的最大接收速率”
在最新的流媒体文档中,它提到设置spark.streaming.backpressure.enabled
会动态处理费率。 “在 Spark 1.5 中,我们引入了一个称为背压的功能,它消除了设置此速率限制的需要,因为 Spark Streaming 会自动计算速率限制并在处理条件发生变化时动态调整它们。”,这可以解释为什么速率没有'如果该属性设置为 true,则不起作用。
我正在使用带有 createStream API 的 spark 1.6.1 我也无法利用 spark.streaming.backpressure.enabled=true 这对我也不起作用,唯一的设置对我有用是 spark.streaming.receiver.maxRate
这种方法适用于 Spark 结构化流吗?以上是关于使用 Spark Streaming 时限制 Kafka 批量大小的主要内容,如果未能解决你的问题,请参考以下文章
Spark SQL + Window + Streaming 问题 - 使用 Spark Streaming 运行时,Spark SQL 查询需要很长时间才能执行
Spark Streaming性能优化系列-如何获得和持续使用足够的集群计算资源?
flink和spark Streaming中的Back Pressure
Spark Streaming性能优化系列-如何获得和持续使用足够的集群计算资源?