通过谷歌云功能在 DataFlow 作业中的 GCS .csv

Posted

技术标签:

【中文标题】通过谷歌云功能在 DataFlow 作业中的 GCS .csv【英文标题】:GCS .csv in DataFlow job through a google cloud function 【发布时间】:2022-01-08 15:08:06 【问题描述】:

我正在按照this 指南创建谷歌云函数,该函数在 GCS 存储桶触发期间启动 DataFlow 作业。我的问题是关于模板和 inout 文件的。我将在我的数据流管道中使用这部分来通过TextIO.read 获取源数据(GCS csv),但我不确定如何格式化这部分管道以考虑来自存储桶触发器的文件。我会有类似"ReadTable" >> TextIO.read().metadata 的东西吗?

p = beam.Pipeline(options=options)
raw_values = (
            p 
            | "ReadTable" >> TextIO.read().from("gs://bucket/file.csv")
            | "custFunc" >> beam.Map(CallAPI)
            | "writeTable" >> WriteToBigQuery('newtablw', project='project1', 
                                               dataset='test', schema=table_schema,
                                               write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                                               create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
            )

【问题讨论】:

【参考方案1】:

使用 Dataflow 模板,您可以在运行模板时提供create runtime parameters。

定义所需的模板,例如 file_in 和 file_out。然后,当您的 Cloud Functions 被 GCS 事件触发时,您可以获取 event data to extract the bucket and the file name,将它们连接起来并作为 file_in 数据流参数提供。

【讨论】:

以上是关于通过谷歌云功能在 DataFlow 作业中的 GCS .csv的主要内容,如果未能解决你的问题,请参考以下文章

谷歌云 pubsub node.js 客户端与谷歌云功能不兼容

我们可以从 Google Cloud Dataflow 访问 gsutil 吗?如果是,那么有人可以举例说明吗?

谷歌云构建python apache Beam数据流yaml文件

谷歌云平台提交训练作业,如何从训练代码中读取 USER_ARGS?

执行谷歌数据流作业时出现 HTTP 传输错误

通过 iOS 应用触发谷歌云功能的最佳方式