Python 上的 Apache Beam 将 beam.Map 调用相乘

Posted

技术标签:

【中文标题】Python 上的 Apache Beam 将 beam.Map 调用相乘【英文标题】:Apache Beam on Python multiplies beam.Map calls 【发布时间】:2020-03-18 17:07:09 【问题描述】:

我正在使用 DataFlow 开发一个管道,它必须执行以下操作:

从 BigQuery 中提取最后一个详细的项目(来自 2 个不同的路径) 对于每个路径,通过 SFTP 获取新项目并将其保存到本地文件系统中 将文件上传到 Google 云存储

我必须在本地获取文件,因为管道不在 DataFlow 集群上运行(它不是最终代码...)。

从 BigQuery 中选择我得到两条记录:

我使用这 2 个输出作为 SFTP 函数的输入(仅下载路径 1951 的 1 个文件,“1951_2019112215.log.gz”,路径 1952 没有文件),然后我返回一个名称为字典路径和下载的文件:

'1951': ['1951_2019112215.log.gz']
'1952': []

现在我调用将它们上传到 GC 存储桶的函数,我希望它被调用两次,每次输入一次......但它被调用 8 次(每个输入 4 次),与下载文件的数量无关。

您能解释一下我为什么缺少什么以及缺少什么吗? 这是管道:

(p
    | 'Read Configuration Table ' >> beam.io.Read(beam.io.BigQuerySource(config['ENVIRONMENT']['configuration_table'])) #output 2 records from BQ
    | 'Get Files from Server' >> beam.Map(import_file) #Download files from SFTP and returns the 2 dictionaries above, 1 per call
    | 'Upload files on Bucket' >> beam.Map(upload_file_on_bucket) #it is called 4 time per input
 )

def import_file(element):
    cnopts = pysftp.CnOpts()
    cnopts.hostkeys = None
    srv = pysftp.Connection(host=config['SFTP']['host'],
                            username=config['SFTP']['username'],
                            private_key=os.path.join('config', config['SFTP']['private_key']),
                            cnopts=cnopts)

    list_downloaded_files = []

    last = element['last_file']
    file_type = element['folder']

    if file_type == '1951':
        folder = config['SFTP']['folder_1951']
    else:
        folder = config['SFTP']['folder_1952']

    file_list = srv.listdir(folder)

    if len(file_list) > 0:
        file_list = file_list[file_list.index(last)+1:]

    for file in file_list:
        srv.get(remotepath=folder+'/'+str(file), # da cambiare quando gira sul cluster
                #remotepath=os.path.join(folder, str(file)),
                localpath=os.path.join('download', file))
        list_downloaded_files.append(str(file))

    return file_type: list_downloaded_files

def upload_file_on_bucket(list_of_files):
    print('chiamata')
    print(list_of_files)
    if '1951' in list_of_files:
        file_type = '1951'
        list_of_files = list_of_files['1951']
    else:
        file_type = '1952'
        list_of_files = list_of_files['1952']

    client = storage.Client(project='MYPROJECT')
    bucket = client.get_bucket('MYBUCKET')

    if len(list_of_files) > 0:
        for file in list_of_files:
            blob = bucket.blob('MYPATH' + file)
            blob.upload_from_filename(os.path.join('download', file))
            #os.remove(os.path.join('download', file))

    list_of_files.sort()
    print(file_type: list_of_files[-1])
    return file_type: list_of_files[-1]

【问题讨论】:

能否提供import_fileupload_file_on_bucket函数的实现? 如果您在import_file 函数中使用return element,就会发生这种情况。如果是这样,请改用return [element]yield element。 Example 和 test. 感谢两位,抱歉回复晚了! @manesioz 我刚刚用我的代码修改了问题! @GuillemXercavins 正确,我使用返回!我会试试你的例子,我会回复你的! 很高兴听到,如果有帮助,我可以将其添加为答案。关于特定于 Beam 的示例。你可以参考这个测试:gist.github.com/gxercavins/933649b217ab29660502a105ddc8e892,其中,当我们使用return element 作为输出时,下一个 ParDo 接收到两个条目而不是一个(这就是我怀疑这可能是原因的原因)。当然,这是一个更通用的 Python 概念,您可以找到有关该主题的资源,例如:dataquest.io/blog/python-generators-tutorial 【参考方案1】:

如果我们从前一个函数返回一个字典而不是列表或生成器,则函数调用可能会重复。为此,我们应该使用return [element]yield element 而不是return element。使用以下数据展示这一点的示例:

data = ['team': 'red', 'score': 10,
            'team': 'blue', 'score': 8]

我们背靠背调用相同的ParDo

events = (p
  | 'Create Events' >> beam.Create(data) \
  | 'Log results 1' >> beam.ParDo(LogResults())
  | 'Log results 2' >> beam.ParDo(LogResults()))

我们每次只取消注释其中一条收益/收益行:

class LogResults(beam.DoFn):
  """Just log the results"""
  def process(self, element):
    logging.info("Event: %s", element)

    # yield element  # option 1
    # return element  # option 2
    # return [element]  # option 3

我们比较所有三个选项的日志输出:

yield element
INFO:root:Event: 'score': 10, 'team': 'red'
INFO:root:Event: 'score': 10, 'team': 'red'
INFO:root:Event: 'score': 8, 'team': 'blue'
INFO:root:Event: 'score': 8, 'team': 'blue'

return element
INFO:root:Event: 'score': 10, 'team': 'red'
INFO:root:Event: score
INFO:root:Event: team
INFO:root:Event: 'score': 8, 'team': 'blue'
INFO:root:Event: score
INFO:root:Event: team

return [element]
INFO:root:Event: 'score': 10, 'team': 'red'
INFO:root:Event: 'score': 10, 'team': 'red'
INFO:root:Event: 'score': 8, 'team': 'blue'
INFO:root:Event: 'score': 8, 'team': 'blue'

当我们使用return element 时,LogResults 将被调用两次(每个字典键一次)。

【讨论】:

以上是关于Python 上的 Apache Beam 将 beam.Map 调用相乘的主要内容,如果未能解决你的问题,请参考以下文章

使用 Python 处理 Apache Beam 管道中的异常

Apache Beam,BigQueryIO.WriteTableRows() 上的 NoSuchMethodError?

如何使用 Apache Beam (Python) 将多个嵌套的 JSON 写入 BigQuery 表

如何使用 Apache Beam Python 将输出写入动态路径

使用Apache-beam在Python中删除字典中的第一项[重复]

Python Apache Beam 侧输入断言错误