Apache Beam 中的窗口函数

Posted

技术标签:

【中文标题】Apache Beam 中的窗口函数【英文标题】:Window Functions in Apache Beam 【发布时间】:2021-12-22 13:30:11 【问题描述】:

有人知道如何在 apache Beam(数据流)中执行窗口功能吗?

示例: Ex

ID  Sector  Country Income
1   Liam    US  16133
2   Noah    BR  10184
3   Oliver  ITA 11119
4   Elijah  FRA 13256
5   William GER 7722
6   James   AUS 9786
7   Benjamin    ARG 1451
8   Lucas   FRA 4541
9   Henry   US  9111
10  Alexander   ITA 13002
11  Olivia  ENG 5143
12  Emma    US  18076
13  Ava MEX 15930
14  Charlotte   ENG 18247
15  Sophia  BR  9578
16  Amelia  FRA 10813
17  Isabella    FRA 7575
18  Mia GER 14875
19  Evelyn  AUS 19749
20  Harper  ITA 19642

问题:

    如何使用 ID 排序的收入的运行总和创建另一列? 如何使用收入最高的人的排名创建另一个列

谢谢你 布鲁诺

【问题讨论】:

【参考方案1】:

考虑以下方法。我已尽力确保 Pado fns 具有关联性和可交换性。这意味着当在多个工作人员上并行运行时,这应该不会中断。如果你发现这个突破了DataflowRunner

,请告诉我
import apache_beam as beam
from apache_beam.transforms.core import  DoFn


class cum_sum(DoFn):

    def process(self, element,lkp_data,accum_sum):
        
        for lkp_id_income in lkp_data:
            if element['ID'] >= lkp_id_income[0]:
                accum_sum += lkp_id_income[1]
        element.update('cumulative_sum':accum_sum)
        yield element
    
class rank_it(DoFn):

    def process(self, element, lkp_data,counter):
        
        for lkp_id_cumsum in lkp_data:
            if lkp_id_cumsum['cumulative_sum'] < element['cumulative_sum']:
                counter += 1
        element.update('rank':counter)
        yield element


with beam.Pipeline() as p:
    data = (
        p
        | 'create'>>beam.Create(
            [
               
              'ID':4,
              'Sector':'Liam',
              'Country':'US',
             'Income':1400
             ,
             
              'ID':2,
              'Sector':'piam',
              'Country':'IS',
             'Income':1200
             ,
             
              'ID':1,
              'Sector':'Oiam',
              'Country':'PS',
             'Income':1300
             ,
             
              'ID':3,
              'Sector':'Uiam',
              'Country':'OS',
             'Income':1800
             
           ]
       )
   )
   
    ids_income = (
       data
       | 'get_ids_income'>>beam.Map(lambda element: (element['ID'], element['Income']))
    )
    with_cumulative_sum = (
        data
        | 'cumulative_sum'>>beam.ParDo(cum_sum(),lkp_data = beam.pvalue.AsIter(ids_income),accum_sum = 0)
    )

    with_ranking =(
        with_cumulative_sum
        | 'ranking'>>beam.ParDo(rank_it(),lkp_data = beam.pvalue.AsIter(with_cumulative_sum),counter = 1)
        | 'print'>>beam.Map(print)

    )

输出

'ID': 4, 'Sector': 'Liam', 'Country': 'US', 'Income': 1400, 'cumulative_sum': 5700, 'rank': 4
'ID': 2, 'Sector': 'piam', 'Country': 'IS', 'Income': 1200, 'cumulative_sum': 2500, 'rank': 2
'ID': 1, 'Sector': 'Oiam', 'Country': 'PS', 'Income': 1300, 'cumulative_sum': 1300, 'rank': 1
'ID': 3, 'Sector': 'Uiam', 'Country': 'OS', 'Income': 1800, 'cumulative_sum': 4300, 'rank': 3

【讨论】:

【参考方案2】:

在Apache Beam subdivide your unbounded PCollection 中以较小的有界块进行窗口化以应用一些计算(分组依据、总和、平均......)。

Unbounded PCollection 来自流式处理,窗口基于时间戳(例如,您可以创建 5 分钟的滑动窗口)。在您的示例中,您没有时间戳,听起来像是有界 PCollection(批处理)。

从技术上讲,您可以通过预处理元素并添加虚拟时间指示器来模拟时间戳。但在您的情况下,一个简单的 groupby 或 sort 就足以实现您想要的。

【讨论】:

谢谢你,纪尧姆! 您还可以查看 Top 函数(在 Java 和 Python 中都可用),这可能会有所帮助。

以上是关于Apache Beam 中的窗口函数的主要内容,如果未能解决你的问题,请参考以下文章

apache beam入门之 窗口水位线和超时数据概念

Apache Beam FixedWindows 之间的延迟

如何在 python apache Beam 的窗口中订购元素?

如何在 Apache Beam/Google 数据流中将大窗口缩减为小窗口?

Beam:使用窗口边界写入每个窗口元素计数

Apache Beam:如果前面有状态转换,WriteToBigQuery 将不起作用,除非应用重新窗口化