数据流 - 对 BigQuery 的窗口写入?

Posted

技术标签:

【中文标题】数据流 - 对 BigQuery 的窗口写入?【英文标题】:Dataflow - Windowed writes to BigQuery? 【发布时间】:2019-03-05 06:44:26 【问题描述】:

Dataflow - BigQuery 是否有窗口写入?我正在尝试运行读取 5 亿行文件,然后写入 BigQuery 的 Dataflow 作业。 当我运行时,它没有超过 1500 万,因此查看是否有任何类型的 Windowing 写入 BigQuery 会有所帮助。在运行时,我遇到了许多 GC 分配失败,但我看到这些都是正常的。我在运行时保留了默认的 diskSize 配置。请帮忙。如果有任何窗口写入 BigQuery 的示例,请提供。

至于转换,只是对字符串进行拆分,然后插入到 BigQuery 中。

另外,下面的示例是否在不断从 PubSub 流式传输时不断写入 BigQuery? https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/PubSubToBigQuery.java

下面是我的示例

Pipeline pipeline = Pipeline.create(options);
        PCollection<String> textData = pipeline.apply("Read Text Data",
                TextIO.read().from(options.getInputFilePattern()));
        PCollection<TableRow> tr = textData.apply(ParDo.of(new FormatRemindersFn()));

        tr.apply(BigQueryIO.writeTableRows().withoutValidation()              .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
                .withSchema(FormatRemindersFn.getSchema())
                //  .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .to(options.getSchemaDetails()));

 static class FormatRemindersFn extends DoFn<String, TableRow> 
  @ProcessElement
        public void processElement(ProcessContext c) 
            try 
                if (StringUtils.isNotEmpty(c.element())) 
                    String[] fields = c.element().split("\\^",15);

                  //  logger.info("Fields :", fields[2]);
                    TableRow row = new TableRow().set("MODIFIED_DATE", fields[0])
                            .set("NAME", fields[1])
                            .set("ADDRESS", fields[2]);

                    c.output(row);
                
             catch (Exception e) 
                logger.error("Error: ", e.getMessage());
            
        

【问题讨论】:

500M 对于数据流来说应该很容易。你能确认你正在阅读 GCS 吗?多少个文件?您是否在 15M 时遇到错误? 在注释了作为每个元素的 DoFn 的一部分完成的日志记录后,错误得到了解决。 太棒了。您能否为后代提供自己的答案。 【参考方案1】:

在注释了作为每个元素的 DoFn 的一部分完成的日志记录后,错误得到了解决。处理这么多元素时,不应记录每个元素。

【讨论】:

Dataflow 中的过多日志记录确实是个问题。不仅因为它耗尽了启动磁盘,而且还增加了 Dataflow 的日志记录代理的 CPU 和内存消耗。随意接受你自己的答案。

以上是关于数据流 - 对 BigQuery 的窗口写入?的主要内容,如果未能解决你的问题,请参考以下文章

Beam:使用窗口边界写入每个窗口元素计数

数据已写入 BigQuery,但格式不正确

访问被拒绝:BigQuery BigQuery:写入数据时权限被拒绝

使用 Dataflow 在 BigQuery 表之间进行流式更新

从 BigQuery 读取数据并将其写入云存储上的 avro 文件格式

数据流:我可以使用批处理作业连续写/流写入BigQuery吗?