在 DataFlow 中获取以前的窗口数据
Posted
技术标签:
【中文标题】在 DataFlow 中获取以前的窗口数据【英文标题】:Getting previous window data in DataFlow 【发布时间】:2017-12-17 11:27:20 【问题描述】:试图创建一些警报系统机制,我希望找到两个窗口之间的平均值下降。
我很高兴找到TrafficRoutes 的例子,特别是当我看到它说:
如果滑动窗口中的绝大多数速度发生了“减速” 小于上一个窗口的读数。
我查看了code,但不明白为什么这意味着我们从前一个窗口中获取了前一个值。由于到目前为止我还没有使用滑动窗口的经验,所以我想我可能会遗漏一些东西。
实现这种机制,无论是否有滑动窗口 - 不会像我怀疑的那样从以前的窗口获取数据。
知道我想念什么吗? 是否有某种方法可以从上一个窗口中获取值?
我正在使用 SDK 1.9.0 在 GCP Dataflow 上执行。
请指教,
舒舒
【问题讨论】:
嗯,您帖子中的引用与代码中的文本不匹配 - 它说“如果此滑动窗口中的绝大多数速度低于之前的读数,我们称之为‘减速’” ,没有提到“上一个窗口”。该代码通过处理给定路线上所有站点的所有速度测量值的完整列表(按时间戳排序)来查看特定站点的先前速度测量值。 一般来说,“上一个窗口”的概念没有明确定义:例如如果一个键有 3 个窗口 [1, 3]、[2, 3] 和 [1, 2],哪个是 [2,3] 的“前一个”?此外,所有窗口都是同时处理的,不一定按任何特定顺序。您可以将窗口简单地视为聚合/分组操作(例如 GroupByKey 和 Combine)的另一个隐式分组键。 我认为仍然有一种合理的方法可以实现您的原始目标,即使 TrafficRoutes 不是模仿的最佳示例。让我把它写在答案中...... 谢谢。引用来自cloud.google.com/dataflow/examples/all-examples#traffic-routes 【参考方案1】:我的假设:
您的警报系统已将数据划分为由“指标 ID”标识的“指标”。 指标在给定时间的值为Double
。
您以PCollection<KV<String, Double>>
形式接收度量数据,其中String
是度量ID,Double
是度量值,并且每个元素都有适当的隐式时间戳(如果没有,您可以分配一个使用WithTimestamps
转换)。
您希望从每 1 分钟开始计算每个 5 分钟间隔的每个指标的滑动平均值,并希望在从 T+1 分钟开始的间隔平均值小于从 T
你可以这样完成:
PCollection<KV<String, Double>> metricValues = ...;
// Collection of (metric, timestamped 5-minute average)
// windowed into the same 5-minute windows as the input,
// where timestamp is assigned as the beginning of the window.
PCollection<KV<String, TimestampedValue<Double>>>
metricSlidingAverages = metricValues
.apply(Window.<KV<String, Double>>into(
SlidingWindows.of(Duration.standardMinutes(5))
.every(Duration.standardMinutes(1))))
.apply(Mean.<String, Double>perKey())
.apply(ParDo.of(new ReifyWindowFn()));
// Rewindow the previous collection into global window so we can
// do cross-window comparisons.
// For each metric, an unsorted list of (timestamp, average) pairs.
PCollection<KV<String, Iterable<TimestampedValue<Double>>>
metricAverageSequences = metricSlidingAverages
.apply(Window.<KV<String, TimestampedValue<Double>>>into(
new GlobalWindows()))
// We need to group the data by key again since the grouping key
// has changed (remember, GBK implicitly groups by key and window)
.apply(GroupByKey.<String, TimestampedValue<Double>>create())
metricAverageSequences.apply(new DetectAnomaliesFn());
...
class ReifyWindowFn extends DoFn<
KV<String, Double>, KV<String, TimestampedValue<Double>>>
@ProcessElement
public void process(ProcessContext c, BoundedWindow w)
// This DoFn makes the implicit window of the element be explicit
// and extracts the starting timestamp of the window.
c.output(KV.of(
c.element().getKey(),
TimestampedValue.of(c.element.getValue(), w.minTimestamp())));
class DetectAnomaliesFn extends DoFn<
KV<String, Iterable<TimestampedValue<Double>>>, Void>
@ProcessElement
public void process(ProcessContext c)
String metricId = c.element().getKey();
// Sort the (timestamp, average) pairs by timestamp.
List<TimestampedValue<Double>> averages = Ordering.natural()
.onResultOf(TimestampedValue::getTimestamp)
.sortedCopy(c.element().getValue());
// Scan for anomalies.
for (int i = 1; i < averages.size(); ++i)
if (averages.get(i).getValue() < averages.get(i-1).getValue())
// Detected anomaly! Could do something with it,
// e.g. publish to a third-party system or emit into
// a PCollection.
请注意,我没有测试此代码,但它应该为您完成任务提供足够的概念指导。
【讨论】:
首先,非常感谢!其次,考虑到在流数据上运行,这种解决方案不会很快爆炸吗?我们将所有数据推入全局窗口,不是吗?无论如何,我会玩它,看看它如何适合我的需求。 10 倍! 特定键/窗口的数据不会永远缓冲 - 在默认累积模式下(丢弃触发的窗格),只要将 DetectAnomaliesFn 应用于此特定键,数据就会被 GC /值对。 讨论 cmets 不是很好的方法,所以我将在下面发送更新。以上是关于在 DataFlow 中获取以前的窗口数据的主要内容,如果未能解决你的问题,请参考以下文章
使用 Dataflow 在 BigQuery 表之间进行流式更新
在 google-cloud-dataflow 中使用文件模式匹配时如何获取文件名