如何在 pyspark 结构化流中使用 maxOffsetsPerTrigger?
Posted
技术标签:
【中文标题】如何在 pyspark 结构化流中使用 maxOffsetsPerTrigger?【英文标题】:How to use maxOffsetsPerTrigger in pyspark structured streaming? 【发布时间】:2018-12-04 14:40:11 【问题描述】:我想限制从 kafka 获取数据时的速率。我的代码如下:
df = spark.read.format('kafka') \
.option("kafka.bootstrap.servers",'...')\
.option("subscribe",'A') \
.option("startingOffsets",'''"A":"0":200,"1":200,"2":200''') \
.option("endingOffsets",'''"A":"0":400,"1":400,"2":400''') \
.option("maxOffsetsPerTrigger",20) \
.load() \
.cache()
但是当我调用df.count()
时,结果是 600。我期望的是 20。有谁知道为什么“maxOffsetsPerTrigger”不起作用。
【问题讨论】:
我看到工作正常。您看到的最终结果总共有多少个文件? 【参考方案1】:您为每个分区(0、1、2)带来 200 条记录,总计 600 条记录。
如您所见:
使用 maxOffsetsPerTrigger 选项将记录数限制为 获取每个触发器。
这意味着对于每个触发器或获取过程,Kafka 将获取 20 条记录,但总的来说,您仍将获取配置中设置的总记录(每个分区 200 条)。
【讨论】:
感谢您的回复!那么它会触发 30 次来获取 600 条记录吗?当我将endingOffsets设置为最新时,有什么方法可以限制我可以获得多少记录,同时总记录不超过某个限制? 是的,没错!通常,您只能使用max.poll.records
限制从 fetch/poll 操作中获取的记录数,如果您更喜欢使用字节,请使用 replica.fetch.max.bytes
或 replica.fetch.min.bytes
。
设置max.poll.records
和maxOffsetsPerTrigger
有什么区别?
我使用 maxOffsetsPerTrigger 作为 1,我的主题有 10 个分区。我的执行器数量为 4,执行器核心数为 2。当我的作业运行时,它每批获取 8 条记录,并为每个执行器分配 2 条消息。知道为什么吗?以上是关于如何在 pyspark 结构化流中使用 maxOffsetsPerTrigger?的主要内容,如果未能解决你的问题,请参考以下文章
PySpark 结构化流式处理:将查询的输出传递到 API 端点