为啥使用 Dataflow 写入 Bigquery 非常慢?

Posted

技术标签:

【中文标题】为啥使用 Dataflow 写入 Bigquery 非常慢?【英文标题】:Why is writing to Bigquery using Dataflow EXTREMELY slow?为什么使用 Dataflow 写入 Bigquery 非常慢? 【发布时间】:2018-07-16 07:53:05 【问题描述】:

我可以以每秒大约 10,000 次插入的速度将插入直接流式传输到 BigQuery,但是当我尝试使用 Dataflow 插入时,“ToBqRow”步骤(如下所示)非常慢。 每 10 分钟仅 50 行,这是 4 名工人。知道为什么吗?以下是相关代码:

PCollection<Status> statuses = p
        .apply("GetTweets", PubsubIO.readStrings().fromTopic(topic))
        .apply("ExtractData", ParDo.of(new DoFn<String, Status>() 
    @ProcessElement
    public void processElement(DoFn<String, Status>.ProcessContext c) throws Exception 
            String rowJson = c.element();

            try 
                TweetsWriter.LOGGER.debug("ROWJSON = " + rowJson);
                Status status = TwitterObjectFactory.createStatus(rowJson);
                if (status == null) 
                    TweetsWriter.LOGGER.error("Status is null");
                 else 
                    TweetsWriter.LOGGER.debug("Status value: " + status.getText());
                
                c.output(status);
                TweetsWriter.LOGGER.debug("Status: " + status.getId());
             catch (Exception var4) 
                TweetsWriter.LOGGER.error("Status creation from JSON failed: " + var4.getMessage());
            

    
));

statuses
        .apply("ToBQRow", ParDo.of(new DoFn<Status, TableRow>() 
            @ProcessElement
            public void processElement(ProcessContext c) throws Exception 
                TableRow row = new TableRow();
                Status status = c.element();
                row.set("Id", status.getId());
                row.set("Text", status.getText());
                row.set("RetweetCount", status.getRetweetCount());
                row.set("FavoriteCount", status.getFavoriteCount());
                row.set("Language", status.getLang());
                row.set("ReceivedAt", (Object)null);
                row.set("UserId", status.getUser().getId());
                row.set("CountryCode", status.getPlace().getCountryCode());
                row.set("Country", status.getPlace().getCountry());
                c.output(row);
        
    ))
        .apply("WriteTableRows", BigQueryIO.writeTableRows().to(tweetsTable)
                .withSchema(schema)
                .withMethod(Method.STREAMING_INSERTS)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED));

p.run();

【问题讨论】:

您是否对您的statuses 执行任何计算量大的操作?也许您已经陷入 Beam 图融合优化 (cloud.google.com/dataflow/service/…) 并且您的多个变换被压缩为单个变换,这可能会导致瓶颈。尝试在ToBQRow 之前进行重新洗牌。 我已经更新了上面的代码。正如你所看到的,我不做任何计算量大的操作。只需从 PubSub 主题中读取消息,提取相关信息,创建 TableRow 对象并编写它。 'ToBQRow' 似乎是真正的罪魁祸首:输入集合 -> 添加的元素 -> 13,829。输出集合 -> 添加元素 -> 249. 我没有看到任何类型的窗口,这可能是个问题 不明白为什么我必须使用窗口!我没有汇总数据无论如何,尝试了这个但它没有帮助:(不确定这是否正确使用!):.apply(“GetTweets”,PubsubIO.readStrings().fromTopic(topic)).apply( "TimeWindow", Window.into(SlidingWindows.of(averagingInterval).every(averagingInterval))) 我已经阅读了 Status 的文档,但未能成功理解下面发生的事情,但我认为问题出在 TableRow 对象上。您能否验证 TableRow 没有填充 Null 数据?其次,您能否验证架构与 TableRow 匹配?如果不是,这将解释为什么只映射部分行,因为架构只与某些行匹配(即当附加项为空时)。如果您可以验证这些不是问题,我会尝试继续挖掘 【参考方案1】:

事实证明 Dataflow 下的 Bigquery 并不慢。 问题是,“status.getPlace().getCountryCode()”返回 NULL,所以它抛出 NullPointerException,我在日志!显然,Dataflow 日志记录需要改进。它现在运行得非常好。消息一进入主题,几乎立即就会写入 BigQuery!

【讨论】:

我的写入速度也很慢。如果错误不在日志中,您是如何发现错误的?

以上是关于为啥使用 Dataflow 写入 Bigquery 非常慢?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用在 Dataflow 执行期间计算的架构写入 BigQuery?

在 Dataflow 中从 BigQuery 写入云存储时如何设置文件大小而不是分片数

在 Dataflow 管道中写入 BigQuery 表失败

从 Dataflow python 作业写入 bigquery 中的分区表

通过 Google Cloud Dataflow 创建/写入 Parititoned BigQuery 表

通过 Google Cloud Dataflow 创建/写入 Parititoned BigQuery 表