Dataflow 中的 BigQuery 无法从 Cloud Storage 加载数据:为非记录字段指定了 JSON 对象

Posted

技术标签:

【中文标题】Dataflow 中的 BigQuery 无法从 Cloud Storage 加载数据:为非记录字段指定了 JSON 对象【英文标题】:BigQuery in Dataflow fails to load data from Cloud Storage: JSON object specified for non-record field 【发布时间】:2016-12-27 07:58:42 【问题描述】:

我有一个 Dataflow 管道在我的机器上本地运行并写入 BigQuery。此批处理作业中的 BigQuery 需要一个临时位置。我在我的云存储中提供了一个。相关部分是:

PipelineOptions options = PipelineOptionsFactory.create();
    options.as(BigQueryOptions.class)
            .setTempLocation("gs://folder/temp");
    Pipeline p = Pipeline.create(options);

....

List<TableFieldSchema> fields = new ArrayList<>();
      fields.add(new TableFieldSchema().setName("uuid").setType("STRING"));
      fields.add(new TableFieldSchema().setName("start_time").setType("TIMESTAMP"));
      fields.add(new TableFieldSchema().setName("end_time").setType("TIMESTAMP"));
      TableSchema schema = new TableSchema().setFields(fields);

session_windowed_items.apply(ParDo.of(new FormatAsTableRowFn()))
      .apply(BigQueryIO.Write
      .withSchema(schema)
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
      .to("myproject:db.table"));

FormatAsTableRowFn 我有:

static class FormatAsTableRowFn extends DoFn<KV<String, String>, TableRow>
implements RequiresWindowAccess  
    @Override
        public void processElement(ProcessContext c) 
          TableRow row = new TableRow()
              .set("uuid", c.element().getKey())
              // include a field for the window timestamp
             .set("start_time", ((IntervalWindow) c.window()).start().toInstant()) //NOTE: I tried both with and without 
             .set("end_time", ((IntervalWindow) c.window()).end().toInstant());   // .toInstant receiving the same error
          c.output(row);
        
      

如果我打印出row.toString(),我会得到合法的时间戳

uuid=00:00:00:00:00:00, start_time=2016-09-22T07:34:38.000Z, end_time=2016-09-22T07:39:38.000Z

当我运行这段代码 JAVA 说:Failed to create the load job beam_job_XXX

手动检查 GCS 中的temp 文件夹,对象如下:

"mac":"00:00:00:00:00:00","start_time":"millis":1474529678000,"chronology":"zone":"fixed":true,"id":"UTC","zone":"fixed":true,"id":"UTC","afterNow":false,"beforeNow":true,"equalNow":false,"end_time":"millis":1474529978000,"chronology":"zone":"fixed":true,"id":"UTC","zone":"fixed":true,"id":"UTC","afterNow":false,"beforeNow":true,"equalNow":false

查看 BigQuery 中失败的作业报告,错误显示:

JSON object specified for non-record field: start_time (error code: invalid)

这很奇怪,因为我很确定我说这是一个 TIMESTAMP,而且我 100% 确定我在 BigQuery 中的架构符合 SDK 中的 TableSchema。 (注意:设置withCreateDisposition...CREATE_IF_NEEDEDyields 相同的结果)

谁能告诉我我需要如何解决这个问题才能在 BigQuery 中获取数据?

【问题讨论】:

【参考方案1】:

不要使用Instant 对象。尝试使用毫秒/秒。

https://cloud.google.com/bigquery/data-types

一个正数指定自纪元以来的秒数

所以,这样的事情应该可以工作:

.getMillis() / 1000

【讨论】:

以上是关于Dataflow 中的 BigQuery 无法从 Cloud Storage 加载数据:为非记录字段指定了 JSON 对象的主要内容,如果未能解决你的问题,请参考以下文章

Cloud Dataflow 中的“辅助输入”是不是支持从 BigQuery 视图中读取?

从 Dataflow python 作业写入 bigquery 中的分区表

当内核过多或多于一台机器时,Dataflow 无法获取对 BigQuery 表的引用

使用 Dataflow 管道 (python) 将多个 Json zip 文件从 GCS 加载到 BigQuery

如何使用 python 将字典写入 Dataflow 中的 Bigquery

从 PubSub 导出到 BigQuery - Dataflow 没有任何反应