Error in BigQuery Snippets
Posted: 2018-12-15 04:26:24

Question: I am new to Dataflow and am trying to fetch a BigQuery table's schema dynamically. I also need to determine the destination table's name dynamically, for which I use a DynamicDestinations class in BigQueryIO.write().to(). The pipeline works if the destination table's schema is supplied before the pipeline is executed. But to fetch the schema dynamically, I used the BigQuery client ("BigQuery Snippets"), which takes a datasetId and tableId as input and returns the schema of the given table. When I try to run the pipeline with the Snippets code, it throws the error below.
Any help is appreciated. Thanks in advance.
Exception in thread "main" java.lang.NoSuchMethodError: com.google.api.client.googleapis.services.json.AbstractGoogleJsonClient$Builder.setBatchPath(Ljava/lang/String;)Lcom/google/api/client/googleapis/services/AbstractGoogleClient$Builder;
at com.google.api.services.bigquery.Bigquery$Builder.setBatchPath(Bigquery.java:3519)
at com.google.api.services.bigquery.Bigquery$Builder.<init>(Bigquery.java:3498)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.newBigQueryClient(BigQueryServicesImpl.java:881)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.access$200(BigQueryServicesImpl.java:79)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.<init>(BigQueryServicesImpl.java:388)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.<init>(BigQueryServicesImpl.java:345)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.getDatasetService(BigQueryServicesImpl.java:105)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$TypedRead.validate(BigQueryIO.java:676)
at org.apache.beam.sdk.Pipeline$ValidateVisitor.enterCompositeTransform(Pipeline.java:640)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:656)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:575)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at project2.configTable.main(configTable.java:146)
Code:
package project2;

import java.util.ArrayList;
import java.util.List;

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.ValueInSingleWindow;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldList;

import avro.shaded.com.google.common.collect.ImmutableList;

public class configTable {

    public static void main(String[] args) {
        // customInt is a custom PipelineOptions interface (not shown) that supplies getInputFile().
        customInt op = PipelineOptionsFactory.as(customInt.class);
        op.setProject("my-new-project");
        op.setTempLocation("gs://train-10/projects");
        op.setWorkerMachineType("n1-standard-1");
        op.setTemplateLocation("gs://train-10/main-template-with-snippets");
        op.setRunner(DataflowRunner.class);

        Pipeline p = Pipeline.create(op);

        // Read the config table; its single "file" value becomes a singleton side input
        // that names the destination table.
        PCollection<TableRow> indata = p.apply("Taking side input",
                BigQueryIO.readTableRows().from("my-new-project:training.config"));

        PCollectionView<String> view = indata
                .apply("Convert to view", ParDo.of(new DoFn<TableRow, String>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        TableRow row = c.element();
                        c.output(row.get("file").toString());
                    }
                }))
                .apply(View.asSingleton());

        // Read the input file named by the template parameter and wrap each line in a TableRow.
        PCollection<TableRow> mainop = p
                .apply("Taking input", TextIO.read().from(NestedValueProvider.of(op.getInputFile(),
                        new SerializableFunction<String, String>() {
                            public String apply(String input) {
                                return "gs://train-10/projects/" + input;
                            }
                        })))
                .apply("Transform", ParDo.of(new DoFn<String, TableRow>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        c.output(new TableRow().set("data", c.element()));
                    }
                }));

        // Write to a table whose name comes from the side input and whose schema
        // is looked up from BigQuery at runtime.
        mainop.apply("Write data", BigQueryIO.writeTableRows().to(new DynamicDestinations<TableRow, String>() {
            @Override
            public String getDestination(ValueInSingleWindow<TableRow> element) {
                String d = sideInput(view);
                return "my-new-project:training." + d;
            }

            @Override
            public List<PCollectionView<?>> getSideInputs() {
                return ImmutableList.of(view);
            }

            @Override
            public TableDestination getTable(String destination) {
                return new TableDestination(destination, destination);
            }

            @Override
            public TableSchema getSchema(String destination) {
                // Fetch the table's schema with the com.google.cloud.bigquery client
                // and convert it to the api-services TableSchema that Beam expects.
                BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
                com.google.cloud.bigquery.Table table = bigquery.getTable("training", destination);
                com.google.cloud.bigquery.Schema tbschema = table.getDefinition().getSchema();
                FieldList tfld = tbschema.getFields();
                List<TableFieldSchema> flds = new ArrayList<>();
                for (Field each : tfld) {
                    flds.add(new TableFieldSchema().setName(each.getName()).setType(each.getType().toString()));
                }
                return new TableSchema().setFields(flds);
            }
        }).withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
          .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));

        p.run();
    }
}
Comments:

Possible duplicate of Interpreting java.lang.NoSuchMethodError message

Answer 1:

I don't think you can both truncate the table on write, i.e.
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED).withWriteDisposition(WriteDisposition.WRITE_TRUNCATE))
and fetch the table's definition, i.e.
com.google.cloud.bigquery.Table table=bigquery.getTable("training", destination);
com.google.cloud.bigquery.Schema tbschema=table.getDefinition().getSchema();
because even if the table exists, it may be recreated when paired with BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE, at which point the getTable call will fail. In other words, WRITE_TRUNCATE is not an atomic operation.
I suggest that you either create the tables beforehand with the correct schema (CREATE_NEVER), append to the tables if they exist (WRITE_EMPTY or WRITE_APPEND), or store the schema outside the Dataflow pipeline and read it in.
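For the last option, a minimal sketch of pre-fetching the schema at pipeline-construction time might look like the following (my own illustration, not the asker's code: the helper class name SchemaPrefetch is hypothetical, the dataset name comes from the question, and BigQueryHelpers is Beam's own utility class). Because TableSchema is not Serializable, the schema is shipped to the workers as a JSON string:

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;

public class SchemaPrefetch {
    // Runs on the machine that builds the pipeline, before any worker starts.
    public static String schemaAsJson(String dataset, String table) {
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        com.google.cloud.bigquery.Schema schema =
                bigquery.getTable(dataset, table).getDefinition().getSchema();
        List<TableFieldSchema> fields = new ArrayList<>();
        for (Field f : schema.getFields()) {
            fields.add(new TableFieldSchema().setName(f.getName()).setType(f.getType().toString()));
        }
        // TableSchema is not Serializable, so carry it across as JSON.
        return BigQueryHelpers.toJsonString(new TableSchema().setFields(fields));
    }
}

The returned string can be stored in a final field captured by the DynamicDestinations subclass, so that getSchema(destination) becomes a plain BigQueryHelpers.fromJsonString(schemaJson, TableSchema.class) call: the workers never call the BigQuery API and never race against the WRITE_TRUNCATE recreation.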