在 DataFlow 中获取以前的窗口数据

Posted

技术标签:

【中文标题】在 DataFlow 中获取以前的窗口数据【英文标题】:Getting previous window data in DataFlow 【发布时间】:2017-12-17 11:27:20 【问题描述】:

试图创建一些警报系统机制,我希望找到两个窗口之间的平均值下降。

我很高兴找到TrafficRoutes 的例子,特别是当我看到它说:

如果滑动窗口中的绝大多数速度发生了“减速” 小于上一个窗口的读数

我查看了code,但不明白为什么这意味着我们从前一个窗口中获取了前一个值。由于到目前为止我还没有使用滑动窗口的经验,所以我想我可能会遗漏一些东西。

实现这种机制,无论是否有滑动窗口 - 不会像我怀疑的那样从以前的窗口获取数据。

知道我想念什么吗? 是否有某种方法可以从上一个窗口中获取值?

我正在使用 SDK 1.9.0 在 GCP Dataflow 上执行。

请指教,

舒舒

【问题讨论】:

嗯,您帖子中的引用与代码中的文本不匹配 - 它说“如果此滑动窗口中的绝大多数速度低于之前的读数,我们称之为‘减速’” ,没有提到“上一个窗口”。该代码通过处理给定路线上所有站点的所有速度测量值的完整列表(按时间戳排序)来查看特定站点的先前速度测量值。 一般来说,“上一个窗口”的概念没有明确定义:例如如果一个键有 3 个窗口 [1, 3]、[2, 3] 和 [1, 2],哪个是 [2,3] 的“前一个”?此外,所有窗口都是同时处理的,不一定按任何特定顺序。您可以将窗口简单地视为聚合/分组操作(例如 GroupByKey 和 Combine)的另一个隐式分组键。 我认为仍然有一种合理的方法可以实现您的原始目标,即使 TrafficRoutes 不是模仿的最佳示例。让我把它写在答案中...... 谢谢。引用来自cloud.google.com/dataflow/examples/all-examples#traffic-routes 【参考方案1】:

我的假设:

您的警报系统已将数据划分为由“指标 ID”标识的“指标”。 指标在给定时间的值为Double。 您以PCollection<KV<String, Double>> 形式接收度量数据,其中String 是度量ID,Double 是度量值,并且每个元素都有适当的隐式时间戳(如果没有,您可以分配一个使用WithTimestamps 转换)。 您希望从每 1 分钟开始计算每个 5 分钟间隔的每个指标的滑动平均值,并希望在从 T+1 分钟开始的间隔平均值小于从 T

你可以这样完成:

PCollection<KV<String, Double>> metricValues = ...;
// Collection of (metric, timestamped 5-minute average)
// windowed into the same 5-minute windows as the input,
// where timestamp is assigned as the beginning of the window.
PCollection<KV<String, TimestampedValue<Double>>>
  metricSlidingAverages = metricValues
    .apply(Window.<KV<String, Double>>into(
        SlidingWindows.of(Duration.standardMinutes(5))
                      .every(Duration.standardMinutes(1))))
    .apply(Mean.<String, Double>perKey())
    .apply(ParDo.of(new ReifyWindowFn()));

// Rewindow the previous collection into global window so we can
// do cross-window comparisons.
// For each metric, an unsorted list of (timestamp, average) pairs.
PCollection<KV<String, Iterable<TimestampedValue<Double>>>
  metricAverageSequences = metricSlidingAverages
    .apply(Window.<KV<String, TimestampedValue<Double>>>into(
        new GlobalWindows()))
    // We need to group the data by key again since the grouping key
    // has changed (remember, GBK implicitly groups by key and window)
    .apply(GroupByKey.<String, TimestampedValue<Double>>create())

metricAverageSequences.apply(new DetectAnomaliesFn());

...

class ReifyWindowFn extends DoFn<
    KV<String, Double>, KV<String, TimestampedValue<Double>>> 
  @ProcessElement
  public void process(ProcessContext c, BoundedWindow w) 
    // This DoFn makes the implicit window of the element be explicit
    // and extracts the starting timestamp of the window.
    c.output(KV.of(
      c.element().getKey(),
      TimestampedValue.of(c.element.getValue(), w.minTimestamp())));
  


class DetectAnomaliesFn extends DoFn<
    KV<String, Iterable<TimestampedValue<Double>>>, Void> 
  @ProcessElement
  public void process(ProcessContext c) 
    String metricId = c.element().getKey();
    // Sort the (timestamp, average) pairs by timestamp.
    List<TimestampedValue<Double>> averages = Ordering.natural()
        .onResultOf(TimestampedValue::getTimestamp)
        .sortedCopy(c.element().getValue());
    // Scan for anomalies.
    for (int i = 1; i < averages.size(); ++i) 
      if (averages.get(i).getValue() < averages.get(i-1).getValue()) 
        // Detected anomaly! Could do something with it,
        // e.g. publish to a third-party system or emit into
        // a PCollection.
      
    
  

请注意,我没有测试此代码,但它应该为您完成任务提供足够的概念指导。

【讨论】:

首先,非常感谢!其次,考虑到在流数据上运行,这种解决方案不会很快爆炸吗?我们将所有数据推入全局窗口,不是吗?无论如何,我会玩它,看看它如何适合我的需求。 10 倍! 特定键/窗口的数据不会永远缓冲 - 在默认累积模式下(丢弃触发的窗格),只要将 DetectAnomaliesFn 应用于此特定键,数据就会被 GC /值对。 讨论 cmets 不是很好的方法,所以我将在下面发送更新。

以上是关于在 DataFlow 中获取以前的窗口数据的主要内容,如果未能解决你的问题,请参考以下文章

数据流 - 对 BigQuery 的窗口写入?

使用 Dataflow 在 BigQuery 表之间进行流式更新

在 google-cloud-dataflow 中使用文件模式匹配时如何获取文件名

Dataflow 大型侧输入中的 Apache Beam

通过谷歌云功能在 DataFlow 作业中的 GCS .csv

Magento - 使用 cronjob 将产品(使用 Dataflow)导出到 CSV