数据已写入 BigQuery,但格式不正确
Posted
技术标签:
【中文标题】数据已写入 BigQuery,但格式不正确【英文标题】:Data is written to BigQuery but not in proper format 【发布时间】:2018-03-13 10:51:25 【问题描述】:我正在将数据写入 BigQuery 并成功写入其中。但我担心它的编写格式。
以下是我在 BigQuery 中执行任何查询时显示数据的格式:
检查第一行,SalesComponent 的值为 CPS_H,但它显示 'BeamRecord [dataValues=[CPS_H'' 并且在 ModelIteration 中,该值以方括号结束。
以下是用于将数据从 BeamSql 推送到 BigQuery 的代码:
TableSchema tableSchema = new TableSchema().setFields(ImmutableList.of(
new TableFieldSchema().setName("SalesComponent").setType("STRING").setMode("REQUIRED"),
new TableFieldSchema().setName("DuetoValue").setType("STRING").setMode("REQUIRED"),
new TableFieldSchema().setName("ModelIteration").setType("STRING").setMode("REQUIRED")
));
TableReference tableSpec = BigQueryHelpers.parseTableSpec("beta-194409:data_id1.tables_test");
System.out.println("Start Bigquery");
final_out.apply(MapElements.into(TypeDescriptor.of(TableRow.class)).via(
(MyOutputClass elem) -> new TableRow().set("SalesComponent", elem.SalesComponent).set("DuetoValue", elem.DuetoValue).set("ModelIteration", elem.ModelIteration)))
.apply(BigQueryIO.writeTableRows()
.to(tableSpec)
.withSchema(tableSchema)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
p.run().waitUntilFinish();
编辑
我已使用以下代码将 BeamRecord 转换为 MyOutputClass 类型,但这也不起作用:
PCollection<MyOutputClass> final_out = join_query.apply(ParDo.of(new DoFn<BeamRecord, MyOutputClass>()
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c)
BeamRecord record = c.element();
String[] strArr = record.toString().split(",");
MyOutputClass moc = new MyOutputClass();
moc.setSalesComponent(strArr[0]);
moc.setDuetoValue(strArr[1]);
moc.setModelIteration(strArr[2]);
c.output(moc);
));
【问题讨论】:
【参考方案1】:您的 MyOutputClass
似乎构造不正确(值不正确)。如果你看一下,BigQueryIO
能够创建具有正确字段的行就好了。但是这些字段的值是错误的。这意味着当您调用.set("SalesComponent", elem.SalesComponent)
时,您在elem
中已经有不正确的数据。
我的猜测是问题出在上一步,当您从 BeamRecord
转换为 MyOutputClass
时。如果你做了这样的事情(或者其他一些转换逻辑在幕后为你做了这个),你会得到类似于你所看到的结果:
beamRecord.toString()
将BeamRecord
转换为字符串;
如果您查看 BeamRecord.toString()
实现,您会发现您得到的正是该字符串格式;
通过,
分割这个字符串得到一个字符串数组;
从该数组构造MyOutputClass
;
伪代码类似于:
PCollection<MyOutputClass> final_out =
beamRecords
.apply(
ParDo.of(new DoFn()
@ProcessElement
void processElement(Context c)
BeamRecord record = c.elem();
String[] fields = record.toString().split(",");
MyOutputClass elem = new MyOutputClass();
elem.SalesComponent = fields[0];
elem.DuetoValue = fields[1];
...
c.output(elem);
)
);
这样做的正确方法是在记录上调用 getter,而不是按照这些行(伪代码)拆分其字符串表示:
PCollection<MyOutputClass> final_out =
beamRecords
.apply(
ParDo.of(new DoFn()
@ProcessElement
void processElement(Context c)
BeamRecord record = c.elem();
MyOutputClass elem = new MyOutputClass();
//get field value by name
elem.SalesComponent = record.getString("CPS_H...");
// get another field value by name
elem.DuetoValue = record.getInteger("...");
...
c.output(elem);
)
);
您可以通过添加一个简单的ParDo
来验证类似情况,您可以在其中放置断点并查看调试器中的元素,或者将元素输出到其他地方(例如控制台)。
【讨论】:
看起来类似于我的第一个示例,这是从BeamRecord
中获取字段的错误方法。您应该获取和设置moc.setSalesComponent(record.getString("CPS_H...")
之类的字段,而不是字符串拆分,请参见我的第二个示例【参考方案2】:
我能够使用以下方法解决此问题:
PCollection<MyOutputClass> final_out = record40.apply(ParDo.of(new DoFn<BeamRecord, MyOutputClass>()
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) throws ParseException
BeamRecord record = c.element();
String strArr = record.toString();
String strArr1 = strArr.substring(24);
String xyz = strArr1.replace("]","");
String[] strArr2 = xyz.split(",");
【讨论】:
以上是关于数据已写入 BigQuery,但格式不正确的主要内容,如果未能解决你的问题,请参考以下文章
JSON 表架构到 bigquery.TableSchema 用于 BigQuerySink
数据流:我可以使用批处理作业连续写/流写入BigQuery吗?
从 BigQuery 读取数据并将其写入云存储上的 avro 文件格式