是否有任何形式可写入 BigQuery 以动态指定目标表的名称?
Posted
技术标签:
【中文标题】是否有任何形式可写入 BigQuery 以动态指定目标表的名称?【英文标题】:Is there any form to write to BigQuery specifying the name of destination tables dynamically? 【发布时间】:2015-06-05 11:57:33 【问题描述】:是否有任何表单可以写入 BigQuery 以动态指定目标表的名称?
现在我有:
bigQueryRQ
.apply(BigQueryIO.Write
.named("Write")
.to("project_name:dataset_name.table_name")
.withSchema(Table.create_auditedTableSchema())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
但我需要“table_name”作为动态表名,这取决于我要写入的“tablerow”数据。
【问题讨论】:
【参考方案1】:我也有同样的问题。 如何按tags 对行进行分组,并为每个组分别应用 BigQueryIO.Write?
public static class TagMarker extends DoFn<TableRow, TableRow>
private Map<String, TupleTag<TableRow>> tagMap;
public TagMarker(Map<String, TupleTag<TableRow>> tagMap)
this.tagMap = tagMap;
@Override
public void processElement(ProcessContext c) throws Exception
TableRow item = c.element();
c.sideOutput(tagMap.get(getTagName(item)), item);
private String getTagName(TableRow row)
// There will be your logic of determinate table by row
return "table" + ((String)row.get("msg")).substring(0, 1);
private static class GbqWriter extends PTransform<PCollection<TableRow>, PDone>
@Override
public PDone apply(PCollection<TableRow> input)
TupleTag<TableRow> mainTag = new TupleTag<TableRow>();
TupleTag<TableRow> tag2 = new TupleTag<TableRow>();
TupleTag<TableRow> tag3 = new TupleTag<TableRow>();
Map<String, TupleTag<TableRow>> tagMap = new HashMap<String, TupleTag<TableRow>>();
tagMap.put("table1", mainTag);
tagMap.put("table2", tag2);
tagMap.put("table3", tag3);
List<TupleTag<?>> tags = new ArrayList<TupleTag<?>>();
tags.add(tag2);
tags.add(tag3);
PCollectionTuple result = input.apply(
ParDo.withOutputTags(mainTag, TupleTagList.of(tags)).of(new TagMarker(tagMap))
);
PDone done = null;
for (String tableId : tagMap.keySet())
done = writeToGbq(tableId, result.get(tagMap.get(tableId)).setCoder(TableRowJsonCoder.of()));
return done;
private PDone writeToGbq(String tableId, PCollection<TableRow> rows)
PDone done = rows
.apply(BigQueryIO.Write.named("WriteToGbq")
.to("<project>:<dataset>." + tableId)
.withSchema(getSchema())
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
);
return done;
我不确定是否要重写变量完成。这是正确的吗?是否可以在失败后阻止重写到 GBQ。
只有在解析行之前知道我们要写入的表的列表时,这种方式才适用。
【讨论】:
当然——数据可以任意分区,这些分区可以写入不同的表。但是,这些表的名称仍然不能与数据相关。它们仍然在管道构建时确定,在读取任何数据之前。【参考方案2】:很遗憾,我们不提供 API 以依赖数据的方式命名 BigQuery 表。一般来说,依赖数据的 BigQuery 表目标可能容易出错。
也就是说,我们正在努力提高该领域的灵活性。目前没有估算值,但我们希望尽快得到。
【讨论】:
不,还没有。这被跟踪为BEAM-437。您可能希望“观察”该问题以自动获得更改通知。以上是关于是否有任何形式可写入 BigQuery 以动态指定目标表的名称?的主要内容,如果未能解决你的问题,请参考以下文章
是否有任何软件或库可用于在 C、C++、Java 或 Ruby 中绘制 3 维螺丝?
从 PubSub 导出到 BigQuery - Dataflow 没有任何反应
在加入并将它们导出到 GCS 之前,如何等待 BigQuery 上的异步表写入?