Cloud Dataflow 到 BigQuery - 来源过多

Posted

技术标签:

【中文标题】Cloud Dataflow 到 BigQuery - 来源过多【英文标题】:Cloud Dataflow to BigQuery - too many sources 【发布时间】:2015-01-05 08:41:15 【问题描述】:

我的工作除其他外,还将从文件中读取的一些数据插入到 BigQuery 表中,以供以后手动分析。

它失败并出现以下错误:

job error: Too many sources provided: 10001. Limit is 10000., error: Too many sources provided: 10001. Limit is 10000.

什么是“来源”?是文件还是管道步骤?

谢谢, G

【问题讨论】:

【参考方案1】:

我猜测错误来自 BigQuery,这意味着我们在创建输出表时尝试上传的文件过多。

您能否提供有关错误/上下文的更多详细信息(例如命令行输出的 sn-p(如果使用 BlockingDataflowPipelineRunner)以便我确认?jobId 也会有所帮助。

您的管道结构是否会导致大量输出文件?这可能是大量数据,也可能是没有后续 GroupByKey 操作的精细分片输入文件(这将使我们将数据重新分片成更大的片段)。

【讨论】:

当我从 10k 多个文件中读取输入并计划立即将记录保留在 BigQuery 中而不应用其他转换时,这发生在我身上。 在这种情况下,每个文件都将创建至少一个工作项,并且每个工作项都会生成一个需要上传到最终 BQ 表中的临时文件。我们将考虑最终解决这个问题。同时,有两种基本的解决方法: 1. 将 2.一种效率较低的解决方法是强制使用 GroupByKey,这可能会让系统有机会将数据重新分片到更少的工作单元中。一种。 ParDo 将每条记录转换为 (record, null) b. GroupByKey c. ParDo 将每个 (record, [null, ..., null]) 转换为 record, ..., record 以下是我实施 GroupByKey 建议的方式,以防它对任何人有所帮助。我使用 1 而不是 null 只是因为它更容易设置类型。我的 PCollection 的元素是字符串。抱歉格式化.apply(MapElements.via((String event) -> KV.of(event, 1)).withOutputType(new TypeDescriptor<KV<String, Integer>>() )) .apply(GroupByKey.<String, Integer>create()) .apply(Keys.<String>create())【参考方案2】:

Google Cloud Dataflow BigQueryIO.Write occur Unknown Error (http code 500) 中的注释缓解了这个问题:

Dataflow SDK for Java 1.x:作为一种解决方法,您可以在以下位置启用此实验:--experiments=enable_custom_bigquery_sink

在 Dataflow SDK for Java 2.x 中,此行为是默认行为,无需进行实验。

请注意,在这两个版本中,如果您的作业失败,GCS 中的临时文件可能会留下。

【讨论】:

【参考方案3】:
public static class ForceGroupBy <T> extends PTransform<PCollection<T>, PCollection<KV<T, Iterable<Void>>>> 
    private static final long serialVersionUID = 1L;
    @Override
    public PCollection<KV<T, Iterable<Void>>> apply(PCollection<T> input) 
        PCollection<KV<T,Void>> syntheticGroup = input.apply(
                ParDo.of(new  DoFn<T,KV<T,Void>>()
                    private static final long serialVersionUID = 1L;
                    @Override
                    public void processElement(
                            DoFn<T, KV<T, Void>>.ProcessContext c)
                                    throws Exception 
                        c.output(KV.of(c.element(),(Void)null));

                     ));
        return syntheticGroup.apply(GroupByKey.<T,Void>create());
    

【讨论】:

在这种情况下,一些解释真的很有帮助——这段代码在做什么,它是如何解决这个问题的?这也有助于避免人们复制此代码然后遇到错误。

以上是关于Cloud Dataflow 到 BigQuery - 来源过多的主要内容,如果未能解决你的问题,请参考以下文章

Dataflow 中的 BigQuery 无法从 Cloud Storage 加载数据:为非记录字段指定了 JSON 对象

Cloud Dataflow 中的“辅助输入”是不是支持从 BigQuery 视图中读取?

通过 Google Cloud Dataflow 创建/写入 Parititoned BigQuery 表

通过 Google Cloud Dataflow 创建/写入 Parititoned BigQuery 表

Google Cloud Dataflow ETL(数据存储区 -> 转换 -> BigQuery)

我可以在 BigQuery 和 Google Cloud Dataflow 中使用相同的编程语言吗?