如何在不使用 flatMapsGroupWithState 的情况下使用 Structured Streaming 2.3.0 在 spark 中进行无状态聚合?

Posted

技术标签:

【中文标题】如何在不使用 flatMapsGroupWithState 的情况下使用 Structured Streaming 2.3.0 在 spark 中进行无状态聚合?【英文标题】:How to do stateless aggregations in spark using Structured Streaming 2.3.0 without using flatMapsGroupWithState? 【发布时间】:2018-05-05 12:46:40 【问题描述】:

如何在不使用 flatMapsGroupWithState 或 Dstream API 的情况下使用 Structured Streaming 2.3.0 在 spark 中进行无状态聚合?寻找一种更具声明性的方式

例子:

select count(*) from some_view

我希望输出只计算每批中可用的任何记录,但不计算前一批的汇总

【问题讨论】:

【参考方案1】:

要在 spark 中使用 Structured Streaming 2.3.0 而不使用 flatMapsGroupWithState 或 Dstream API 进行无状态聚合,您可以使用以下代码-

import spark.implicits._

def countValues = (_: String, it: Iterator[(String, String)]) => it.length

val query =
  dataStream
    .select(lit("a").as("newKey"), col("value"))
    .as[(String, String)]
    .groupByKey  case(newKey, _) => newKey 
    .mapGroups[Int](countValues)
    .writeStream
    .format("console")
    .start()

我们正在做的是-

    我们在datastream - newKey 中添加了一列。我们这样做是为了可以使用groupByKey 对其进行groupBy。我使用了文字字符串"a",但你可以使用任何东西。此外,您需要从datastream 的可用列中选择任何列。我为此选择了value 列,您可以选择任何人。 我们创建了一个映射函数 - countValues,通过编写 it.length 来计算 groupByKey 函数聚合的值。

因此,通过这种方式,我们可以计算每批中可用的任何记录,但不是从前一批中汇总的记录。

希望对你有帮助!

【讨论】:

我正在寻找问题中所述的声明方式,因此我尝试使用原始 sql 字符串解决问题,这隐含意味着没有映射函数,除非它们可以用作原始 SQL 的一部分! @user1870400 我不熟悉任何声明方式。 如果你选择文字“a”,整个流不会变成一个组吗? @user1870400 是的。 有没有办法在 mapGroups 中创建一个静态数据框?鉴于 mapGroups 给出了行的迭代器。我只想使用该迭代器并填充一个静态数据框。这可能吗?

以上是关于如何在不使用 flatMapsGroupWithState 的情况下使用 Structured Streaming 2.3.0 在 spark 中进行无状态聚合?的主要内容,如果未能解决你的问题,请参考以下文章

如何在不更改链接结构的情况下使用 \ 转义字符 (、)、[、]、*、_、:[]()

如何在不使用 sleep() 的情况下使用 ontimer 函数延迟进程?

如何在不安装的情况下使用数据库?

如何在不使用 Flexbox 的情况下水平对齐元素?

如何在不使用 DESCRIBE 子句的情况下描述 ORACLE 包?

如何在不使用滤镜的情况下使图像变暗? [复制]