BigQueryIO.write DynamicDestination withCreateDisposition - 集群字段
Posted
技术标签:
【中文标题】BigQueryIO.write DynamicDestination withCreateDisposition - 集群字段【英文标题】:BigQueryIO.write DynamicDestination withCreateDisposition - Clustering fields 【发布时间】:2018-12-17 16:04:56 【问题描述】:BigQueryIO.write.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) 连同 DynamicDestinations 我们可以写入动态表,如果该表不存在,它将从 DynamicDestinations 提供的 TableSchema 创建表。
我无法在 TableSchema 模型中添加集群字段部分,因为它没有这样的功能。
我们如何添加具有 TableSchema 和集群字段的 DynamicDestinations?
【问题讨论】:
【参考方案1】:bigQuery API 是将集群字段添加到表中的一种方法
使用这个link,您可以在编写代码之前测试 API
function execute()
return gapi.client.bigquery.jobs.insert(
"resource":
"configuration":
"query":
"clustering":
"fields": [
"Field1",
"Field2"
]
,
"query": "select 5",
"destinationTable":
"datasetId": "Id1",
"projectId": "Project1",
"tableId": "T1"
)
.then(function(response)
// Handle the results here (response.result has the parsed body).
console.log("Response", response);
,
function(err) console.error("Execute error", err); );
这是一个关于如何操作参数的 JS 示例:
static setConfiguration(params, configuration)
//To have a destination table we MUST have a tableId
if (params.destinationTable && params.destinationTable.tableId)
configuration.query.destinationTable = params.destinationTable
if (params.clusteringFields)
configuration.query.clustering = fields: params.clusteringFields
if (params.timePartitioning)
configuration.query.timePartitioning =
type: 'DAY',
field: params.timePartitioning
if (params.writeDisposition)
configuration.query.writeDisposition = params.writeDisposition
if (params.queryPriority && params.queryPriority.toUpperCase() === "BATCH")
configuration.query.priority = "BATCH"
if (params.useCache === false)
configuration.query.useQueryCache = params.useCache
if (params.maxBillBytes)
configuration.query.maximumBytesBilled = params.maxBillBytes
if (params.maxBillTier)
configuration.query.maximumBillingTier = params.maxBillTier
【讨论】:
感谢@tamir-klein。我的问题与 BigQueryIO.write 与 apache 光束数据流有关。动态目的地实现了 TableSchema 的功能/方法,而 TableSchema 没有添加集群的功能。 确实在阅读 BigQueryIO 之后。写在这个链接beam.apache.org/documentation/io/built-in/google-bigquery你现在需要编写你的自定义代码。注意:集群仍处于 Beta 模式【参考方案2】:现在,在 2.16.0 版之后,BigQueryIO 确实提供了在动态目标中添加 clusteringFields 的选项。
@Override
public TableDestination getTable(String eventName)
return new TableDestination(tableSpec,
tableDescription, timePartitioning, clustering);
注意第4个参数是clustering,可以使用。
【讨论】:
以上是关于BigQueryIO.write DynamicDestination withCreateDisposition - 集群字段的主要内容,如果未能解决你的问题,请参考以下文章
数据流:使用 BigQueryIO 写入时出现 SocketTimeoutException
如何使用 dataflowsdk 将数据从 bigquery 转录到 bigquery?