Dataflow 中的 BigQuery 无法从 Cloud Storage 加载数据:为非记录字段指定了 JSON 对象
Posted
技术标签:
【中文标题】Dataflow 中的 BigQuery 无法从 Cloud Storage 加载数据:为非记录字段指定了 JSON 对象【英文标题】:BigQuery in Dataflow fails to load data from Cloud Storage: JSON object specified for non-record field 【发布时间】:2016-12-27 07:58:42 【问题描述】:我有一个 Dataflow 管道在我的机器上本地运行并写入 BigQuery。此批处理作业中的 BigQuery 需要一个临时位置。我在我的云存储中提供了一个。相关部分是:
PipelineOptions options = PipelineOptionsFactory.create();
options.as(BigQueryOptions.class)
.setTempLocation("gs://folder/temp");
Pipeline p = Pipeline.create(options);
....
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("uuid").setType("STRING"));
fields.add(new TableFieldSchema().setName("start_time").setType("TIMESTAMP"));
fields.add(new TableFieldSchema().setName("end_time").setType("TIMESTAMP"));
TableSchema schema = new TableSchema().setFields(fields);
session_windowed_items.apply(ParDo.of(new FormatAsTableRowFn()))
.apply(BigQueryIO.Write
.withSchema(schema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.to("myproject:db.table"));
FormatAsTableRowFn
我有:
static class FormatAsTableRowFn extends DoFn<KV<String, String>, TableRow>
implements RequiresWindowAccess
@Override
public void processElement(ProcessContext c)
TableRow row = new TableRow()
.set("uuid", c.element().getKey())
// include a field for the window timestamp
.set("start_time", ((IntervalWindow) c.window()).start().toInstant()) //NOTE: I tried both with and without
.set("end_time", ((IntervalWindow) c.window()).end().toInstant()); // .toInstant receiving the same error
c.output(row);
如果我打印出row.toString()
,我会得到合法的时间戳:
uuid=00:00:00:00:00:00, start_time=2016-09-22T07:34:38.000Z, end_time=2016-09-22T07:39:38.000Z
当我运行这段代码 JAVA 说:Failed to create the load job beam_job_XXX
手动检查 GCS 中的temp
文件夹,对象如下:
"mac":"00:00:00:00:00:00","start_time":"millis":1474529678000,"chronology":"zone":"fixed":true,"id":"UTC","zone":"fixed":true,"id":"UTC","afterNow":false,"beforeNow":true,"equalNow":false,"end_time":"millis":1474529978000,"chronology":"zone":"fixed":true,"id":"UTC","zone":"fixed":true,"id":"UTC","afterNow":false,"beforeNow":true,"equalNow":false
查看 BigQuery 中失败的作业报告,错误显示:
JSON object specified for non-record field: start_time (error code: invalid)
这很奇怪,因为我很确定我说这是一个 TIMESTAMP,而且我 100% 确定我在 BigQuery 中的架构符合 SDK 中的 TableSchema
。 (注意:设置withCreateDisposition...CREATE_IF_NEEDED
yields 相同的结果)
谁能告诉我我需要如何解决这个问题才能在 BigQuery 中获取数据?
【问题讨论】:
【参考方案1】:不要使用Instant
对象。尝试使用毫秒/秒。
https://cloud.google.com/bigquery/data-types
一个正数指定自纪元以来的秒数
所以,这样的事情应该可以工作:
.getMillis() / 1000
【讨论】:
以上是关于Dataflow 中的 BigQuery 无法从 Cloud Storage 加载数据:为非记录字段指定了 JSON 对象的主要内容,如果未能解决你的问题,请参考以下文章
Cloud Dataflow 中的“辅助输入”是不是支持从 BigQuery 视图中读取?
从 Dataflow python 作业写入 bigquery 中的分区表
当内核过多或多于一台机器时,Dataflow 无法获取对 BigQuery 表的引用
使用 Dataflow 管道 (python) 将多个 Json zip 文件从 GCS 加载到 BigQuery