Apache Beam 中的窗口数据每小时(顺时针)基础

Posted

技术标签:

【中文标题】Apache Beam 中的窗口数据每小时(顺时针)基础【英文标题】:window Data hourly(clockwise) basis in Apache beam 【发布时间】:2018-11-15 19:22:30 【问题描述】:

我正在尝试在 DataFlow/Apache Beam 作业中汇总每小时(如 12:00 到 12:59 和 01:00 到 01:59)的流数据。

以下是我的用例

数据从 pubsub 流式传输,它有一个时间戳(订单日期)。我想计算每小时收到的订单数量,我还想允许延迟 5 小时。以下是我正在使用的示例代码

    LOG.info("Start Running Pipeline");
    DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);

    Pipeline pipeline = Pipeline.create(options);
    PCollection<String>  directShipmentFeedData = pipeline.apply("Get Direct Shipment Feed Data", PubsubIO.readStrings().fromSubscription(directShipmentFeedSubscription));
    PCollection<String>  tibcoRetailOrderConfirmationFeedData = pipeline.apply("Get Tibco Retail Order Confirmation Feed Data", PubsubIO.readStrings().fromSubscription(tibcoRetailOrderConfirmationFeedSubscription));

    PCollection<String> flattenData = PCollectionList.of(directShipmentFeedData).and(tibcoRetailOrderConfirmationFeedData)
            .apply("Flatten Data from PubSub", Flatten.<String>pCollections());

    flattenData
        .apply(ParDo.of(new DataParse())).setCoder(SerializableCoder.of(SalesAndUnits.class))

        // Adding Window

        .apply(
                Window.<SalesAndUnits>into(
                            SlidingWindows.of(Duration.standardMinutes(15))
                            .every(Duration.standardMinutes(1)))
                            )

        // Data Enrich with Dimensions
        .apply(ParDo.of(new DataEnrichWithDimentions()))

        // Group And Hourly Sum
        .apply(new GroupAndSumSales())

        .apply(ParDo.of(new SQLWrite())).setCoder(SerializableCoder.of(SalesAndUnits.class));
    pipeline.run();
    LOG.info("Finish Running Pipeline");

【问题讨论】:

【参考方案1】:

我会使用符合您要求的窗口。类似于

Window.into(
  FixedWindows.of(Duration.standardHours(1))
).withAllowedLateness(Duration.standardHours(5)))

可能后跟count,因为我理解你需要这个。

希望对你有帮助

【讨论】:

以上是关于Apache Beam 中的窗口数据每小时(顺时针)基础的主要内容,如果未能解决你的问题,请参考以下文章

Apache Beam Python SDK 会丢弃延迟数据,还是无法配置延迟参数?

apache beam入门之 窗口水位线和超时数据概念

Apache Beam FixedWindows 之间的延迟

顺时针打印矩阵

apache-beam 从 GCS 存储桶的多个文件夹中读取多个文件并加载它 bigquery python

顺时针打印矩阵