DataflowRunner 需要 gcpTempLocation,但无法从 PipelineOptions 检索值

Posted

技术标签:

【中文标题】DataflowRunner 需要 gcpTempLocation,但无法从 PipelineOptions 检索值【英文标题】:DataflowRunner requires gcpTempLocation, but failed to retrieve a value from PipelineOptions 【发布时间】:2019-11-18 02:57:11 【问题描述】:

我正在创建一个演示管道,以使用我的免费 G​​oogle 帐户将 CSV 文件加载到带有 Dataflow 的 BigQuery 中。这就是我所面临的。

当我从 GCS 文件中读取数据并记录数据时,这非常有效。下面是我的示例代码。

这段代码运行正常

DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setProject("project12345");
options.setStagingLocation("gs://mybucket/staging");
options.setRunner(DataflowRunner.class);
DataflowRunner.fromOptions(options);
Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from("gs://mybucket/charges.csv")).apply(ParDo.of(new DoFn<String, Void>() 
            @ProcessElement
            public void processElement(ProcessContext c) 
                LOG.info(c.element());
            

));

但是,当我添加一个带有我创建的存储桶路径的临时文件夹位置时,我收到一个错误,下面是我的代码。


        LOG.debug("Starting Pipeline");
        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setProject("project12345");
        options.setStagingLocation("gs://mybucket/staging");
        options.setTempLocation("gs://project12345/temp");
        options.setJobName("csvtobq");
        options.setRunner(DataflowRunner.class);
    
        DataflowRunner.fromOptions(options);
        Pipeline p = Pipeline.create(options);

        boolean isStreaming = false;
        TableReference tableRef = new TableReference();
        tableRef.setProjectId("project12345");
        tableRef.setDatasetId("charges_data");
        tableRef.setTableId("charges_data_id");

        p.apply("Loading Data from GCS", TextIO.read().from("gs://mybucket/charges.csv"))
                .apply("Convert to BiqQuery Table Row", ParDo.of(new FormatForBigquery()))
                .apply("Write into Data in to Big Query",
                        BigQueryIO.writeTableRows().to(tableRef).withSchema(FormatForBigquery.getSchema())
                                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                                .withWriteDisposition(isStreaming ? BigQueryIO.Write.WriteDisposition.WRITE_APPEND
                                        : BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

        p.run().waitUntilFinish();
     

当我运行它时,我收到以下错误

Exception in thread "main" java.lang.IllegalArgumentException: DataflowRunner requires gcpTempLocation, but failed to retrieve a value from PipelineOptions
    at org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:242)
    at demobigquery.StarterPipeline.main(StarterPipeline.java:74)
Caused by: java.lang.IllegalArgumentException: Error constructing default value for gcpTempLocation: tempLocation is not a valid GCS path, gs://project12345/temp. 
    at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:247)
    at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:228)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.returnDefaultHelper(ProxyInvocationHandler.java:592)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:533)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:155)
    at com.sun.proxy.$Proxy15.getGcpTempLocation(Unknown Source)
    at org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:240)

这是身份验证问题吗?因为我通过 Eclipse Dataflow 插件从 GCP 使用 JSON 凭据作​​为项目所有者。

任何帮助将不胜感激。

【问题讨论】:

您的 tempLocation 是有效的 GCS URI 吗? beam.apache.org/documentation/runners/dataflow/… 可能与您的问题重复,但尚不清楚为什么它是与身份验证相关的问题。 ***.com/questions/43026371/… 它是一个有效的 URL,我可以浏览到我指定的存储桶。 【参考方案1】:

看起来像从 [1] 抛出的错误消息。默认的 GCS 验证器在 [2] 中实现。如您所见,Beam 代码也会引发IllegalArgumentException 的异常。因此,您可以进一步检查堆栈以了解GcsPathValidator 中发生的异常。

[1]https://github.com/apache/beam/blob/master/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java#L278

[2]https://github.com/apache/beam/blob/master/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidator.java#L29

【讨论】:

【参考方案2】:

它可能与您正在设置的流式传输选项有关。 CSV 上传会自动设置为批处理作业。因此,如果您尝试将其设置为流,则可能会导致问题。

如果您坚持流式传输,请查看this documentation。

【讨论】:

【参考方案3】:

也许您缺少凭据?由于您需要创建一个文件夹结构(每次执行一个新文件夹),您需要Storage Admin 而不仅仅是Storage Object AdminStorage Object Creator

【讨论】:

【参考方案4】:

这可能有多种原因:

    您没有使用正确的 GCP 项目凭据登录 - 错误的用户(或没有登录的用户)或错误的 项目正在登录中

    确保 GOOGLE_APPLICATION_CREDENTIALS 环境变量 适用于正确的用户和项目。如果没有获得权利 凭据使用

    gcloud auth 应用程序默认登录

    下载 json,并将 GOOGLE_APPLICATION_CREDENTIALS 更改为下载的文件。重新启动系统,然后重试

    您可以使用正确的用户 ID 登录正确的项目, 但可能缺少存储桶访问所需的权限。 确保您具有以下访问权限:

    存储管理员 Storage Legacy Bucket Owner 存储旧对象所有者(可选)

    您尝试的网址不存在或拼写错误

【讨论】:

以上是关于DataflowRunner 需要 gcpTempLocation,但无法从 PipelineOptions 检索值的主要内容,如果未能解决你的问题,请参考以下文章

使用 DataflowRunner 和 Dataflow 服务运行时,PubsubIO 不会将自定义时间戳属性输出为 context.timestamp

使用 Apache Beam 在 GCP DataflowRunner 上没有名为“IPython”的模块

Eclipse 上带有 Dataflow Runner 的 Apache Beam MinimalWordcount 示例

如何使用 Data Flow runner 从 ApacheBeam 将 hyperLogLog 字段保存到 BigQuery

ImportError:无法导入名称'firestore'

Apache Beam 数据流 BigQuery