Apache Beam 中的窗口函数
Posted
技术标签:
【中文标题】Apache Beam 中的窗口函数【英文标题】:Window Functions in Apache Beam 【发布时间】:2021-12-22 13:30:11 【问题描述】:有人知道如何在 apache Beam(数据流)中执行窗口功能吗?
示例: Ex
ID Sector Country Income
1 Liam US 16133
2 Noah BR 10184
3 Oliver ITA 11119
4 Elijah FRA 13256
5 William GER 7722
6 James AUS 9786
7 Benjamin ARG 1451
8 Lucas FRA 4541
9 Henry US 9111
10 Alexander ITA 13002
11 Olivia ENG 5143
12 Emma US 18076
13 Ava MEX 15930
14 Charlotte ENG 18247
15 Sophia BR 9578
16 Amelia FRA 10813
17 Isabella FRA 7575
18 Mia GER 14875
19 Evelyn AUS 19749
20 Harper ITA 19642
问题:
-
如何使用 ID 排序的收入的运行总和创建另一列?
如何使用收入最高的人的排名创建另一个列
谢谢你 布鲁诺
【问题讨论】:
【参考方案1】:考虑以下方法。我已尽力确保 Pado fns 具有关联性和可交换性。这意味着当在多个工作人员上并行运行时,这应该不会中断。如果你发现这个突破了DataflowRunner
,请告诉我import apache_beam as beam
from apache_beam.transforms.core import DoFn
class cum_sum(DoFn):
def process(self, element,lkp_data,accum_sum):
for lkp_id_income in lkp_data:
if element['ID'] >= lkp_id_income[0]:
accum_sum += lkp_id_income[1]
element.update('cumulative_sum':accum_sum)
yield element
class rank_it(DoFn):
def process(self, element, lkp_data,counter):
for lkp_id_cumsum in lkp_data:
if lkp_id_cumsum['cumulative_sum'] < element['cumulative_sum']:
counter += 1
element.update('rank':counter)
yield element
with beam.Pipeline() as p:
data = (
p
| 'create'>>beam.Create(
[
'ID':4,
'Sector':'Liam',
'Country':'US',
'Income':1400
,
'ID':2,
'Sector':'piam',
'Country':'IS',
'Income':1200
,
'ID':1,
'Sector':'Oiam',
'Country':'PS',
'Income':1300
,
'ID':3,
'Sector':'Uiam',
'Country':'OS',
'Income':1800
]
)
)
ids_income = (
data
| 'get_ids_income'>>beam.Map(lambda element: (element['ID'], element['Income']))
)
with_cumulative_sum = (
data
| 'cumulative_sum'>>beam.ParDo(cum_sum(),lkp_data = beam.pvalue.AsIter(ids_income),accum_sum = 0)
)
with_ranking =(
with_cumulative_sum
| 'ranking'>>beam.ParDo(rank_it(),lkp_data = beam.pvalue.AsIter(with_cumulative_sum),counter = 1)
| 'print'>>beam.Map(print)
)
输出
'ID': 4, 'Sector': 'Liam', 'Country': 'US', 'Income': 1400, 'cumulative_sum': 5700, 'rank': 4
'ID': 2, 'Sector': 'piam', 'Country': 'IS', 'Income': 1200, 'cumulative_sum': 2500, 'rank': 2
'ID': 1, 'Sector': 'Oiam', 'Country': 'PS', 'Income': 1300, 'cumulative_sum': 1300, 'rank': 1
'ID': 3, 'Sector': 'Uiam', 'Country': 'OS', 'Income': 1800, 'cumulative_sum': 4300, 'rank': 3
【讨论】:
【参考方案2】:在Apache Beam subdivide your unbounded PCollection 中以较小的有界块进行窗口化以应用一些计算(分组依据、总和、平均......)。
Unbounded PCollection 来自流式处理,窗口基于时间戳(例如,您可以创建 5 分钟的滑动窗口)。在您的示例中,您没有时间戳,听起来像是有界 PCollection(批处理)。
从技术上讲,您可以通过预处理元素并添加虚拟时间指示器来模拟时间戳。但在您的情况下,一个简单的 groupby 或 sort 就足以实现您想要的。
【讨论】:
谢谢你,纪尧姆! 您还可以查看 Top 函数(在 Java 和 Python 中都可用),这可能会有所帮助。以上是关于Apache Beam 中的窗口函数的主要内容,如果未能解决你的问题,请参考以下文章
Apache Beam FixedWindows 之间的延迟
如何在 python apache Beam 的窗口中订购元素?