数据流管道 - 使用动态参数或查询
Posted
技术标签:
【中文标题】数据流管道 - 使用动态参数或查询【英文标题】:Dataflow Pipeline - Using dynamic param or query 【发布时间】:2021-02-08 06:49:34 【问题描述】:我正在尝试创建一个数据流管道模板,这需要我从 bigquery 中读取数据。所以我需要使用Instant.now()
使我的查询动态化,但似乎在创建模板时查询被锁定
Some Code HERE
Some Code HERE
Some Code HERE
pipeline.apply("ReadFromBigQuery",
BigQueryIO.read(new DataTransformer(MyCustomObject.getQuery()))
.fromQuery(spec.getQuery())
.usingStandardSql()
.withQueryLocation("US")
.withoutValidation()
).apply("do Something 1",
Combine.globally(new CombineIterableAccumulatorFn<MyCustomObject2>())
).apply("do Something 2",
ParDo.of(new SendToKenshoo(param, param2)
);
我的查询是这样的
SELECT * FROM `my-project-id.my-dataset.my-view` where PARTITIONTIME between TIMESTAMP('@currentDate') and TIMESTAMP('@tomorrowDate')
需要使用 Instant.now()
或任何时间函数替换 @currentDate 和 @tomorrowDate
请举个例子
注意:我需要更改代码上的日期,而不是像这样在查询级别上更改日期
SELECT * FROM `my-project-id.my-dataset.my-view` where PARTITIONTIME between DATE_ADD(CURRENT_DATE(), INTERVAL -1 DAY) and CURRENT_DATE()
【问题讨论】:
【参考方案1】:我不确定您如何将这些参数发送到查询(通过值提供程序等)。但是,我不建议为此使用模板,因为您需要动态输入。如果你想这样做,我会使用 Flex 模板:https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates
【讨论】:
以上是关于数据流管道 - 使用动态参数或查询的主要内容,如果未能解决你的问题,请参考以下文章
将日期时间参数从管道传递到数据流源存储过程 Azure 数据工厂
Akka(22): Stream:实时操控:动态管道连接-MergeHub,BroadcastHub and PartitionHub