Beam:使用窗口边界写入每个窗口元素计数

Posted

技术标签:

【中文标题】Beam:使用窗口边界写入每个窗口元素计数【英文标题】:Beam: writing per window element count with window boundaries 【发布时间】:2018-11-26 03:40:07 【问题描述】:

对于一个简单的概念验证,我试图在两分钟的窗口中显示点击数据。我想做的就是打印每个窗口的计数,以及 BigQuery 的窗口边界。在运行我的管道时,我不断收到以下错误:

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: java.io.IOException: Insert failed: ["errors":["debugInfo":"","location":"windowend","message":"This field is not a record.","reason":"invalid"],"index":0]

管道如下所示:

// Creating the pipeline
Pipeline p = Pipeline.create(options);

// Window items
PCollection<TableRow> counts = p.apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic(options.getTopic()))
.apply("AddEventTimestamps", WithTimestamps.of(TotalCountPipeline::ExtractTimeStamp).withAllowedTimestampSkew(Duration.standardDays(10000)))
        .apply("Window", Window.<String>into(
                FixedWindows.of(Duration.standardHours(options.getWindowSize())))
                .triggering(
                        AfterWatermark.pastEndOfWindow()
                                .withLateFirings(AfterPane.elementCountAtLeast(1)))
                .withAllowedLateness(Duration.standardDays(10000))
                .accumulatingFiredPanes())
        .apply("CalculateSum", Combine.globally(Count.<String>combineFn()).withoutDefaults())
        .apply("BigQueryFormat", ParDo.of(new FormatCountsFn()));

// Writing to BigQuery
counts.apply("WriteToBigQuery",BigQueryIO.writeTableRows()
                .to(options.getOutputTable())
                .withSchema(getSchema())
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

// Execute pipeline
p.run().waitUntilFinish();

我猜是和BigQuery格式化函数有关,实现如下:

static class FormatCountsFn extends DoFn<Long, TableRow> 
    @ProcessElement
    public void processElement(ProcessContext c, BoundedWindow window) 
        TableRow row =
                new TableRow()
                        .set("windowStart", window.maxTimestamp().toDateTime())
                        .set("count", c.element().intValue());
        c.output(row);
    

受this post 启发。任何人都可以对此有所了解吗?我似乎无法理解它。

【问题讨论】:

【参考方案1】:

显然,这个问题的答案与光束窗口无关,仅与 BigQuery 有关。将 DateTime 对象写入 BigQuery 行需要一个正确的 yyyy-MM-dd HH:mm:ss 格式的字符串,这与我提供的 DateTime 对象相反。

【讨论】:

以上是关于Beam:使用窗口边界写入每个窗口元素计数的主要内容,如果未能解决你的问题,请参考以下文章

基于窗口和元素计数从数据流写入 GCS

根据窗口边界打开 gwt 子菜单方向

将一个窗口(无边界)放在另一个窗口(无边界)的顶部(z-index)上,这样顶部的窗口就会跟随底部的窗口

Java,获取当前屏幕的窗口边界

如何获得最小化窗口的恢复边界?

将无边界的摆动窗口带到前面