Apache Beam:将具有对象列表的对象转换为多个 TableRows 以写入 BigQuery
Posted
技术标签:
【中文标题】Apache Beam:将具有对象列表的对象转换为多个 TableRows 以写入 BigQuery【英文标题】:Apache Beam : Transform an objects having a list of objects to multiple TableRows to write to BigQuery 【发布时间】:2018-04-07 10:10:16 【问题描述】:我正在研究一个梁管道来处理一个 json 并将其写入 bigquery。 JSON是这样的。
"message": [
"name": "abc",
"itemId": "2123",
"itemName": "test"
,
"name": "vfg",
"itemId": "56457",
"itemName": "Chicken"
],
"publishDate": "2017-10-26T04:54:16.207Z"
我使用 Jackson 将其解析为以下结构。
class Feed
List<Message> messages;
TimeStamp publishDate;
public class Message implements Serializable
/**
*
*/
private static final long serialVersionUID = 1L;
private String key;
private String value;
private Map<String, String> eventItemMap = new HashMap<>();
this property translate the list of map as a single map with all the key-value pair together. because, the messages property will be parsed as list of HashMap objets for each key/value. This will be translated to a single map.
现在在我的管道中,我会将集合转换为
PCollection<KV<String, Feed>>
根据类中的属性将其写入不同的表。我已经写了一个转换来做到这一点。 要求是根据消息对象的数量创建多个 TableRows。我在 JSON 中还有一些属性以及将添加到 tableRow 和每个消息属性的 publishDate。 所以表格如下。
id, name, field1, field2, message1.property1, message1.property2...
id, name, field1, field2, message2.property1, message2.property2...
我尝试创建以下转换。但是,不确定它将如何根据消息列表输出多行。
private class BuildRowListFn extends DoFn<KV<String, Feed>, List<TableRow>>
@ProcessElement
public void processElement(ProcessContext context)
Feed feed = context.element().getValue();
List<Message> messages = feed.getMessage();
List<TableRow> rows = new ArrayList<>();
messages.forEach((message) ->
TableRow row = new TableRow();
row.set("column1", feed.getPublishDate());
row.set("column2", message.getEventItemMap().get("key1"));
row.set("column3", message.getEventItemMap().get("key2"));
rows.add(row);
);
但是,这也将是一个列表,我将无法应用 BigQueryIO.write 转换。
根据 "Eugene" aka @jkff 的评论更新
谢谢@jkff。现在,我已经更改了您在第二段中提到的代码。在messages.forEach中的context.output(row),将表行设置为
List<Message> messages = feed.getMessage();
messages.forEach((message) ->
TableRow row = new TableRow();
row.set("column2", message.getEventItemMap().get("key1"));
context.output(row);
现在,当我尝试将此集合写入 BigQuery 时,
rows.apply(BigQueryIO.writeTableRows().to(getTable(projectId, datasetId, tableName)).withSchema(getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
我收到以下异常。
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:331)
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:301)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
at com.chefd.gcloud.analytics.pipeline.MyPipeline.main(MyPipeline.java:284)
Caused by: java.lang.NullPointerException
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:759)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:809)
at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:126)
at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:96)
请帮忙。
谢谢。
【问题讨论】:
【参考方案1】:您似乎假设 DoFn
每个元素只能输出一个值。情况并非如此:它可以为每个元素输出任意数量的值 - 无值、一个值、多个值等。DoFn
甚至可以output values to multiple PCollection's。
在您的情况下,您只需为@ProcessElement
方法中的每一行调用c.output(row)
,例如:rows.forEach(c::output)
。当然,您还需要将DoFn
的类型更改为DoFn<KV<String, Feed>, TableRow>
,因为其输出PCollection
中的元素类型是TableRow
,而不是List<TableRow>
- 您只是将多个元素生成到每个输入元素的集合,但这不会改变类型。
另一种方法是执行您当前执行的操作,同时执行c.output(rows)
,然后应用Flatten.iterables()
将PCollection<List<TableRow>>
展平为PCollection<TableRow>
(您可能需要将List
替换为Iterable
让它工作)。但另一种方法更简单。
【讨论】:
嗨 Eugene,我刚刚添加了 .withFailedInsertRetryPolicy(InsertRetryPolicy.alwaysRetry()) 并且它显示了问题。我动态设置的数据类型是一个键的时间戳,值是一个字符串。现在这是完美的插入。非常感谢您的帮助。它真的救了我!干杯! 谢谢,我也是这么认为的。我认为这类似于 Flink 的 flatMap。干杯以上是关于Apache Beam:将具有对象列表的对象转换为多个 TableRows 以写入 BigQuery的主要内容,如果未能解决你的问题,请参考以下文章
如何在Apache Beam / Google Dataflow中使用ParseJsons?
如何将具有元素数组的每个对象的对象列表转换为具有子元素作为属性的对象数组
使用Apache-beam在Python中删除字典中的第一项[重复]