使用 TextIO 和 ValueProvider 创建数据流模板时出错

Posted

技术标签:

【中文标题】使用 TextIO 和 ValueProvider 创建数据流模板时出错【英文标题】:Error creating dataflow template with TextIO and ValueProvider 【发布时间】:2018-10-24 22:19:33 【问题描述】:

我正在尝试创建一个谷歌数据流模板,但我似乎无法找到一种方法来做到这一点而不会产生以下异常:

WARNING: Size estimation of the source failed: RuntimeValueProviderpropertyName=inputFile, default=null
java.lang.IllegalStateException: Value only available at runtime, but accessed from a non-runtime context: RuntimeValueProviderpropertyName=inputFile, default=null
        at org.apache.beam.sdk.options.ValueProvider$RuntimeValueProvider.get(ValueProvider.java:234)
        at org.apache.beam.sdk.io.FileBasedSource.getEstimatedSizeBytes(FileBasedSource.java:218)
        at org.apache.beam.runners.dataflow.internal.CustomSources.serializeToCloudSource(CustomSources.java:78)
        at org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:53)
        at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:40)
        at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:37)
        at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:453)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
        at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
        at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
        at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:392)
        at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:170)
        at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:680)
        at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:174)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
        at org.apache.beam.examples.MyMinimalWordCount.main(MyMinimalWordCount.java:69)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
        at java.lang.Thread.run(Thread.java:748)

我可以使用 Beam 中 MinimalWordCount 示例的简单修改版本来重现它。

public class MyMinimalWordCount 

    public interface WordCountOptions extends PipelineOptions 
        @Description("Path of the file to read from")
        ValueProvider<String> getInputFile();

        void setInputFile(ValueProvider<String> valueProvider);
    

    public static void main(String[] args) 

        WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(WordCountOptions.class);

        Pipeline p = Pipeline.create(options);

        p.apply(TextIO.read().from(options.getInputFile()))

                .apply(FlatMapElements
                        .into(TypeDescriptors.strings())
                        .via((String word) -> Arrays.asList(word.split("[^\\pL]+"))))
                .apply(Filter.by((String word) -> !word.isEmpty()))
                .apply(Count.perElement())
                .apply(MapElements
                        .into(TypeDescriptors.strings())
                        .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
                .apply(TextIO.write().to("wordcounts"));

        // Having the waitUntilFinish causes a NPE when trying to create a dataflow template
        //p.run().waitUntilFinish();

        p.run();
    

我可以在本地运行示例:

mvn compile exec:java \
     -Pdirect-runner \
     -Dexec.mainClass=org.apache.beam.examples.MyMinimalWordCount \
     -Dexec.args="--inputFile=pom.xml " 

它还通过以下方式在 Google Dataflow 上运行:

mvn compile exec:java \
     -Pdataflow-runner \
     -Dexec.mainClass=org.apache.beam.examples.MyMinimalWordCount \
     -Dexec.args="--runner=DataflowRunner \
                  --project=[project] \
                  --inputFile=gs://[bucket]/input.csv "

但是当我尝试使用以下内容创建 Google Dataflow 模板时,我收到了错误:

mvn compile exec:java \
     -Pdataflow-runner \
     -Dexec.mainClass=org.apache.beam.examples.MyMinimalWordCount \
     -Dexec.args="--runner=DataflowRunner \
                  --project=[project] \
                  --stagingLocation=gs://[bucket]/staging \
                  --templateLocation=gs://[bucket]/templates/MyMinimalWordCountTemplate " 

另一个令人困惑的事情是 maven 构建继续并以 BUILD SUCCESS 结束

所以我的问题是:

Q1)我是否应该能够创建这样的 Google Dataflow 模板(使用 ValueProviders 在运行时提供 TextIO 输入)?

Q2) 构建过程中的异常是真正的错误还是日志似乎表明的只是警告?

Q3) 如果 Q1 和 Q2 的答案是肯定的并且“只是一个警告”,并且我尝试从上传的模板创建作业,为什么它没有任何元数据或不知道我的输入选项?

我使用过的参考资料:

https://cloud.google.com/dataflow/docs/templates/creating-templates https://beam.apache.org/get-started/quickstart-java/ https://beam.apache.org/documentation/runners/dataflow/#setup

【问题讨论】:

我可以回答 Q3)。我认为 maven/dataflow 构建会自动生成所需的元数据。它不是。可以在此处找到有关如何提供自己的元数据的详细说明:cloud.google.com/dataflow/docs/templates/… 编译程序后,它会在您在--templateLocation= 中定义的路径中为您生成模板。接下来,您将转到 3 中的屏幕并使用您添加的模板位置运行作业。最后,在additional parameters 中单击add item,在您的键类型inputFile 中单击输入文件的存储桶位置。完成后,运行作业,您应该不会再遇到问题了。 谢谢@haris,这是否意味着异常只是一个警告? 发生错误是因为它希望您将--inputFile 作为您没有传递的参数。所以从技术上讲,您告诉程序创建一个模板并运行该作业。因此,当它没有看到任何工作时,它就会产生错误。至少据我了解。我从来不需要直接使用模板。模板应该仍然已经创建了 【参考方案1】:

我相信--inputFiles 是在创建模板时与模板捆绑在一起的。

请参阅注释1:“除了模板文件,模板化管道执行还依赖于在创建模板时暂存和引用的文件。如果移动或删除暂存文件,您的管道执行将失败。”

这个帖子似乎也很相关2

【讨论】:

【参考方案2】:

正确的答案是您不必在制作模板时提供输入,它应该在运行时将输入作为值。例外是 Google 数据流中的一个内部问题,将来应该将其删除。

【讨论】:

以上是关于使用 TextIO 和 ValueProvider 创建数据流模板时出错的主要内容,如果未能解决你的问题,请参考以下文章

如何编写一个满足typing.TextIO的类文件类?

VHDL textio,从文件中读取图像

如何获取 ValueProvider 的值并将其写入 BigQuery 表?

从 ValueProvider 读取的 Dataflow BigQuery:“StaticValueProvider”对象没有属性“projectId”

Google Dataflow - 如果写入本地服务器,如何在 java 中指定 TextIO?

数据流 TextIO.write 缩放问题