Reading BigQuery Table Data into Java Classes (POJOs)
Posted: 2019-05-23 16:43:39

Question: I need to read table data from BigQuery using Dataflow, but without using or storing the data in the TableRow class. I want to store the data in a Java POJO class instead. Is there any way to map the data directly into a POJO?
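For context, the default BigQueryIO read yields TableRow objects, which is exactly what the question wants to avoid; a minimal sketch (the table name is illustrative):

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.values.PCollection;

// Default TableRow-based read; every row arrives as a loosely typed map.
PCollection<TableRow> rows = pipeline.apply(
    BigQueryIO.readTableRows().from("mydataset:mytable"));
```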
Approach 2:

```java
GenericRecord s = schemaAndRecord.getRecord();
org.apache.avro.Schema s1 = s.getSchema();
for (Field f : s1.getFields()) {
    counter++;
    mapping.put(f.name(), null == s.get(f.name()) ? null : String.valueOf(s.get(counter)));
    if (f.name().equalsIgnoreCase("reason_code_id")) {
        // Attempt to decode the NUMERIC column as a BigDecimal.
        // Note: s1 is the record schema, so s1.getType() is RECORD here (see the error below).
        BigDecimal numericValue = new Conversions.DecimalConversion()
            .fromBytes((ByteBuffer) s.get(f.name()), Schema.create(s1.getType()), s1.getLogicalType());
        System.out.println("Numeric Con" + numericValue);
    } else {
        System.out.println("Else Condition " + f.name());
    }
}
```
Facing Issue:

```
2019-05-24 (14:10:37) org.apache.avro.AvroRuntimeException: Can't create a: RECORD
```

Stack trace:

```
java.io.IOException: Failed to start reading from source: gs://trusted-bucket/mgp/temp/BigQueryExtractTemp/3a5365f1e53d4dd393f0eda15a2c6bd4/000000000000.avro range [0, 65461)
at org.apache.beam.runners.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:596)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:411)
at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:380)
at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:306)
at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:135)
at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:115)
at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:102)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.avro.AvroRuntimeException: Can't create a: RECORD
at org.apache.avro.Schema.create(Schema.java:120)
at com.globalpay.WelcomeEmail.mapRecordToObject(WelcomeEmail.java:118)
at com.globalpay.WelcomeEmail.access$0(WelcomeEmail.java:112)
at com.globalpay.WelcomeEmail$1.apply(WelcomeEmail.java:54)
at com.globalpay.WelcomeEmail$1.apply(WelcomeEmail.java:1)
at org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase$1.apply(BigQuerySourceBase.java:221)
at org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase$1.apply(BigQuerySourceBase.java:214)
at org.apache.beam.sdk.io.AvroSource$AvroBlock.readNextRecord(AvroSource.java:567)
at org.apache.beam.sdk.io.BlockBasedSource$BlockBasedReader.readNextRecord(BlockBasedSource.java:209)
at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:484)
at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:479)
at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:249)
at org.apache.beam.runners.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:593)
... 14 more
```
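For reference, Avro's Schema.create(Type) only accepts primitive types, and s1.getType() is RECORD for the record schema, which is exactly what this exception reports. A hedged sketch of the likely fix, under the assumption that the field's own schema carries the decimal logical type:

```java
// Use the field's own schema instead of rebuilding one from the record's type.
// Caveat: for a NULLABLE column, f.schema() is a union of ["null", bytes] and
// the bytes branch must be unwrapped first (see the fuller sketch in the
// discussion below).
Schema fieldSchema = f.schema();
BigDecimal numericValue = new Conversions.DecimalConversion()
    .fromBytes((ByteBuffer) s.get(f.name()), fieldSchema, fieldSchema.getLogicalType());
```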
Answer 1:

BigQueryIO#read(SerializableFunction) lets you use any existing Avro-to-POJO conversion library or function.

For example, here I'm using the code from this blog post:
```java
private static <T> T mapRecordToObject(GenericRecord record, T object) {
    Assert.notNull(record, "record must not be null");
    Assert.notNull(object, "object must not be null");
    final Schema schema = ReflectData.get().getSchema(object.getClass());
    Assert.isTrue(schema.getFields().equals(record.getSchema().getFields()), "Schema fields didn't match");
    record.getSchema().getFields()
        .forEach(d -> PropertyAccessorFactory.forDirectFieldAccess(object)
            .setPropertyValue(d.name(),
                record.get(d.name()) == null ? record.get(d.name()) : record.get(d.name()).toString()));
    return object;
}
```
```java
PCollection<MyType> data = pipeline.apply(
    BigQueryIO
        .read(new SerializableFunction<SchemaAndRecord, MyType>() {
            public MyType apply(SchemaAndRecord schemaAndRecord) {
                return mapRecordToObject(schemaAndRecord.getRecord(), new MyType());
            }
        })
        .from("mydataset:mytable"));
```
The code in the blog post assumes that the Avro schema was used to generate the POJO.
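For illustration, a hypothetical MyType whose field names match the BigQuery column names, so the ReflectData-derived schema lines up with the record's fields (the field names here are assumptions, not taken from the question):

```java
import java.io.Serializable;

// Hypothetical POJO; field names must match the table's column names exactly.
// mapRecordToObject sets values via direct field access, so no setters are needed.
public class MyType implements Serializable {
    private String customer_email;   // illustrative column
    private String reason_code_id;   // the NUMERIC column from the question, stored as text here
}
```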
Comments:

- Hi @Lukasz, I've used the approach above, but the problem is that when reading NUMERIC data type values from BigQuery, the result for numeric columns is java.nio.HeapByteBuffer[pos=0 lim=16 cap=16]. Is there a way to deserialize the NUMERIC data type when mapping into my MyType class?
- Could you provide the Avro schema you get from BigQuery that represents the numeric type?
- I tried the blog post's approach but got the same result for numeric data type values. The link below might help; I'm trying to get some input from it, but so far I've come up empty-handed. github.com/apache/beam/blob/…
- If you know the type you want to bind to is BigDecimal and the input schema type is ByteBuffer, you can invoke the same logic linked here before setting the property value: go/gh/apache/beam/blob/55564672ca23c9938cfd5b9017046acb4ff5f560/…
- I can't access the referenced link: go/gh/apache/beam/blob/…
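Following up on the last two comments, a sketch of decoding a BigQuery NUMERIC column (Avro bytes with a decimal logical type, decimal(38, 9)) into a BigDecimal before calling setPropertyValue; the helper name and the union-unwrapping are assumptions, not code from the thread:

```java
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import org.apache.avro.Conversions;
import org.apache.avro.Schema;

// Sketch: convert the raw ByteBuffer a NUMERIC column arrives as into a BigDecimal.
static BigDecimal decodeNumeric(Object raw, Schema fieldSchema) {
    if (raw == null) {
        return null;
    }
    // A NULLABLE NUMERIC column is typically a union ["null", bytes(decimal)];
    // pick the bytes branch, which carries the decimal logical type.
    Schema bytesSchema = fieldSchema;
    if (fieldSchema.getType() == Schema.Type.UNION) {
        for (Schema branch : fieldSchema.getTypes()) {
            if (branch.getType() == Schema.Type.BYTES) {
                bytesSchema = branch;
            }
        }
    }
    return new Conversions.DecimalConversion()
        .fromBytes((ByteBuffer) raw, bytesSchema, bytesSchema.getLogicalType());
}
```

Inside the forEach in mapRecordToObject, a field known to be NUMERIC could then be set as decodeNumeric(record.get(d.name()), d.schema()) instead of the blanket toString().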