如何在 Google Dataflow 中添加列以查询结果

Posted

技术标签:

【中文标题】如何在 Google Dataflow 中添加列以查询结果【英文标题】:How to add a column to query results in Google Dataflow 【发布时间】:2021-01-08 15:08:53 【问题描述】:

我正在尝试从 BigQuery 读取查询,然后使用 Kotlin 中的 Apache Beam / Dataflow 添加一列,其中包含当前日期作为时间戳。我不想在查询本身内部执行此操作,因为我想将此代码重用于大量查询,而且它看起来是一个更好的设计。

这是我写的管道代码:

val pipeline = Pipeline.create(options)
  .apply("Retrieve query", BigQueryIO.readTableRows().fromQuery(query).usingStandardSql())
  .apply("Add date", ParDo.of(AddDate()))
  .apply("Store data", BigQueryIO.writeTableRows().withSchema(tableSchema)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
    .to(TableReference().setProjectId(gcpProject).setDatasetId(datasetId).setTableId(tableId))

由于某种原因,它没有从Add date 转换中前进。 这是最有可能出现错误/错误的代码:

class AddDate : DoFn<TableRow, TableRow>() 

    @ProcessElement
    fun processElement(context: ProcessContext) 
        val tableRow = context.element() as TableRow
        tableRow.set("process_date", Instant.now())
        context.output(tableRow)
    

我也尝试在 processElement 中使用此代码,但仍然无法正常工作。

context.outputWithTimestamp(context.element(), Instant.now())

错误如下:

输入值不得以任何方式改变。

【问题讨论】:

【参考方案1】:

通过使用新对象并注意用于日期的类型(对于DATETIMESTAMP 类型)来解决问题

@ProcessElement
fun processElement(context: ProcessContext) 
    val tableRow = TableRow()
    tableRow.set("process_date", Instant.now().toString())
    val input = context.element() as TableRow
    input.keys.forEach  tableRow.set(it, input[it]) 
    context.output(tableRow)

【讨论】:

以上是关于如何在 Google Dataflow 中添加列以查询结果的主要内容,如果未能解决你的问题,请参考以下文章

在 google-cloud-dataflow 中使用文件模式匹配时如何获取文件名

如何在Apache Beam / Google Dataflow中使用ParseJsons?

如何修复 Google DataFlow Pipeline (args) 空指针异常?

Google Dataflow - 如果写入本地服务器,如何在 java 中指定 TextIO?

如何在 Google Cloud Dataflow 中设置到外部数据库服务器的 SSH 隧道?

如何强制 Hibernate 添加 ID 列以插入查询?