数据流管道 - 使用动态参数或查询

Posted

技术标签:

【中文标题】数据流管道 - 使用动态参数或查询【英文标题】:Dataflow Pipeline - Using dynamic param or query 【发布时间】:2021-02-08 06:49:34 【问题描述】:

我正在尝试创建一个数据流管道模板,这需要我从 bigquery 中读取数据。所以我需要使用Instant.now() 使我的查询动态化,但似乎在创建模板时查询被锁定

  Some Code HERE
  Some Code HERE
  Some Code HERE
 
  pipeline.apply("ReadFromBigQuery",
      BigQueryIO.read(new DataTransformer(MyCustomObject.getQuery()))
          .fromQuery(spec.getQuery())
          .usingStandardSql()
          .withQueryLocation("US")
          .withoutValidation()
  ).apply("do Something 1",
      Combine.globally(new CombineIterableAccumulatorFn<MyCustomObject2>())
  ).apply("do Something 2",
      ParDo.of(new SendToKenshoo(param, param2)
  );

我的查询是这样的

SELECT * FROM `my-project-id.my-dataset.my-view` where PARTITIONTIME between TIMESTAMP('@currentDate') and  TIMESTAMP('@tomorrowDate')

需要使用 Instant.now() 或任何时间函数替换 @currentDate 和 @tomorrowDate 请举个例子

注意:我需要更改代码上的日期,而不是像这样在查询级别上更改日期

SELECT * FROM `my-project-id.my-dataset.my-view` where PARTITIONTIME between DATE_ADD(CURRENT_DATE(), INTERVAL -1 DAY) and  CURRENT_DATE()

【问题讨论】:

【参考方案1】:

我不确定您如何将这些参数发送到查询(通过值提供程序等)。但是,我不建议为此使用模板,因为您需要动态输入。如果你想这样做,我会使用 Flex 模板:https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates

【讨论】:

以上是关于数据流管道 - 使用动态参数或查询的主要内容,如果未能解决你的问题,请参考以下文章

将日期时间参数从管道传递到数据流源存储过程 Azure 数据工厂

使用查询作为输入参数创建动态SSRS报告

关于oracle中table函数的使用

Akka(22): Stream:实时操控:动态管道连接-MergeHub,BroadcastHub and PartitionHub

power查询自定义动态日期参数

Oracle中Table函数简单应用实例