BigQueryIO - 为每个项目写入两个表
Posted
技术标签:
【中文标题】BigQueryIO - 为每个项目写入两个表【英文标题】:BigQueryIO - Write to two tables for each item 【发布时间】:2018-02-05 17:23:52 【问题描述】:我正在尝试使用 Apache Beam 在 Dataflow 中编写作业。 这项工作需要获取输入并将其转换为我的自定义对象。 这个对象代表一个内存测试,它包含固定的属性,如时间戳、名称......以及带有属性的分区列表
public class TestResult
String testName;
String testId;
String testStatus;
String testResult;
List<Partition> testPartitions;
public class Partition
String testId;
String filesystem;
String mountedOn;
String usePercentage;
String available;
String size;
String used;
我的最后一次转换,采用这个 TestResult 对象并将其转换为表格行。
static class TestResultToRowConverter extends DoFn<TestResult, TableRow>
/**
* In this example, put the whole string into single BigQuery field.
*/
@ProcessElement
public void processElement(ProcessContext c)
System.out.println("setting TestResult-> TestResult:" + c.element());
c.output(new TableRow().set("testName", c.element().testName).set("testId", c.element().testId).set("testStatus", c.element().testStatus).set("testResult", c.element().testResult).set("memoryTestData", "example data test"));
for (Partition pt : c.element().testPartitions)
c.output(partitionsTag, new TableRow().set("testId", pt.testId).set("filesystem", pt.filesystem).set("mountedOn", pt.mountedOn).set("usePercentage", pt.usePercentage).set("available", pt.available).set("size", pt.size).set("used", pt.used));
现在我想将它们写入 BigQuery,但测试结果的表行转到具有特定架构的一个表,而分区转到另一个具有另一个架构的表。还提到有一个 Id 链接两者,我需要在添加 testResult 时自动生成,并在插入分区行时重用。
我怎样才能做到这一点?
我用它来写入 1 个表,但如果我想写入两个表,我会迷路。
.apply("MemoryTest_WriteToBigQuery", BigQueryIO.writeTableRows().to(TestResultToRowConverter.getTableSpec1())
.withSchema(TestResultToRowConverter.getMemoryTestSchema())
.withWriteDisposition(WriteDisposition.WRITE_APPEND))
编辑:
这是我的管道:
pipeline.apply("ReadFromPubSubToBigQuery_MemoryTest", PubsubIO.readMessagesWithAttributes().fromTopic(options.getPubsubTopic()))
.apply("MemoryTest_ProcessObject", ParDo.of(new ProcessTestResult()))
.apply("MemoryTest_IdentifyMemoryTest",ParDo.of(new DetectTestType()))
.apply("MemoryTest_TransformIntoTableRow", ParDo.of(new TestResultToRowConverter()).withOutputTags(partitionsTag))
.apply("MemoryTest_WriteToBigQuery", BigQueryIO.writeTableRows().to(TestResultToRowConverter.getTableSpec1())
.withSchema(TestResultToRowConverter.getMemoryTestSchema())
.withWriteDisposition(WriteDisposition.WRITE_APPEND))
【问题讨论】:
【参考方案1】:光束管道不限于一个接一个地应用的单一直线变换 - 这将是非常严格的。
您可以对任何PCollection
应用任意数量的转换。
PCollection<TableRow> rows = ...;
rows.apply(BigQueryIO.writeTableRows().to(first table));
rows.apply(BigQueryIO.writeTableRows().to(second table));
rows.apply(some more processing)
.apply(BigQueryIO.writeTableRows().to(third table));
【讨论】:
我怎样才能使它适应我的管道?我编辑了我的 OP,所以你可以看到它的样子。 只需将直到 "MemoryTest_TransformIntoTableRow" 的内容提取到 PCollection以上是关于BigQueryIO - 为每个项目写入两个表的主要内容,如果未能解决你的问题,请参考以下文章
Apache Beam 的 BigQueryIO (Java):无法将 TIMESTAMP 字段写入 BigQuery——fastxml.jackson 异常“类型不支持”