使用 Spark Structured Streaming 时限制 kafka 批量大小

Posted

技术标签:

【中文标题】使用 Spark Structured Streaming 时限制 kafka 批量大小【英文标题】:Limit kafka batch size when using Spark Structured Streaming 【发布时间】:2019-03-28 23:28:37 【问题描述】:

我们有一些关于我们主题的历史数据排队,我们不希望在一个批次中处理所有这些数据,因为这样做更难(如果失败,就必须重新开始!)。

此外,了解如何控制批量大小对于调整作业非常有帮助。

使用DStreams时,尽可能精确控制batch大小的方法是Limit Kafka batches size when using Spark Streaming

同样的方法,即设置maxRatePerPartition,然后调整batchDuration 非常麻烦,但与DStream 一起使用,它根本不适用于结构化流。

理想情况下,我想知道像 maxBatchSizeminBatchSize 这样的配置,我可以在其中简单地设置我想要的记录数。

【问题讨论】:

【参考方案1】:

这个配置选项maxOffsetsPerTrigger:

每个触发间隔处理的最大偏移数的速率限制。指定的总偏移量将按比例分配到不同卷的 topicPartitions。

注意如果你有一个带有开始和结束偏移量的检查点目录,那么应用程序将处理第一批目录中的偏移量,从而忽略这个配置。 (下一批会尊重它)。

【讨论】:

这会扼杀我的工作吗?究竟什么是触发间隔?它会尽快从 Kafka 中读取数据,但只限制读取的记录数吗? 可以改用它,并自己处理偏移量,这将比 StructureStreaming 更可预测/更灵活。 ***.com/a/53065951/1586965 @samthebest 这可以很好地限制batchSize。应该使用什么选项来限制/控制trigger-frequency? (类似于 Spark Streaming 中的 Duration.class)。 @CᴴᴀZ 您正在寻找的概念是trigger,特别是固定间隔微批次【参考方案2】:

如果主题是分区的,并且所有分区都有消息,那么你可以获取的最少消息等于主题中的分区数。 (即)如果有数据,每个分区需要 1 条记录,如果只有一个分区有数据,那么您可以采取的最小记录是 1。如果主题没有分区,您可以采取 1 条记录最小值和任何最大值。

【讨论】:

以上是关于使用 Spark Structured Streaming 时限制 kafka 批量大小的主要内容,如果未能解决你的问题,请参考以下文章

无法使用Spark Structured Streaming在Parquet文件中写入数据

如何使用Spark Structured Streaming连续监视目录

Spark Streaming:Spark Structured Streaming 中不允许使用 Kafka 组 ID

如何使用 Python 在 Spark Structured Streaming 中查看特定指标

删除由 spark-structured-streaming 写入的损坏的 parquet 文件时,我会丢失数据吗?

Spark 2.1 Structured Streaming - 使用 Kakfa 作为 Python 的源 (pyspark)