表行中的 BigqueryIO 架构
Posted
技术标签:
【中文标题】表行中的 BigqueryIO 架构【英文标题】:BigqueryIO schema from table row 【发布时间】:2020-12-18 14:43:43 【问题描述】:我有一个 TableRow 的 PCollection,其表行中的表名和架构如下所示
我需要在提到的行中将 col1、col2 和 col3 值插入到带有 Schema 的表中 BigQueryIO.writeTableRows()
我们可以使用 lambda 从行中访问表名,如下所示 BigQueryIO.writeTableRows().to(tablerow->tablerow.get("Table").toString()) 但是我们如何使用 BigQueryIO.writeTableRows().withSchema() 从表行访问模式
请帮忙
【问题讨论】:
【参考方案1】:您需要使用DynamicDestination 为每个元素选择要应用的表名称和架构。
分解文档的代码示例
events.apply(BigQueryIO.<UserEvent>write()
.to(new DynamicDestinations<UserEvent, String>()
// Here you extract the "key" value for the selection of schema and table.
// If you need the values of the 2 first column, you can create your own structure
// Example: <schema>|<table>
public String getDestination(ValueInSingleWindow<UserEvent> element)
return element.getValue().getUserId();
// Here return the Table destination (dataset and table) according to the "Key"
// If you have the value in your key, you can do key.split("|")[1], according to my previous example
public TableDestination getTable(String user)
return new TableDestination(tableForUser(user), "Table for user " + user);
//Same for the schema here.
public TableSchema getSchema(String user)
return tableSchemaForUser(user);
)
【讨论】:
以上是关于表行中的 BigqueryIO 架构的主要内容,如果未能解决你的问题,请参考以下文章