如何使用 BigQuery Streaming 获取插入的行数

Posted

技术标签:

【中文标题】如何使用 BigQuery Streaming 获取插入的行数【英文标题】:How to get the number of rows inserted using BigQuery Streaming 【发布时间】:2020-10-26 10:15:02 【问题描述】:

我正在从 CSV 文件中读取数据,使用 Streaming Insert 中的 insertAll() 方法将数据插入到 Big Query 表中,如下所示:

InsertAllResponse response = dfsf.insertAll(InsertAllRequest.newBuilder(tableId).setRows(rows).build());

rows 这是一个 Iterable 声明如下:

Iterable<InsertAllRequest.RowToInsert> rows

现在,我实际上是按照此处的建议批量插入 500 大小的行 - link to suggestion

插入所有数据后,如何计算插入的总行数? 我想找出它并将其记录到 log4j。

【问题讨论】:

【参考方案1】:

这可以通过以下两种方式之一完成

    通过 getQueryResults 的 BigQuery 作业 API https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/getQueryResults Cloud Logging,tableDataChange 字段中您想要的输出。

这是一个示例输出:


  "protoPayload": 
    "@type": "type.googleapis.com/google.cloud.audit.AuditLog",
    "status": ,
    "authenticationInfo": 
      "principalEmail": "service_account"
    ,
    "requestMetadata": 
      "callerIp": "2600:1900:2000:1b:400::27",
      "callerSuppliedUserAgent": "gl-python/3.7.1 grpc/1.22.0 gax/1.14.2 gapic/1.12.1 gccl/1.12.1,gzip(gfe)"
    ,
    "serviceName": "bigquery.googleapis.com",
    "methodName": "google.cloud.bigquery.v2.JobService.InsertJob",
    "authorizationInfo": [
      
        "resource": "projects/project_id/datasets/dataset/tables/table",
        "permission": "bigquery.tables.updateData",
        "granted": true
      
    ],
    "resourceName": "projects/project_id/datasets/dataset/tables/table",
    "metadata": 
      "tableDataChange": 
        "deletedRowsCount": "2",
        "insertedRowsCount": "2",
        "reason": "QUERY",
        "jobName": "projects/PRJOECT_ID/jobs/85f19bdd-aff5-4abe-9283-9f0bc9ed3ce8"
      ,
      "@type": "type.googleapis.com/google.cloud.audit.BigQueryAuditMetadata"
    
  ,
  "insertId": "7x7ye390qm",
  "resource": 
    "type": "bigquery_dataset",
    "labels": 
      "project_id": "PRJOECT_ID",
      "dataset_id": "dataset-id"
    
  ,
  "timestamp": "2020-10-26T07:00:22.960735Z",
  "severity": "INFO",
  "logName": "projects/PRJOECT_ID/logs/cloudaudit.googleapis.com%2Fdata_access",
  "receiveTimestamp": "2020-10-26T07:00:23.763159336Z"

【讨论】:

以上是关于如何使用 BigQuery Streaming 获取插入的行数的主要内容,如果未能解决你的问题,请参考以下文章

BigQuery Streaming API 经常引发 503 错误

使用 Apache Beam 向 BigQuery 传播插入时如何指定 insertId

如果我在流式传输之前先删除表并创建表,Google BigQuery Streaming 有时会失败

BigQuery:表 ID 无效

Google BigQuery - 将数据流式传输到 BigQuery

流式缓冲区 - Google BigQuery