如何在谷歌云数据流中运行动态第二个查询?

Posted

技术标签:

【中文标题】如何在谷歌云数据流中运行动态第二个查询?【英文标题】:How to run dynamic second query in google cloud dataflow? 【发布时间】:2018-12-10 21:31:05 【问题描述】:

我正在尝试执行一项操作,其中我通过查询获取 Id 列表,将它们转换为用逗号分隔的字符串(即“1,2,3”),然后在辅助查询中使用它。尝试运行第二个查询时,出现语法错误:

"lambda 转换的目标类型必须是接口"

String query = "SELECT DISTINCT campaignId FROM `" + options.getEligibilityInputTable() + "` ";

    Pipeline p = Pipeline.create(options);
    p.apply("GetCampaignIds", BigQueryIO.readTableRows().withTemplateCompatibility().fromQuery(query).usingStandardSql())
      .apply("TransformCampaignIds",
        MapElements.into(TypeDescriptors.strings())
        .via((TableRow row) -> (String)row.get("campaignId")))
      .apply(Combine.globally(new StringToCsvCombineFn()))
      .apply("GetAllCampaigns", campaignIds -> BigQueryIO.readTableRows().withTemplateCompatibility().fromQuery("SELECT id AS campaignId, dataQuery FROM `projectid.mysql_standard.campaigns` WHERE campaignId IN (" + campaignIds + ")").usingStandardSql())
....

如何将查询链接在一起?

【问题讨论】:

【参考方案1】:

很遗憾,您无法对现有资源执行此操作。您的选择有两个:

您从 ParDo 手动调用 BQ API。 您编写了一个复杂的 SQL 查询来为您执行此操作。

第二个选项看起来像这样:

String query = "SELECT id AS campaignId, dataQuery \
               FROM `projectid.mysql_standard.campaigns` \
               WHERE campaignId IN ( \
                   SELECT DISTINCT campaignId \
                   FROM `" + options.getEligibilityInputTable() 
                   + "`)";

Pipeline p = Pipeline.create(options);
p.apply("GetAllCampaigns", BigQueryIO.readTableRows()
                                     .withTemplateCompatibility()
                                     .fromQuery(query)
                                     .usingStandardSql());

【讨论】:

以上是关于如何在谷歌云数据流中运行动态第二个查询?的主要内容,如果未能解决你的问题,请参考以下文章

Bigquery 如何使用存储在谷歌云中的数据?

如何以编程方式在谷歌云运行 api 中获取当前项目 ID

Python:在谷歌云数据存储模拟器中保存数据

如何在谷歌云存储中启用实时对象访问分析?

如何在谷歌云构建中访问 git 标签?

如何在谷歌云构建中将参数传递给 docker run