数据已写入 BigQuery,但格式不正确

Posted

技术标签:

【中文标题】数据已写入 BigQuery,但格式不正确【英文标题】:Data is written to BigQuery but not in proper format 【发布时间】:2018-03-13 10:51:25 【问题描述】:

我正在将数据写入 BigQuery 并成功写入其中。但我担心它的编写格式。

以下是我在 BigQuery 中执行任何查询时显示数据的格式:

检查第一行,SalesComponent 的值为 CPS_H,但它显示 'BeamRecord [dataValues=[CPS_H'' 并且在 ModelIteration 中,该值以方括号结束。

以下是用于将数据从 BeamSql 推送到 BigQuery 的代码:

TableSchema tableSchema = new TableSchema().setFields(ImmutableList.of(
    new TableFieldSchema().setName("SalesComponent").setType("STRING").setMode("REQUIRED"),
    new TableFieldSchema().setName("DuetoValue").setType("STRING").setMode("REQUIRED"),
    new TableFieldSchema().setName("ModelIteration").setType("STRING").setMode("REQUIRED")
));

TableReference tableSpec = BigQueryHelpers.parseTableSpec("beta-194409:data_id1.tables_test");
System.out.println("Start Bigquery");
final_out.apply(MapElements.into(TypeDescriptor.of(TableRow.class)).via(
    (MyOutputClass elem) -> new TableRow().set("SalesComponent", elem.SalesComponent).set("DuetoValue", elem.DuetoValue).set("ModelIteration", elem.ModelIteration)))
        .apply(BigQueryIO.writeTableRows()
        .to(tableSpec)
        .withSchema(tableSchema)
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));

p.run().waitUntilFinish();

编辑

我已使用以下代码将 BeamRecord 转换为 MyOutputClass 类型,但这也不起作用:

 PCollection<MyOutputClass> final_out = join_query.apply(ParDo.of(new DoFn<BeamRecord, MyOutputClass>() 
        private static final long serialVersionUID = 1L;
        @ProcessElement
        public void processElement(ProcessContext c) 
             BeamRecord record = c.element();
               String[] strArr = record.toString().split(",");
            MyOutputClass moc = new MyOutputClass();
            moc.setSalesComponent(strArr[0]);
            moc.setDuetoValue(strArr[1]);
            moc.setModelIteration(strArr[2]);
            c.output(moc);
        
    ));

【问题讨论】:

【参考方案1】:

您的 MyOutputClass 似乎构造不正确(值不正确)。如果你看一下,BigQueryIO 能够创建具有正确字段的行就好了。但是这些字段的值是错误的。这意味着当您调用.set("SalesComponent", elem.SalesComponent) 时,您在elem 中已经有不正确的数据。

我的猜测是问题出在上一步,当您从 BeamRecord 转换为 MyOutputClass 时。如果你做了这样的事情(或者其他一些转换逻辑在幕后为你做了这个),你会得到类似于你所看到的结果:

通过调用beamRecord.toString()BeamRecord转换为字符串; 如果您查看 BeamRecord.toString() 实现,您会发现您得到的正是该字符串格式; 通过,分割这个字符串得到一个字符串数组; 从该数组构造MyOutputClass

伪代码类似于:

PCollection<MyOutputClass> final_out = 
  beamRecords
    .apply(
      ParDo.of(new DoFn() 

        @ProcessElement
        void processElement(Context c) 
           BeamRecord record = c.elem();
           String[] fields = record.toString().split(",");
           MyOutputClass elem = new MyOutputClass();
           elem.SalesComponent = fields[0];
           elem.DuetoValue = fields[1];
           ...
           c.output(elem);
        
      )
    );

这样做的正确方法是在记录上调用 getter,而不是按照这些行(伪代码)拆分其字符串表示:

PCollection<MyOutputClass> final_out = 
      beamRecords
        .apply(
          ParDo.of(new DoFn() 

            @ProcessElement
            void processElement(Context c) 
               BeamRecord record = c.elem();
               MyOutputClass elem = new MyOutputClass();

               //get field value by name
               elem.SalesComponent = record.getString("CPS_H..."); 

               // get another field value by name
               elem.DuetoValue = record.getInteger("...");
               ...
               c.output(elem);
            
          )
        );

您可以通过添加一个简单的ParDo 来验证类似情况,您可以在其中放置断点并查看调试器中的元素,或者将元素输出到其他地方(例如控制台)。

【讨论】:

看起来类似于我的第一个示例,这是从BeamRecord 中获取字段的错误方法。您应该获取和设置moc.setSalesComponent(record.getString("CPS_H...") 之类的字段,而不是字符串拆分,请参见我的第二个示例【参考方案2】:

我能够使用以下方法解决此问题:

 PCollection<MyOutputClass> final_out = record40.apply(ParDo.of(new DoFn<BeamRecord, MyOutputClass>() 
        private static final long serialVersionUID = 1L;
        @ProcessElement
        public void processElement(ProcessContext c) throws ParseException 
             BeamRecord record = c.element();
               String strArr = record.toString();
               String strArr1 = strArr.substring(24);
               String xyz = strArr1.replace("]","");
               String[] strArr2 = xyz.split(",");

【讨论】:

以上是关于数据已写入 BigQuery,但格式不正确的主要内容,如果未能解决你的问题,请参考以下文章

JSON 表架构到 bigquery.TableSchema 用于 BigQuerySink

在bigquery中以编程方式更新/插入数据

数据流:我可以使用批处理作业连续写/流写入BigQuery吗?

从 BigQuery 读取数据并将其写入云存储上的 avro 文件格式

使用来自 Google BigQuery 的数据更新电子表格不起作用

连接 BigQuery 和 Google 表格 - 日期参数问题