是否有任何形式可写入 BigQuery 以动态指定目标表的名称?

Posted

技术标签:

【中文标题】是否有任何形式可写入 BigQuery 以动态指定目标表的名称?【英文标题】:Is there any form to write to BigQuery specifying the name of destination tables dynamically? 【发布时间】:2015-06-05 11:57:33 【问题描述】:

是否有任何表单可以写入 BigQuery 以动态指定目标表的名称?

现在我有:

bigQueryRQ
.apply(BigQueryIO.Write
    .named("Write")
    .to("project_name:dataset_name.table_name")
    .withSchema(Table.create_auditedTableSchema())
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

但我需要“table_name”作为动态表名,这取决于我要写入的“tablerow”数据。

【问题讨论】:

【参考方案1】:

我也有同样的问题。 如何按tags 对行进行分组,并为每个组分别应用 BigQueryIO.Write?

    public static class TagMarker extends DoFn<TableRow, TableRow> 

    private Map<String, TupleTag<TableRow>> tagMap;

    public TagMarker(Map<String, TupleTag<TableRow>> tagMap) 
        this.tagMap = tagMap;
    

    @Override
    public void processElement(ProcessContext c) throws Exception 
        TableRow item = c.element();
        c.sideOutput(tagMap.get(getTagName(item)), item);
    

    private String getTagName(TableRow row) 
        // There will be your logic of determinate table by row
        return "table" + ((String)row.get("msg")).substring(0, 1);
    




private static class GbqWriter extends PTransform<PCollection<TableRow>, PDone> 

    @Override
    public PDone apply(PCollection<TableRow> input) 

        TupleTag<TableRow> mainTag = new TupleTag<TableRow>();
        TupleTag<TableRow> tag2 = new TupleTag<TableRow>();
        TupleTag<TableRow> tag3 = new TupleTag<TableRow>();

        Map<String, TupleTag<TableRow>> tagMap = new HashMap<String, TupleTag<TableRow>>();
        tagMap.put("table1", mainTag);
        tagMap.put("table2", tag2);
        tagMap.put("table3", tag3);

        List<TupleTag<?>> tags = new ArrayList<TupleTag<?>>();
        tags.add(tag2);
        tags.add(tag3);

        PCollectionTuple result = input.apply(
            ParDo.withOutputTags(mainTag, TupleTagList.of(tags)).of(new TagMarker(tagMap))
        );

        PDone done = null;
        for (String tableId : tagMap.keySet()) 
            done = writeToGbq(tableId, result.get(tagMap.get(tableId)).setCoder(TableRowJsonCoder.of()));
        

        return done;
    


    private PDone writeToGbq(String tableId, PCollection<TableRow> rows) 

        PDone done = rows
                .apply(BigQueryIO.Write.named("WriteToGbq")
                .to("<project>:<dataset>." + tableId)
                .withSchema(getSchema())
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
        );

        return done;
    


我不确定是否要重写变量完成。这是正确的吗?是否可以在失败后阻止重写到 GBQ。

只有在解析行之前知道我们要写入的表的列表时,这种方式才适用。

【讨论】:

当然——数据可以任意分区,这些分区可以写入不同的表。但是,这些表的名称仍然不能与数据相关。它们仍然在管道构建时确定,在读取任何数据之前。【参考方案2】:

很遗憾,我们不提供 API 以依赖数据的方式命名 BigQuery 表。一般来说,依赖数据的 BigQuery 表目标可能容易出错。

也就是说,我们正在努力提高该领域的灵活性。目前没有估算值,但我们希望尽快得到。

【讨论】:

不,还没有。这被跟踪为BEAM-437。您可能希望“观察”该问题以自动获得更改通知。

以上是关于是否有任何形式可写入 BigQuery 以动态指定目标表的名称?的主要内容,如果未能解决你的问题,请参考以下文章

数据流 - 对 BigQuery 的窗口写入?

是否有任何软件或库可用于在 C、C++、Java 或 Ruby 中绘制 3 维螺丝?

从 PubSub 导出到 BigQuery - Dataflow 没有任何反应

在加入并将它们导出到 GCS 之前,如何等待 BigQuery 上的异步表写入?

Apache Beam:将具有对象列表的对象转换为多个 TableRows 以写入 BigQuery

写入 BigQuery 时出现 MojoExecutionException