How to combine two results and pipe them to the next step in an apache-beam pipeline

Posted: 2020-11-12 22:35:09

Question:

See the code snippet below. I want ["metric1", "metric2"] to be the input to RunTask.process. Instead, RunTask.process ran twice, once with "metric1" and once with "metric2".

def run():
  
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
  p = beam.Pipeline(options=pipeline_options)

  root = p | 'Get source' >> beam.Create([
      "source_name" # maybe ["source_name"] makes more sense since my process function takes an array as an input?
  ])

  metric1 = root | "compute1" >> beam.ParDo(RunLongCompute(myarg="1")) # let's say it returns ["metric1"]
  metric2 = root | "compute2" >> beam.ParDo(RunLongCompute(myarg="2")) # let's say it returns ["metric2"]

  metric3 = (metric1, metric2) | beam.Flatten() | beam.ParDo(RunTask()) # I want ["metric1", "metric2"] to be my input for RunTask.process. However it was run twice with "metric1" and "metric2" respectively

  


Answer 1:

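As I understand it, you want to join two PCollections so that the result has the form ['element1', 'element2']. To achieve this, you can use CoGroupByKey() instead of Flatten(), provided the elements of both PCollections are (key, value) pairs.

Given your code snippet, the syntax would be: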

def run():
  
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
  p = beam.Pipeline(options=pipeline_options)

  root = p | 'Get source' >> beam.Create([
      "source_name" # maybe ["source_name"] makes more sense since my process function takes an array as an input?
  ])

  metric1 = root | "compute1" >> beam.ParDo(RunLongCompute(myarg="1")) # let's say it returns ["metric1"]
  metric2 = root | "compute2" >> beam.ParDo(RunLongCompute(myarg="2")) # let's say it returns ["metric2"]

  # CoGroupByKey joins on a key, so each element of metric1 and metric2
  # must be a (key, value) tuple that shares the same key.
  metric3 = (
      (metric1, metric2)
      | beam.CoGroupByKey()
      | beam.ParDo(RunTask())
  )

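I would like to point out the difference between Flatten() and CoGroupByKey().

1) Flatten() takes two or more PCollections that store the same data type and merges them into a single logical PCollection. For example,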

import apache_beam as beam

p = beam.Pipeline()

address_list = [
    ('leo', 'George St. 32'),
    ('ralph', 'Pyrmont St. 30'),
    ('mary', '10th Av.'),
    ('carly', 'Marina Bay 1'),
]
city_list = [
    ('leo', 'Sydney'),
    ('ralph', 'Sydney'),
    ('mary', 'NYC'),
    ('carly', 'Brisbane'),
]

# Two PCollections holding the same element type: (name, value) tuples.
street = p | 'CreateStreets' >> beam.Create(address_list)
city = p | 'CreateCities' >> beam.Create(city_list)

# Flatten merges both inputs into one PCollection without grouping by key.
results = (
    (street, city)
    | beam.Flatten()
    | beam.Map(print)
)

p.run()

And the output:

('leo', 'George St. 32')
('ralph', 'Pyrmont St. 30')
('mary', '10th Av.')
('carly', 'Marina Bay 1')
('leo', 'Sydney')
('ralph', 'Sydney')
('mary', 'NYC')
('carly', 'Brisbane')

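Note that both PCollections appear in the output, with one simply appended to the other. The elements are not joined, and a PCollection does not guarantee element order in general.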

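2) CoGroupByKey() performs a relational join between two or more key-value PCollections that have the same key type. With this method you join by key, instead of appending as Flatten() does. Here is an example,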

import apache_beam as beam

p = beam.Pipeline()

address_list = [
    ('leo', 'George St. 32'),
    ('ralph', 'Pyrmont St. 30'),
    ('mary', '10th Av.'),
    ('carly', 'Marina Bay 1'),
]
city_list = [
    ('leo', 'Sydney'),
    ('ralph', 'Sydney'),
    ('mary', 'NYC'),
    ('carly', 'Brisbane'),
]

# Both PCollections are keyed by name, which is what CoGroupByKey joins on.
street = p | 'CreateStreets' >> beam.Create(address_list)
city = p | 'CreateCities' >> beam.Create(city_list)

# For each key, CoGroupByKey emits (key, (values_from_street, values_from_city)).
results = (
    (street, city)
    | beam.CoGroupByKey()
    | beam.Map(print)
)

p.run()

And the output:

('leo', (['George St. 32'], ['Sydney']))
('ralph', (['Pyrmont St. 30'], ['Sydney']))
('mary', (['10th Av.'], ['NYC']))
('carly', (['Marina Bay 1'], ['Brisbane']))

Note that you need a shared key in order to join the results. This output has the shape you were expecting.


Answer 2:

Alternatively, use a side input:

metrics3 = metric1 | beam.ParDo(RunTask(), metric2=beam.pvalue.AsIter(metric2))

In RunTask.process():

def process(self, element_from_metric1, metric2):
  ...

