如何在 Spark 结构化流中获取书面记录的数量?

Posted

技术标签:

【中文标题】如何在 Spark 结构化流中获取书面记录的数量?【英文标题】:How to get number of written records in spark structured streaming? 【发布时间】:2018-02-01 14:43:46 【问题描述】:

我在单个 Spark 会话中配置了一些结构化流。我需要知道在每个流中读取和写入了多少条记录。 例如,我有这两个流:

    read-s3 -> 转换 -> write-s3 read-s3 -> 转换 -> write-db

我知道使用 SparkListener().onTaskEnd(),但那时我没有查询名称,taskEnd.taskMetrics().outputMetrics().recordsWritten() 始终为 0,所以它不是一个选项。

另一种方法是使用 dataset.map() 中的累加器进行增量计算。但这不是要写入的记录数,而是要写入的记录(如果 sink 不失败)。

除了我尝试使用 StreamingQueryListener(我用它来获取 numInputRows)之外,我找不到任何关于写入记录数量的指标。

是否有可能获得这种指标?

【问题讨论】:

还是custom sink吗? 没有。那篇文章是关于修复一些输入行的,但在这里我问的是输出记录的数量,使用哪个接收器并不重要。当然,有了自定义接收器,我可以自己添加这些指标,但如果我使用 FileStreamSink 该怎么办? FileStreamSink (issues.apache.org/jira/browse/SPARK-23288) 中存在一个错误,已在 2.3.1 版中修复 请将其作为答案发布 (***.com/help/self-answer)。它将对未来的访问者更加可见,并有助于解决问题。 【参考方案1】:

a bug in FileStreamSink 已在 2.3.1 版中修复。

作为一种解决方法,在 map 函数中使用 accumulators 在写入接收器之前计算记录数。

【讨论】:

以上是关于如何在 Spark 结构化流中获取书面记录的数量?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spark 结构化流中手动设置 group.id 并提交 kafka 偏移量?

如何在 pyspark 结构化流中使用 maxOffsetsPerTrigger?

在 Spark 结构化流中,我如何将完整的聚合输出到外部源,如 REST 服务

如何从 Spark 结构化流中的 Cassandra 等外部存储读取 Kafka 和查询?

Spark-Streaming 记录比较

如何在spark结构化流媒体应用程序中优化执行程序实例的数量?