如何在 Google Dataflow 中添加列以查询结果
Posted
技术标签:
【中文标题】如何在 Google Dataflow 中添加列以查询结果【英文标题】:How to add a column to query results in Google Dataflow 【发布时间】:2021-01-08 15:08:53 【问题描述】:我正在尝试从 BigQuery 读取查询,然后使用 Kotlin 中的 Apache Beam / Dataflow 添加一列,其中包含当前日期作为时间戳。我不想在查询本身内部执行此操作,因为我想将此代码重用于大量查询,而且它看起来是一个更好的设计。
这是我写的管道代码:
val pipeline = Pipeline.create(options)
.apply("Retrieve query", BigQueryIO.readTableRows().fromQuery(query).usingStandardSql())
.apply("Add date", ParDo.of(AddDate()))
.apply("Store data", BigQueryIO.writeTableRows().withSchema(tableSchema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.to(TableReference().setProjectId(gcpProject).setDatasetId(datasetId).setTableId(tableId))
由于某种原因,它没有从Add date
转换中前进。
这是最有可能出现错误/错误的代码:
class AddDate : DoFn<TableRow, TableRow>()
@ProcessElement
fun processElement(context: ProcessContext)
val tableRow = context.element() as TableRow
tableRow.set("process_date", Instant.now())
context.output(tableRow)
我也尝试在 processElement
中使用此代码,但仍然无法正常工作。
context.outputWithTimestamp(context.element(), Instant.now())
错误如下:
输入值不得以任何方式改变。
【问题讨论】:
【参考方案1】:通过使用新对象并注意用于日期的类型(对于DATE
或TIMESTAMP
类型)来解决问题
@ProcessElement
fun processElement(context: ProcessContext)
val tableRow = TableRow()
tableRow.set("process_date", Instant.now().toString())
val input = context.element() as TableRow
input.keys.forEach tableRow.set(it, input[it])
context.output(tableRow)
【讨论】:
以上是关于如何在 Google Dataflow 中添加列以查询结果的主要内容,如果未能解决你的问题,请参考以下文章
在 google-cloud-dataflow 中使用文件模式匹配时如何获取文件名
如何在Apache Beam / Google Dataflow中使用ParseJsons?
如何修复 Google DataFlow Pipeline (args) 空指针异常?
Google Dataflow - 如果写入本地服务器,如何在 java 中指定 TextIO?