How to combine two results and pipe it to next step in apache-beam pipeline
【Posted】: 2020-11-12 22:35:09
【Question】: See the code snippet below.
I want ["metric1", "metric2"] to be the input to RunTask.process. Instead, RunTask ran twice, once with "metric1" and once with "metric2".
def run():
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    p = beam.Pipeline(options=pipeline_options)
    root = p | 'Get source' >> beam.Create([
        "source_name"  # maybe ["source_name"] makes more sense since my process function takes an array as an input?
    ])
    metric1 = root | "compute1" >> beam.ParDo(RunLongCompute(myarg="1"))  # let's say it returns ["metric1"]
    metric2 = root | "compute2" >> beam.ParDo(RunLongCompute(myarg="2"))  # let's say it returns ["metric2"]
    # I want ["metric1", "metric2"] to be my input for RunTask.process.
    # However, it was run twice, with "metric1" and "metric2" respectively.
    metric3 = (metric1, metric2) | beam.Flatten() | beam.ParDo(RunTask())
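【Answer 1】: As I understand it, you want to join two PCollections so that the result has the form ['element1', 'element2']. To achieve this, you can use CoGroupByKey() instead of Flatten().
Applied to your code snippet, the syntax would be: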
def run():
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    p = beam.Pipeline(options=pipeline_options)
    root = p | 'Get source' >> beam.Create([
        "source_name"  # maybe ["source_name"] makes more sense since my process function takes an array as an input?
    ])
    metric1 = root | "compute1" >> beam.ParDo(RunLongCompute(myarg="1"))  # let's say it returns ["metric1"]
    metric2 = root | "compute2" >> beam.ParDo(RunLongCompute(myarg="2"))  # let's say it returns ["metric2"]
    # CoGroupByKey() joins by key, so both PCollections must hold
    # (key, value) pairs that share a key.
    metric3 = (
        (metric1, metric2)
        | beam.CoGroupByKey()
        | beam.ParDo(RunTask())
    )
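I would like to point out the difference between Flatten() and CoGroupByKey().
1) Flatten() takes two or more PCollections that store the same data type and merges them into a single logical PCollection. For example,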
import apache_beam as beam

p = beam.Pipeline()

address_list = [
    ('leo', 'George St. 32'),
    ('ralph', 'Pyrmont St. 30'),
    ('mary', '10th Av.'),
    ('carly', 'Marina Bay 1'),
]
city_list = [
    ('leo', 'Sydney'),
    ('ralph', 'Sydney'),
    ('mary', 'NYC'),
    ('carly', 'Brisbane'),
]

street = p | 'CreateStreets' >> beam.Create(address_list)
city = p | 'CreateCities' >> beam.Create(city_list)

result = (
    (street, city)
    | beam.Flatten()   # merge both PCollections into one
    | beam.Map(print)  # print every element
)

p.run().wait_until_finish()
And the output:
('leo', 'George St. 32')
('ralph', 'Pyrmont St. 30')
('mary', '10th Av.')
('carly', 'Marina Bay 1')
('leo', 'Sydney')
('ralph', 'Sydney')
('mary', 'NYC')
('carly', 'Brisbane')
Note that both PCollections appear in the output, but one is simply appended to the other.
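To see why Flatten() makes RunTask run once per metric, a plain-Python model may help. This is only a sketch of the semantics and does not use Beam: the helper names `flatten` and `par_do` are hypothetical stand-ins, and the two lists assume each compute step emits a single string.

```python
from itertools import chain


def flatten(*collections):
    """Model of Flatten: merge several collections into one, element by element."""
    return list(chain.from_iterable(collections))


def par_do(elements, fn):
    """Model of ParDo: call fn once for every element."""
    return [fn(element) for element in elements]


metric1 = ["metric1"]  # assumed output of "compute1"
metric2 = ["metric2"]  # assumed output of "compute2"

# The merged collection holds two separate elements, so the function
# standing in for RunTask.process is called twice.
calls = par_do(flatten(metric1, metric2), lambda element: f"RunTask({element!r})")
print(calls)
```

The merged collection never contains the list ["metric1", "metric2"] as one element, which matches the behavior described in the question.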
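2) CoGroupByKey() performs a relational join between two or more key-value PCollections that share the same key type. With this method you join by key instead of appending, as Flatten() does. Here is an example,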
import apache_beam as beam

p = beam.Pipeline()

address_list = [
    ('leo', 'George St. 32'),
    ('ralph', 'Pyrmont St. 30'),
    ('mary', '10th Av.'),
    ('carly', 'Marina Bay 1'),
]
city_list = [
    ('leo', 'Sydney'),
    ('ralph', 'Sydney'),
    ('mary', 'NYC'),
    ('carly', 'Brisbane'),
]

street = p | 'CreateStreets' >> beam.Create(address_list)
city = p | 'CreateCities' >> beam.Create(city_list)

results = (
    (street, city)
    | beam.CoGroupByKey()  # join both PCollections on the key
    | beam.Map(print)      # print every joined element
    # | beam.io.WriteToText('delete.txt')
)

p.run().wait_until_finish()
And the output:
('leo', (['George St. 32'], ['Sydney']))
('ralph', (['Pyrmont St. 30'], ['Sydney']))
('mary', (['10th Av.'], ['NYC']))
('carly', (['Marina Bay 1'], ['Brisbane']))
Note that you need a key in order to join the results. This output also has the shape you are looking for.
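The join-by-key behavior can also be modeled in plain Python. The sketch below is not Beam code: `co_group_by_key` is a hypothetical helper that mimics the output shape shown above for a tuple of inputs, using a shortened version of the address and city data.

```python
from collections import defaultdict


def co_group_by_key(*collections):
    """Model of CoGroupByKey for a tuple of inputs: key -> one list per input."""
    grouped = defaultdict(lambda: tuple([] for _ in collections))
    for index, collection in enumerate(collections):
        for key, value in collection:
            # Append the value to the list that belongs to its input.
            grouped[key][index].append(value)
    return list(grouped.items())


street = [("leo", "George St. 32"), ("mary", "10th Av.")]
city = [("leo", "Sydney"), ("mary", "NYC")]

joined = co_group_by_key(street, city)
print(joined)
```

Each key yields exactly one element, so a downstream step is called once per key and receives the values from both inputs together.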
【Answer 2】: Alternatively, use a side input:
metrics3 = metric1 | beam.ParDo(RunTask(), metric2=beam.pvalue.AsIter(metric2))
In RunTask.process():
def process(self, element_from_metric1, metric2):
    ...
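As a rough illustration of what the side input changes, here is a plain-Python model rather than Beam code. `par_do_with_side_input` and `run_task` are hypothetical stand-ins for ParDo and RunTask.process, and the body of `run_task` is only an assumed example of combining the two inputs.

```python
def run_task(element_from_metric1, metric2):
    """Stand-in for RunTask.process: combine the main element with the side input."""
    return [element_from_metric1, *metric2]


def par_do_with_side_input(main_input, fn, side_input):
    """Model of ParDo with a side input: one call per main element,
    each call receiving the whole side collection."""
    side_values = list(side_input)
    return [fn(element, side_values) for element in main_input]


metric1 = ["metric1"]  # main input
metric2 = ["metric2"]  # side input, seen in full by every call

results = par_do_with_side_input(metric1, run_task, metric2)
print(results)
```

In this model the function runs once per element of metric1 and sees every element of metric2 in each call, so a single call can build ["metric1", "metric2"].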