如何在不使用 flatMapsGroupWithState 的情况下使用 Structured Streaming 2.3.0 在 spark 中进行无状态聚合?
Posted
技术标签:
【中文标题】如何在不使用 flatMapsGroupWithState 的情况下使用 Structured Streaming 2.3.0 在 spark 中进行无状态聚合?【英文标题】:How to do stateless aggregations in spark using Structured Streaming 2.3.0 without using flatMapsGroupWithState? 【发布时间】:2018-05-05 12:46:40 【问题描述】:如何在不使用 flatMapsGroupWithState 或 Dstream API 的情况下使用 Structured Streaming 2.3.0 在 spark 中进行无状态聚合?寻找一种更具声明性的方式
例子:
select count(*) from some_view
我希望输出只计算每批中可用的任何记录,但不计算前一批的汇总
【问题讨论】:
【参考方案1】:要在 spark 中使用 Structured Streaming 2.3.0 而不使用 flatMapsGroupWithState
或 Dstream API 进行无状态聚合,您可以使用以下代码-
import spark.implicits._
def countValues = (_: String, it: Iterator[(String, String)]) => it.length
val query =
dataStream
.select(lit("a").as("newKey"), col("value"))
.as[(String, String)]
.groupByKey case(newKey, _) => newKey
.mapGroups[Int](countValues)
.writeStream
.format("console")
.start()
我们正在做的是-
-
我们在
datastream
- newKey
中添加了一列。我们这样做是为了可以使用groupByKey
对其进行groupBy
。我使用了文字字符串"a"
,但你可以使用任何东西。此外,您需要从datastream
的可用列中选择任何列。我为此选择了value
列,您可以选择任何人。
我们创建了一个映射函数 - countValues
,通过编写 it.length
来计算 groupByKey
函数聚合的值。
因此,通过这种方式,我们可以计算每批中可用的任何记录,但不是从前一批中汇总的记录。
希望对你有帮助!
【讨论】:
我正在寻找问题中所述的声明方式,因此我尝试使用原始 sql 字符串解决问题,这隐含意味着没有映射函数,除非它们可以用作原始 SQL 的一部分! @user1870400 我不熟悉任何声明方式。 如果你选择文字“a”,整个流不会变成一个组吗? @user1870400 是的。 有没有办法在 mapGroups 中创建一个静态数据框?鉴于 mapGroups 给出了行的迭代器。我只想使用该迭代器并填充一个静态数据框。这可能吗?以上是关于如何在不使用 flatMapsGroupWithState 的情况下使用 Structured Streaming 2.3.0 在 spark 中进行无状态聚合?的主要内容,如果未能解决你的问题,请参考以下文章
如何在不更改链接结构的情况下使用 \ 转义字符 (、)、[、]、*、_、:[]()
如何在不使用 sleep() 的情况下使用 ontimer 函数延迟进程?