Apache-beam Bigquery .fromQuery ClassCastException

Posted

技术标签:

【中文标题】Apache-beam Bigquery .fromQuery ClassCastException【英文标题】: 【发布时间】:2020-02-25 18:07:25 【问题描述】:

我正在尝试对BigQuery 表执行查询,提取一列并填充到文件中。 下面的代码会引发异常。我可能是错的,但似乎该过程正在尝试将临时结果以 avro 格式写入临时位置,从中读取数据并引发强制转换异常。

pipeLine.apply(
        BigQueryIO.read(
                (SchemaAndRecord elem) -> 
                  GenericRecord record = elem.getRecord();
                  return (String) record.get("column");
                )
                .fromQuery("SELECT column FROM `project.dataset.table`")
                .usingStandardSql()
                .withCoder(AvroCoder.of(String.class)))
        .apply(TextIO.write().to("gs://bucket/test/result/data")
                .withSuffix(TXT_EXT)
                .withCompression(Compression.GZIP));

引起:java.lang.ClassCastException:org.apache.avro.util.Utf8 不能转换为 java.lang.String at xxxxx.xxx.xxx.sampling.dataflow.samplingextractor.service.BigQueryExportService.lambda$export$43268ee4$1(BigQueryExportService.java:137) 在 org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase$1.apply(BigQuerySourceBase.java:242) 在 org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase$1.apply(BigQuerySourceBase.java:235) 在 org.apache.beam.sdk.io.AvroSource$AvroBlock.readNextRecord(AvroSource.java:597) 在 org.apache.beam.sdk.io.BlockBasedSource$BlockBasedReader.readNextRecord(BlockBasedSource.java:209) 在 org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:484) 在 org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:479) 在 org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:249) 在 org.apache.beam.runners.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:601)

【问题讨论】:

您是否测试过@Haris Nadeem 方法?看来是对的 它不起作用。我在发布这个问题之前尝试过。 你遇到同样的错误了吗? 是的。它抛出了同样的错误。 你是在 DataFlow 还是 DirectRunner 上运行它? 【参考方案1】:

我认为它建议您使用.withCoder(AvroCoder.of(org.apache.avro.util.Utf8.class))),因为字符串不能直接从 Avro Utf8 类转换。

【讨论】:

不能把这个放在一起 AvroCoder.of(org.apache.avro.util.Utf8.class))【参考方案2】:

从documentation here 看来,您似乎只想使用StringUtf8Coder 类。

pipeLine.apply(
    BigQueryIO.read(
            (SchemaAndRecord elem) -> 
              GenericRecord record = elem.getRecord();
              return (String) record.get("column");
            )
            .fromQuery("SELECT column FROM `project.dataset.table`")
            .usingStandardSql()
            .withCoder(StringUtf8Coder.of()))
        .apply(TextIO.write().to("gs://bucket/test/result/data")
            .withSuffix(TXT_EXT)
            .withCompression(Compression.GZIP));

【讨论】:

这种方法似乎是正确的@Jay Yoo。你测试过吗?

以上是关于Apache-beam Bigquery .fromQuery ClassCastException的主要内容,如果未能解决你的问题,请参考以下文章

Apache-Beam 将序列号添加到 PCollection

将批量数据写入 bigQuery

如何使用 Data Flow runner 从 ApacheBeam 将 hyperLogLog 字段保存到 BigQuery

数据流:我可以使用批处理作业连续写/流写入BigQuery吗?

优化 BigQuery 资源的使用,使用 Google Dataflow 从 GCS 加载 200 万个 JSON 文件

BigQuery:SPLIT() 返回错误