BigQueryIO.write DynamicDestination withCreateDisposition - 集群字段

Posted

技术标签:

【中文标题】BigQueryIO.write DynamicDestination withCreateDisposition - 集群字段【英文标题】:BigQueryIO.write DynamicDestination withCreateDisposition - Clustering fields 【发布时间】:2018-12-17 16:04:56 【问题描述】:

BigQueryIO.write.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) 连同 DynamicDestinations 我们可以写入动态表,如果该表不存在,它将从 DynamicDestinations 提供的 TableSchema 创建表。

我无法在 TableSchema 模型中添加集群字段部分,因为它没有这样的功能。

我们如何添加具有 TableSchema 和集群字段的 DynamicDestinations?

【问题讨论】:

【参考方案1】:

bigQuery API 是将集群字段添加到表中的一种方法

使用这个link,您可以在编写代码之前测试 API

function execute() 
return gapi.client.bigquery.jobs.insert(
  "resource": 
    "configuration": 
      "query": 
        "clustering": 
          "fields": [
            "Field1",
            "Field2"
          ]
        ,
        "query": "select 5",
        "destinationTable": 
          "datasetId": "Id1",
          "projectId": "Project1",
          "tableId": "T1"
        
      
    
  
)
    .then(function(response) 
            // Handle the results here (response.result has the parsed body).
            console.log("Response", response);
          ,
          function(err)  console.error("Execute error", err); );

这是一个关于如何操作参数的 JS 示例:

static setConfiguration(params, configuration) 
    //To have a destination table we MUST have a tableId
    if (params.destinationTable && params.destinationTable.tableId) 
        configuration.query.destinationTable = params.destinationTable

    
    if (params.clusteringFields) 
        configuration.query.clustering = fields: params.clusteringFields
    
    if (params.timePartitioning) 
        configuration.query.timePartitioning = 
            type: 'DAY',
            field: params.timePartitioning
        
    
    if (params.writeDisposition) 
        configuration.query.writeDisposition = params.writeDisposition
    
    if (params.queryPriority && params.queryPriority.toUpperCase() === "BATCH") 
        configuration.query.priority = "BATCH"
    
    if (params.useCache === false) 
        configuration.query.useQueryCache = params.useCache
    
    if (params.maxBillBytes) 
        configuration.query.maximumBytesBilled = params.maxBillBytes
    
    if (params.maxBillTier) 
        configuration.query.maximumBillingTier = params.maxBillTier
    

【讨论】:

感谢@tamir-klein。我的问题与 BigQueryIO.write 与 apache 光束数据流有关。动态目的地实现了 TableSchema 的功能/方法,而 TableSchema 没有添加集群的功能。 确实在阅读 BigQueryIO 之后。写在这个链接beam.apache.org/documentation/io/built-in/google-bigquery你现在需要编写你的自定义代码。注意:集群仍处于 Beta 模式【参考方案2】:

现在,在 2.16.0 版之后,BigQueryIO 确实提供了在动态目标中添加 clusteringFields 的选项。

    @Override
    public TableDestination getTable(String eventName) 
        return new TableDestination(tableSpec,
                tableDescription, timePartitioning, clustering);
    

注意第4个参数是clustering,可以使用。

【讨论】:

以上是关于BigQueryIO.write DynamicDestination withCreateDisposition - 集群字段的主要内容,如果未能解决你的问题,请参考以下文章

数据流:使用 BigQueryIO 写入时出现 SocketTimeoutException

如何使用 dataflowsdk 将数据从 bigquery 转录到 bigquery?

是否有任何形式可写入 BigQuery 以动态指定目标表的名称?

数据流 bigquery 单元测试

BigQuery 代码段中的错误

C# 4.0 新特性dynamic (待学习)