通过谷歌云功能在 DataFlow 作业中的 GCS .csv
Posted
技术标签:
【中文标题】通过谷歌云功能在 DataFlow 作业中的 GCS .csv【英文标题】:GCS .csv in DataFlow job through a google cloud function 【发布时间】:2022-01-08 15:08:06 【问题描述】:我正在按照this 指南创建谷歌云函数,该函数在 GCS 存储桶触发期间启动 DataFlow 作业。我的问题是关于模板和 inout 文件的。我将在我的数据流管道中使用这部分来通过TextIO.read
获取源数据(GCS csv),但我不确定如何格式化这部分管道以考虑来自存储桶触发器的文件。我会有类似"ReadTable" >> TextIO.read().metadata
的东西吗?
p = beam.Pipeline(options=options)
raw_values = (
p
| "ReadTable" >> TextIO.read().from("gs://bucket/file.csv")
| "custFunc" >> beam.Map(CallAPI)
| "writeTable" >> WriteToBigQuery('newtablw', project='project1',
dataset='test', schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)
【问题讨论】:
【参考方案1】:使用 Dataflow 模板,您可以在运行模板时提供create runtime parameters。
定义所需的模板,例如 file_in 和 file_out。然后,当您的 Cloud Functions 被 GCS 事件触发时,您可以获取 event data to extract the bucket and the file name,将它们连接起来并作为 file_in 数据流参数提供。
【讨论】:
以上是关于通过谷歌云功能在 DataFlow 作业中的 GCS .csv的主要内容,如果未能解决你的问题,请参考以下文章
谷歌云 pubsub node.js 客户端与谷歌云功能不兼容
我们可以从 Google Cloud Dataflow 访问 gsutil 吗?如果是,那么有人可以举例说明吗?
谷歌云构建python apache Beam数据流yaml文件