在 Spark 结构化流中,我如何将完整的聚合输出到外部源,如 REST 服务

Posted

技术标签:

【中文标题】在 Spark 结构化流中,我如何将完整的聚合输出到外部源,如 REST 服务【英文标题】:In Spark structured streaming how do I output complete aggregations to an external source like a REST service 【发布时间】:2018-05-20 18:45:44 【问题描述】:

我正在尝试执行的任务是聚合 DataFrame 中维度(字段)的值计数,执行一些统计信息,如平均值、最大值、最小值等,然后通过制作 API 将聚合输出到外部系统称呼。我正在使用 30 秒的水印,窗口大小为 10 秒。我把这些尺寸缩小了,以便我更容易测试和调试系统。

我发现进行 API 调用的唯一方法是使用ForeachWriter。我的问题是ForeachWriter 在分区级别执行,并且只为每个分区生成一个聚合。到目前为止,除了合并到 1 之外,我还没有找到一种方法来获取汇总的聚合,这是一种减慢我的流式应用程序的方法。

我发现如果我使用基于文件的接收器,例如 HDFS 的 Parquet 写入器,代码会产生真正的聚合。它的表现也非常好。我真正需要的是实现同样的结果,但调用 API 而不是写入文件系统。

有人知道怎么做吗?

我已经在 Spark 2.2.2 和 Spark 2.3 上进行了尝试,并获得了相同的行为。

这是一个简化的代码片段来说明我想要做什么:

val valStream = streamingDF
  .select(
    $"event.name".alias("eventName"),
    expr("event.clientTimestamp / 1000").cast("timestamp").as("eventTime"),
    $"asset.assetClass").alias("assetClass")
  .where($"eventName" === 'MyEvent')
  .withWatermark("eventTime", "30 seconds")
  .groupBy(window($"eventTime", "10 seconds", $"assetClass", $"eventName")
  .agg(count($"eventName").as("eventCount"))
  .select($"window.start".as("windowStart"), $"window.end".as("windowEnd"), $"assetClass".as("metric"), $"eventCount").as[DimAggregateRecord]
  .writeStream
  .option("checkpointLocation", config.checkpointPath)
  .outputMode(config.outputMode)

val session = (if(config.writeStreamType == AbacusStreamWriterFactory.S3) 
    valStream.format(config.outputFormat)
    .option("path", config.outputPath)
  
  else 
    valStream.foreach(--- this is my DimAggregateRecord ForEachWriter ---)
  ).start()

【问题讨论】:

【参考方案1】:

我回答了我自己的问题。我发现按窗口开始时间重新分区可以解决问题。它对数据进行打乱,以便具有相同组和 windowStart 时间的所有行都在同一个执行程序上。下面的代码为每个组窗口间隔生成一个文件。它的表现也相当不错。我没有确切的数字,但它在比 10 秒的窗口间隔更短的时间内产生聚合。

val valStream = streamingDF
  .select(
    $"event.name".alias("eventName"),
    expr("event.clientTimestamp / 1000").cast("timestamp").as("eventTime"),
    $"asset.assetClass").alias("assetClass")
  .where($"eventName" === 'MyEvent')
  .withWatermark("eventTime", "30 seconds")
  .groupBy(window($"eventTime", "10 seconds", $"assetClass", $"eventName")
  .agg(count($"eventName").as("eventCount"))
  .select($"window.start".as("windowStart"), $"window.end".as("windowEnd"), $"assetClass".as("metric"), $"eventCount").as[DimAggregateRecord]

  .repartition($"windowStart")  // <-------- this line produces the desired result

  .writeStream
  .option("checkpointLocation", config.checkpointPath)
  .outputMode(config.outputMode)

val session = (if(config.writeStreamType == AbacusStreamWriterFactory.S3) 
    valStream.format(config.outputFormat)
    .option("path", config.outputPath)
  
  else 
    valStream.foreach(--- this is my DimAggregateRecord ForEachWriter ---)
  ).start()

【讨论】:

以上是关于在 Spark 结构化流中,我如何将完整的聚合输出到外部源,如 REST 服务的主要内容,如果未能解决你的问题,请参考以下文章

Spark 结构化流中的外部连接

如何在 Spark 结构化流中手动设置 group.id 并提交 kafka 偏移量?

如何在 Spark 结构化流中保存通过水印丢弃的记录

有没有办法将生成的 groupby 流加入到 kafka-spark 结构化流中的原始流?

如何从 Spark 结构化流中的 Cassandra 等外部存储读取 Kafka 和查询?

如何从Spark中的聚合结构对象中删除“ col1”别名?