使用 Spark Structured Streaming 时限制 kafka 批量大小
Posted
技术标签:
【中文标题】使用 Spark Structured Streaming 时限制 kafka 批量大小【英文标题】:Limit kafka batch size when using Spark Structured Streaming 【发布时间】:2019-03-28 23:28:37 【问题描述】:我们有一些关于我们主题的历史数据排队,我们不希望在一个批次中处理所有这些数据,因为这样做更难(如果失败,就必须重新开始!)。
此外,了解如何控制批量大小对于调整作业非常有帮助。
使用DStreams
时,尽可能精确控制batch大小的方法是Limit Kafka batches size when using Spark Streaming
同样的方法,即设置maxRatePerPartition
,然后调整batchDuration
非常麻烦,但与DStream
一起使用,它根本不适用于结构化流。
理想情况下,我想知道像 maxBatchSize
和 minBatchSize
这样的配置,我可以在其中简单地设置我想要的记录数。
【问题讨论】:
【参考方案1】:这个配置选项maxOffsetsPerTrigger
:
每个触发间隔处理的最大偏移数的速率限制。指定的总偏移量将按比例分配到不同卷的 topicPartitions。
注意如果你有一个带有开始和结束偏移量的检查点目录,那么应用程序将处理第一批目录中的偏移量,从而忽略这个配置。 (下一批会尊重它)。
【讨论】:
这会扼杀我的工作吗?究竟什么是触发间隔?它会尽快从 Kafka 中读取数据,但只限制读取的记录数吗? 可以改用它,并自己处理偏移量,这将比 StructureStreaming 更可预测/更灵活。 ***.com/a/53065951/1586965 @samthebest 这可以很好地限制batchSize
。应该使用什么选项来限制/控制trigger-frequency
? (类似于 Spark Streaming 中的 Duration.class
)。
@CᴴᴀZ 您正在寻找的概念是trigger,特别是固定间隔微批次。【参考方案2】:
如果主题是分区的,并且所有分区都有消息,那么你可以获取的最少消息等于主题中的分区数。 (即)如果有数据,每个分区需要 1 条记录,如果只有一个分区有数据,那么您可以采取的最小记录是 1。如果主题没有分区,您可以采取 1 条记录最小值和任何最大值。
【讨论】:
以上是关于使用 Spark Structured Streaming 时限制 kafka 批量大小的主要内容,如果未能解决你的问题,请参考以下文章
无法使用Spark Structured Streaming在Parquet文件中写入数据
如何使用Spark Structured Streaming连续监视目录
Spark Streaming:Spark Structured Streaming 中不允许使用 Kafka 组 ID
如何使用 Python 在 Spark Structured Streaming 中查看特定指标
删除由 spark-structured-streaming 写入的损坏的 parquet 文件时,我会丢失数据吗?
Spark 2.1 Structured Streaming - 使用 Kakfa 作为 Python 的源 (pyspark)