为 spark kafka 消费者动态更新输入的批量大小

Posted

技术标签:

【中文标题】为 spark kafka 消费者动态更新输入的批量大小【英文标题】:Dynamically update batch size of input for spark kafka consumer 【发布时间】:2016-12-19 11:55:04 【问题描述】:

我在我的 spark 流应用程序中使用 createDirectStream。我将批处理间隔设置为 7 秒,大多数情况下批处理作业可以在大约 5 秒内完成。但是,在极少数情况下,批处理作业需要花费 60 秒,这会延迟某些批处理作业。 为了减少总延迟时间,我希望我可以一次处理更多分布在延迟作业中的流数据。这将有助于流媒体尽快恢复正常。

所以,我想知道有一些方法可以在出现延迟时动态更新/合并 spark 和 kafka 的输入批量大小。

【问题讨论】:

【参考方案1】:

您可以将“spark.streaming.backpressure.enabled”选项设置为 true。

如果在背压选项为 true 时出现批处理延迟,它最初以较小的批处理大小开始,然后动态更改为较大的批处理大小。

See the spark configuration document.

你可以看下面的描述。

启用或禁用 Spark Streaming 的内部背压机制 (从 1.5 开始)。这使 Spark Streaming 能够控制接收 基于当前批处理调度延迟和处理时间的速率 以便系统仅以系统可以处理的速度接收。 在内部,这会动态设置最大接收速率 接收器。这个比率的上限是值 spark.streaming.receiver.maxRate 和 spark.streaming.kafka.maxRatePerPartition(如果已设置)(见下文)。

【讨论】:

以上是关于为 spark kafka 消费者动态更新输入的批量大小的主要内容,如果未能解决你的问题,请参考以下文章

spark-streaming与kafka的整合

请教一个关于使用spark 读取kafka只能读取一个分区数据的问题

[Spark][kafka]kafka 生产者,消费者 互动例子

kafka unclean 配置代表啥,会对 spark streaming 消费有什么影响?

kafka unclean 配置代表啥,会对 spark streaming 消费有什么影响?

用canal同步binlog到kafka,spark streaming消费kafka topic乱码问题