BigQueryIO - 为每个项目写入两个表

Posted

技术标签:

【中文标题】BigQueryIO - 为每个项目写入两个表【英文标题】:BigQueryIO - Write to two tables for each item 【发布时间】:2018-02-05 17:23:52 【问题描述】:

我正在尝试使用 Apache Beam 在 Dataflow 中编写作业。 这项工作需要获取输入并将其转换为我的自定义对象。 这个对象代表一个内存测试,它包含固定的属性,如时间戳、名称......以及带有属性的分区列表

public class TestResult 

    String testName;
    String testId;
    String testStatus;
    String testResult;
    List<Partition> testPartitions;

public class Partition 
    String testId;
    String filesystem;
    String mountedOn;
    String usePercentage;
    String available;
    String size;
    String used;

我的最后一次转换,采用这个 TestResult 对象并将其转换为表格行。

static class TestResultToRowConverter extends DoFn<TestResult, TableRow> 
    /**
     * In this example, put the whole string into single BigQuery field.
     */
    @ProcessElement
    public void processElement(ProcessContext c) 
      System.out.println("setting TestResult-> TestResult:" + c.element());
      c.output(new TableRow().set("testName", c.element().testName).set("testId", c.element().testId).set("testStatus", c.element().testStatus).set("testResult", c.element().testResult).set("memoryTestData", "example data test"));
      for (Partition pt :  c.element().testPartitions)
      
        c.output(partitionsTag, new TableRow().set("testId", pt.testId).set("filesystem", pt.filesystem).set("mountedOn", pt.mountedOn).set("usePercentage", pt.usePercentage).set("available", pt.available).set("size", pt.size).set("used", pt.used));
      
    

现在我想将它们写入 BigQuery,但测试结果的表行转到具有特定架构的一个表,而分区转到另一个具有另一个架构的表。还提到有一个 Id 链接两者,我需要在添加 testResult 时自动生成,并在插入分区行时重用。

我怎样才能做到这一点?

我用它来写入 1 个表,但如果我想写入两个表,我会迷路。

    .apply("MemoryTest_WriteToBigQuery", BigQueryIO.writeTableRows().to(TestResultToRowConverter.getTableSpec1())
        .withSchema(TestResultToRowConverter.getMemoryTestSchema())
        .withWriteDisposition(WriteDisposition.WRITE_APPEND))

编辑:

这是我的管道:

pipeline.apply("ReadFromPubSubToBigQuery_MemoryTest", PubsubIO.readMessagesWithAttributes().fromTopic(options.getPubsubTopic()))
    .apply("MemoryTest_ProcessObject", ParDo.of(new ProcessTestResult()))
    .apply("MemoryTest_IdentifyMemoryTest",ParDo.of(new DetectTestType()))
    .apply("MemoryTest_TransformIntoTableRow", ParDo.of(new TestResultToRowConverter()).withOutputTags(partitionsTag))
    .apply("MemoryTest_WriteToBigQuery", BigQueryIO.writeTableRows().to(TestResultToRowConverter.getTableSpec1())
        .withSchema(TestResultToRowConverter.getMemoryTestSchema())
        .withWriteDisposition(WriteDisposition.WRITE_APPEND))

【问题讨论】:

【参考方案1】:

光束管道不限于一个接一个地应用的单一直线变换 - 这将是非常严格的。

您可以对任何PCollection 应用任意数量的转换。

PCollection<TableRow> rows = ...;
rows.apply(BigQueryIO.writeTableRows().to(first table));
rows.apply(BigQueryIO.writeTableRows().to(second table));
rows.apply(some more processing)
    .apply(BigQueryIO.writeTableRows().to(third table));

【讨论】:

我怎样才能使它适应我的管道?我编辑了我的 OP,所以你可以看到它的样子。 只需将直到 "MemoryTest_TransformIntoTableRow" 的内容提取到 PCollection 类型的变量中。 许多 Beam 示例将 PCollections 传递给多个转换。例如。看到这个github.com/apache/beam/blob/…:下面使用gameEvents来计算团队得分和用户得分。

以上是关于BigQueryIO - 为每个项目写入两个表的主要内容,如果未能解决你的问题,请参考以下文章

BigqueryIO 无法写入日期分区表

Apache Beam 的 BigQueryIO (Java):无法将 TIMESTAMP 字段写入 BigQuery——fastxml.jackson 异常“类型不支持”

BigQueryIO.Read 查询与作业:查询

Apache Beam、BigQueryIO、writeTableRows() 与 write()

BigQueryIO 读取获取 TableSchema

为啥 Apache Beam BigQueryIO 每次运行都使用相同的 JobId?