Error in BigQuery Snippets

Posted: 2018-12-15 04:26:24

Question:

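I am new to Dataflow and am trying to fetch a BigQuery table's schema dynamically. I also need to determine the destination table name dynamically, for which I use a DynamicDestinations class in BigQueryIO.write.to(). The pipeline works if the schema of the destination table is supplied before the pipeline executes. To fetch the schema dynamically, however, I used BigQuery Snippets, which takes a datasetId and a tableId as input and returns the schema of the given table. When I try to run the pipeline with Snippets, it fails with the error shown below.

Any help is appreciated. Thanks in advance.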

Exception in thread "main" java.lang.NoSuchMethodError: com.google.api.client.googleapis.services.json.AbstractGoogleJsonClient$Builder.setBatchPath(Ljava/lang/String;)Lcom/google/api/client/googleapis/services/AbstractGoogleClient$Builder;
    at com.google.api.services.bigquery.Bigquery$Builder.setBatchPath(Bigquery.java:3519)
    at com.google.api.services.bigquery.Bigquery$Builder.<init>(Bigquery.java:3498)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.newBigQueryClient(BigQueryServicesImpl.java:881)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.access$200(BigQueryServicesImpl.java:79)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.<init>(BigQueryServicesImpl.java:388)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.<init>(BigQueryServicesImpl.java:345)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.getDatasetService(BigQueryServicesImpl.java:105)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$TypedRead.validate(BigQueryIO.java:676)
    at org.apache.beam.sdk.Pipeline$ValidateVisitor.enterCompositeTransform(Pipeline.java:640)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:656)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
    at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:575)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
    at project2.configTable.main(configTable.java:146)
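
A NoSuchMethodError like the one in this trace generally means that a class found at runtime lacks a method that the calling code was compiled against, which usually points to two libraries pulling in different versions of a shared dependency. The following is only a minimal diagnostic sketch, not part of the original post: the class name `ClasspathCheck` and its helpers are hypothetical, and the lookup by name and parameter types is an approximation, since the JVM also matches on the return type.

```java
import java.security.CodeSource;

public class ClasspathCheck {

    /** Returns the jar or directory a class was loaded from, or a note for JDK classes. */
    public static String locationOf(Class<?> cls) {
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "(bootstrap / JDK)";
        }
        return source.getLocation().toString();
    }

    /** True if the class exposes a public method with this name and these parameter types. */
    public static boolean hasMethod(Class<?> cls, String name, Class<?>... params) {
        try {
            cls.getMethod(name, params);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class name taken from the first line of the stack trace; pass another name as args[0] if needed.
        String className = args.length > 0
                ? args[0]
                : "com.google.api.client.googleapis.services.json.AbstractGoogleJsonClient$Builder";
        try {
            // Load without initializing; we only want to inspect the class.
            Class<?> cls = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            System.out.println("Loaded from: " + locationOf(cls));
            System.out.println("Has setBatchPath(String): " + hasMethod(cls, "setBatchPath", String.class));
        } catch (ClassNotFoundException e) {
            System.out.println("Class not on the classpath: " + className);
        }
    }
}
```

Running this with the same classpath as the pipeline shows which jar supplies the builder class. If the reported method is missing there, the build's dependency tree is the next place to look for conflicting versions.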

Code:

package project2;

import java.util.ArrayList;
import java.util.List;

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.ValueInSingleWindow;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldList;

import avro.shaded.com.google.common.collect.ImmutableList;

public class configTable {

    public static void main(String[] args) {
        // customInt is the asker's own pipeline options interface (not shown in the post);
        // it must expose getInputFile() as a ValueProvider<String>.
        customInt op = PipelineOptionsFactory.as(customInt.class);
        op.setProject("my-new-project");
        op.setTempLocation("gs://train-10/projects");
        op.setWorkerMachineType("n1-standard-1");
        op.setTemplateLocation("gs://train-10/main-template-with-snippets");
        op.setRunner(DataflowRunner.class);
        org.apache.beam.sdk.Pipeline p = org.apache.beam.sdk.Pipeline.create(op);

        // Side input: the config table's "file" column holds the destination table name.
        PCollection<TableRow> indata = p.apply("Taking side input",
                BigQueryIO.readTableRows().from("my-new-project:training.config"));

        PCollectionView<String> view = indata.apply("Convert to view", ParDo.of(new DoFn<TableRow, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                TableRow row = c.element();
                c.output(row.get("file").toString());
            }
        })).apply(View.asSingleton());

        // Main input: read the text file named at template run time and wrap each line in a TableRow.
        PCollection<TableRow> mainop = p.apply("Taking input", TextIO.read().from(
                NestedValueProvider.of(op.getInputFile(), new SerializableFunction<String, String>() {
                    @Override
                    public String apply(String input) {
                        return "gs://train-10/projects/" + input;
                    }
                }))).apply("Transform", ParDo.of(new DoFn<String, TableRow>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        c.output(new TableRow().set("data", c.element()));
                    }
                }));

        mainop.apply("Write data", BigQueryIO.writeTableRows().to(new DynamicDestinations<TableRow, String>() {
            @Override
            public String getDestination(ValueInSingleWindow<TableRow> element) {
                String d = sideInput(view);
                String tablespec = "my-new-project:training." + d;
                return tablespec;
            }

            @Override
            public List<PCollectionView<?>> getSideInputs() {
                return ImmutableList.of(view);
            }

            @Override
            public TableDestination getTable(String destination) {
                //String dest=String.format("%s:%s.%s","my-new-project","training", destination);
                String dest = destination;
                return new TableDestination(dest, dest);
            }

            @Override
            public TableSchema getSchema(String destination) {
                // Look up the existing table's schema with the google-cloud-bigquery client.
                // NOTE: destination is the full "project:dataset.table" spec returned by
                // getDestination, while getTable(datasetId, tableId) expects a bare table name.
                BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
                com.google.cloud.bigquery.Table table = bigquery.getTable("training", destination);
                com.google.cloud.bigquery.Schema tbschema = table.getDefinition().getSchema();
                FieldList tfld = tbschema.getFields();
                List<TableFieldSchema> flds = new ArrayList<>();
                for (Field each : tfld) {
                    flds.add(new TableFieldSchema().setName(each.getName()).setType(each.getType().toString()));
                }
                return new TableSchema().setFields(flds);
            }
        }).withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED).withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));

        p.run();
    }
}

Comments:

Possible duplicate of "Interpreting java.lang.NoSuchMethodError message".

Answer 1:

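I don't think you can both use WRITE_TRUNCATE

.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED).withWriteDisposition(WriteDisposition.WRITE_TRUNCATE))

and fetch the table's definition

com.google.cloud.bigquery.Table table=bigquery.getTable("training", destination);
com.google.cloud.bigquery.Schema tbschema=table.getDefinition().getSchema();

because even if the table exists, it may be recreated when paired with BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE, at which point the getTable call will fail. In other words, WRITE_TRUNCATE is not an atomic operation.

I suggest that you create the table beforehand with the correct schema (CREATE_NEVER), append to the table if it exists (WRITE_EMPTY or WRITE_APPEND), or store the schema outside the Dataflow pipeline and read it in.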
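
As a rough illustration of the last option, the sketch below parses a schema kept outside the pipeline as a plain "name:TYPE,name:TYPE" string. The string format, the class name `SchemaSpec`, and the sample field names are assumptions made for this example, not something the original post specifies. Inside getSchema, each entry could then be turned into a TableFieldSchema with setName and setType, as the question's code already does.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

public class SchemaSpec {

    /**
     * Parses a spec such as "data:STRING,id:INTEGER" into an ordered
     * field-name -> type map. The format is an assumption for this sketch.
     */
    public static Map<String, String> parse(String spec) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (String part : spec.split(",")) {
            String[] nameAndType = part.trim().split(":");
            if (nameAndType.length != 2
                    || nameAndType[0].trim().isEmpty()
                    || nameAndType[1].trim().isEmpty()) {
                throw new IllegalArgumentException("Bad field spec: '" + part + "'");
            }
            // Normalize the type to upper case so "string" and "STRING" are treated alike.
            fields.put(nameAndType[0].trim(), nameAndType[1].trim().toUpperCase(Locale.ROOT));
        }
        return fields;
    }

    public static void main(String[] args) {
        // Hypothetical schema string; in practice it might come from a file or a pipeline option.
        Map<String, String> fields = parse("data:STRING, id:integer");
        for (Map.Entry<String, String> field : fields.entrySet()) {
            System.out.println(field.getKey() + " -> " + field.getValue());
        }
    }
}
```

Because the schema is resolved from a plain string, getSchema no longer needs to call the BigQuery service for a table that WRITE_TRUNCATE may be recreating.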
