Python 上的 Apache Beam 将 beam.Map 调用相乘
Posted
技术标签:
【中文标题】Python 上的 Apache Beam 将 beam.Map 调用相乘【英文标题】:Apache Beam on Python multiplies beam.Map calls 【发布时间】:2020-03-18 17:07:09 【问题描述】:我正在使用 DataFlow 开发一个管道,它必须执行以下操作:
从 BigQuery 中提取最后一个详细的项目(来自 2 个不同的路径) 对于每个路径,通过 SFTP 获取新项目并将其保存到本地文件系统中 将文件上传到 Google 云存储我必须在本地获取文件,因为管道不在 DataFlow 集群上运行(它不是最终代码...)。
从 BigQuery 中选择我得到两条记录:
我使用这 2 个输出作为 SFTP 函数的输入(仅下载路径 1951 的 1 个文件,“1951_2019112215.log.gz”,路径 1952 没有文件),然后我返回一个名称为字典路径和下载的文件:
'1951': ['1951_2019112215.log.gz']
'1952': []
现在我调用将它们上传到 GC 存储桶的函数,我希望它被调用两次,每次输入一次......但它被调用 8 次(每个输入 4 次),与下载文件的数量无关。
您能解释一下我为什么缺少什么以及缺少什么吗? 这是管道:
(p
| 'Read Configuration Table ' >> beam.io.Read(beam.io.BigQuerySource(config['ENVIRONMENT']['configuration_table'])) #output 2 records from BQ
| 'Get Files from Server' >> beam.Map(import_file) #Download files from SFTP and returns the 2 dictionaries above, 1 per call
| 'Upload files on Bucket' >> beam.Map(upload_file_on_bucket) #it is called 4 time per input
)
与
def import_file(element):
cnopts = pysftp.CnOpts()
cnopts.hostkeys = None
srv = pysftp.Connection(host=config['SFTP']['host'],
username=config['SFTP']['username'],
private_key=os.path.join('config', config['SFTP']['private_key']),
cnopts=cnopts)
list_downloaded_files = []
last = element['last_file']
file_type = element['folder']
if file_type == '1951':
folder = config['SFTP']['folder_1951']
else:
folder = config['SFTP']['folder_1952']
file_list = srv.listdir(folder)
if len(file_list) > 0:
file_list = file_list[file_list.index(last)+1:]
for file in file_list:
srv.get(remotepath=folder+'/'+str(file), # da cambiare quando gira sul cluster
#remotepath=os.path.join(folder, str(file)),
localpath=os.path.join('download', file))
list_downloaded_files.append(str(file))
return file_type: list_downloaded_files
和
def upload_file_on_bucket(list_of_files):
print('chiamata')
print(list_of_files)
if '1951' in list_of_files:
file_type = '1951'
list_of_files = list_of_files['1951']
else:
file_type = '1952'
list_of_files = list_of_files['1952']
client = storage.Client(project='MYPROJECT')
bucket = client.get_bucket('MYBUCKET')
if len(list_of_files) > 0:
for file in list_of_files:
blob = bucket.blob('MYPATH' + file)
blob.upload_from_filename(os.path.join('download', file))
#os.remove(os.path.join('download', file))
list_of_files.sort()
print(file_type: list_of_files[-1])
return file_type: list_of_files[-1]
【问题讨论】:
能否提供import_file
和upload_file_on_bucket
函数的实现?
如果您在import_file
函数中使用return element
,就会发生这种情况。如果是这样,请改用return [element]
或yield element
。 Example 和 test.
感谢两位,抱歉回复晚了! @manesioz 我刚刚用我的代码修改了问题!
@GuillemXercavins 正确,我使用返回!我会试试你的例子,我会回复你的!
很高兴听到,如果有帮助,我可以将其添加为答案。关于特定于 Beam 的示例。你可以参考这个测试:gist.github.com/gxercavins/933649b217ab29660502a105ddc8e892,其中,当我们使用return element
作为输出时,下一个 ParDo 接收到两个条目而不是一个(这就是我怀疑这可能是原因的原因)。当然,这是一个更通用的 Python 概念,您可以找到有关该主题的资源,例如:dataquest.io/blog/python-generators-tutorial
【参考方案1】:
如果我们从前一个函数返回一个字典而不是列表或生成器,则函数调用可能会重复。为此,我们应该使用return [element]
或yield element
而不是return element
。使用以下数据展示这一点的示例:
data = ['team': 'red', 'score': 10,
'team': 'blue', 'score': 8]
我们背靠背调用相同的ParDo
:
events = (p
| 'Create Events' >> beam.Create(data) \
| 'Log results 1' >> beam.ParDo(LogResults())
| 'Log results 2' >> beam.ParDo(LogResults()))
我们每次只取消注释其中一条收益/收益行:
class LogResults(beam.DoFn):
"""Just log the results"""
def process(self, element):
logging.info("Event: %s", element)
# yield element # option 1
# return element # option 2
# return [element] # option 3
我们比较所有三个选项的日志输出:
yield element
INFO:root:Event: 'score': 10, 'team': 'red'
INFO:root:Event: 'score': 10, 'team': 'red'
INFO:root:Event: 'score': 8, 'team': 'blue'
INFO:root:Event: 'score': 8, 'team': 'blue'
return element
INFO:root:Event: 'score': 10, 'team': 'red'
INFO:root:Event: score
INFO:root:Event: team
INFO:root:Event: 'score': 8, 'team': 'blue'
INFO:root:Event: score
INFO:root:Event: team
return [element]
INFO:root:Event: 'score': 10, 'team': 'red'
INFO:root:Event: 'score': 10, 'team': 'red'
INFO:root:Event: 'score': 8, 'team': 'blue'
INFO:root:Event: 'score': 8, 'team': 'blue'
当我们使用return element
时,LogResults
将被调用两次(每个字典键一次)。
【讨论】:
以上是关于Python 上的 Apache Beam 将 beam.Map 调用相乘的主要内容,如果未能解决你的问题,请参考以下文章
使用 Python 处理 Apache Beam 管道中的异常
Apache Beam,BigQueryIO.WriteTableRows() 上的 NoSuchMethodError?
如何使用 Apache Beam (Python) 将多个嵌套的 JSON 写入 BigQuery 表
如何使用 Apache Beam Python 将输出写入动态路径