如何在 pyspark 结构化流中使用 maxOffsetsPerTrigger?

Posted

技术标签:

【中文标题】如何在 pyspark 结构化流中使用 maxOffsetsPerTrigger?【英文标题】:How to use maxOffsetsPerTrigger in pyspark structured streaming? 【发布时间】:2018-12-04 14:40:11 【问题描述】:

我想限制从 kafka 获取数据时的速率。我的代码如下:

df = spark.read.format('kafka') \
        .option("kafka.bootstrap.servers",'...')\
        .option("subscribe",'A') \
        .option("startingOffsets",'''"A":"0":200,"1":200,"2":200''') \
        .option("endingOffsets",'''"A":"0":400,"1":400,"2":400''') \
        .option("maxOffsetsPerTrigger",20) \
        .load() \
        .cache()

但是当我调用df.count() 时,结果是 600。我期望的是 20。有谁知道为什么“maxOffsetsPerTrigger”不起作用。

【问题讨论】:

我看到工作正常。您看到的最终结果总共有多少个文件? 【参考方案1】:

您为每个分区(0、1、2)带来 200 条记录,总计 600 条记录。

如您所见:

使用 maxOffsetsPerTrigger 选项将记录数限制为 获取每个触发器。

这意味着对于每个触发器或获取过程,Kafka 将获取 20 条记录,但总的来说,您仍将获取配置中设置的总记录(每个分区 200 条)。

【讨论】:

感谢您的回复!那么它会触发 30 次来获取 600 条记录吗?当我将endingOffsets设置为最新时,有什么方法可以限制我可以获得多少记录,同时总记录不超过某个限制? 是的,没错!通常,您只能使用 max.poll.records 限制从 fetch/poll 操作中获取的记录数,如果您更喜欢使用字节,请使用 replica.fetch.max.bytesreplica.fetch.min.bytes 设置max.poll.recordsmaxOffsetsPerTrigger有什么区别? 我使用 maxOffsetsPerTrigger 作为 1,我的主题有 10 个分区。我的执行器数量为 4,执行器核心数为 2。当我的作业运行时,它每批获取 8 条记录,并为每个执行器分配 2 条消息。知道为什么吗?

以上是关于如何在 pyspark 结构化流中使用 maxOffsetsPerTrigger?的主要内容,如果未能解决你的问题,请参考以下文章

PySpark 结构化流式处理:将查询的输出传递到 API 端点

如何在 Spark 结构化流中获取书面记录的数量?

如何在 Spark 结构化流中保存通过水印丢弃的记录

如何检查结构化流中的 StreamingQuery 性能指标?

如何在火花结构化流式读取流中倒带 Kafka 偏移

如何在 pyspark 的结构化流作业中运行地图转换