Apache-beam Bigquery .fromQuery ClassCastException
Posted
技术标签:
【中文标题】Apache-beam Bigquery .fromQuery ClassCastException【英文标题】: 【发布时间】:2020-02-25 18:07:25 【问题描述】:我正在尝试对BigQuery
表执行查询,提取一列并填充到文件中。
下面的代码会引发异常。我可能是错的,但似乎该过程正在尝试将临时结果以 avro 格式写入临时位置,从中读取数据并引发强制转换异常。
pipeLine.apply(
BigQueryIO.read(
(SchemaAndRecord elem) ->
GenericRecord record = elem.getRecord();
return (String) record.get("column");
)
.fromQuery("SELECT column FROM `project.dataset.table`")
.usingStandardSql()
.withCoder(AvroCoder.of(String.class)))
.apply(TextIO.write().to("gs://bucket/test/result/data")
.withSuffix(TXT_EXT)
.withCompression(Compression.GZIP));
引起:java.lang.ClassCastException:org.apache.avro.util.Utf8 不能转换为 java.lang.String at xxxxx.xxx.xxx.sampling.dataflow.samplingextractor.service.BigQueryExportService.lambda$export$43268ee4$1(BigQueryExportService.java:137) 在 org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase$1.apply(BigQuerySourceBase.java:242) 在 org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase$1.apply(BigQuerySourceBase.java:235) 在 org.apache.beam.sdk.io.AvroSource$AvroBlock.readNextRecord(AvroSource.java:597) 在 org.apache.beam.sdk.io.BlockBasedSource$BlockBasedReader.readNextRecord(BlockBasedSource.java:209) 在 org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:484) 在 org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:479) 在 org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:249) 在 org.apache.beam.runners.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:601)
【问题讨论】:
您是否测试过@Haris Nadeem 方法?看来是对的 它不起作用。我在发布这个问题之前尝试过。 你遇到同样的错误了吗? 是的。它抛出了同样的错误。 你是在 DataFlow 还是 DirectRunner 上运行它? 【参考方案1】:我认为它建议您使用.withCoder(AvroCoder.of(org.apache.avro.util.Utf8.class)))
,因为字符串不能直接从 Avro Utf8 类转换。
【讨论】:
不能把这个放在一起 AvroCoder.of(org.apache.avro.util.Utf8.class))【参考方案2】:从documentation here 看来,您似乎只想使用StringUtf8Coder 类。
pipeLine.apply(
BigQueryIO.read(
(SchemaAndRecord elem) ->
GenericRecord record = elem.getRecord();
return (String) record.get("column");
)
.fromQuery("SELECT column FROM `project.dataset.table`")
.usingStandardSql()
.withCoder(StringUtf8Coder.of()))
.apply(TextIO.write().to("gs://bucket/test/result/data")
.withSuffix(TXT_EXT)
.withCompression(Compression.GZIP));
【讨论】:
这种方法似乎是正确的@Jay Yoo。你测试过吗?以上是关于Apache-beam Bigquery .fromQuery ClassCastException的主要内容,如果未能解决你的问题,请参考以下文章
Apache-Beam 将序列号添加到 PCollection
如何使用 Data Flow runner 从 ApacheBeam 将 hyperLogLog 字段保存到 BigQuery
数据流:我可以使用批处理作业连续写/流写入BigQuery吗?
优化 BigQuery 资源的使用,使用 Google Dataflow 从 GCS 加载 200 万个 JSON 文件