为啥 Apache Beam 中的 CustomOptions 不继承 DataflowPipelineOptions 默认属性?

Posted

技术标签:

【中文标题】为啥 Apache Beam 中的 CustomOptions 不继承 DataflowPipelineOptions 默认属性?【英文标题】:Why CustomOptions in Apache Beam is not inheriting DataflowPipelineOptions default properties?为什么 Apache Beam 中的 CustomOptions 不继承 DataflowPipelineOptions 默认属性? 【发布时间】:2020-03-21 13:37:05 【问题描述】:

我是 Apache Beam 的新手,正在尝试使用 DirectRunner 和 DataflowRunner 运行示例读写程序。在我的用例中,CLI 参数很少,为了实现这一点,我创建了一个接口“CustomOptions.java”,它扩展了 PipelineOptions。

使用 DirectRunner 程序运行良好,但使用 DataflowRunner 时,它显示“接口 CustomOptions 缺少名为 'project' 的属性”。

pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.2.0</version>
        <type>maven-plugin</type>
    </dependency>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-core</artifactId>
        <version>2.16.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
        <version>2.16.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-direct-java</artifactId>
        <version>2.16.0</version>
    </dependency>

</dependencies>

CustomOptions.java(接口)

import org.apache.beam.sdk.options.PipelineOptions;

public interface CustomOptions extends PipelineOptions 

    String getInput();
    void setInput(String value);

    String getOutput();
    void setOutput(String value);

WordCount.java

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class WordCount 

    public static void main(String args[]) 
        PipelineOptionsFactory.register(CustomOptions.class);
        CustomOptions options = PipelineOptionsFactory.fromArgs(args).as(CustomOptions.class);
        Pipeline p = Pipeline.create(options);

        p.apply("Read", TextIO.read().from(options.getInput()))
                .apply("Write", TextIO.write().to(options.getOutput()));

        p.run();
    

命令:

DirectRunner (Working) : java -cp jarPath WordCount --input=inputPath --output=outputPath
DataflowRunner (Not Working) : java -cp jarPath WordCount --input=inputPath --output=outputPath --runner=DataflowRunner --stagingLocation=gs://<tmp_path> --project=<projectId>

错误:

Exception in thread "main" java.lang.IllegalArgumentException: Class interface CustomOptions missing a property named 'project'.
    at org.apache.beam.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1625)
    at org.apache.beam.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:115)
    at org.apache.beam.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:298)
    at WordCount.main(WordCount.java:13)

我尝试的第二件事是使用 DataflowPipelineOptions 而不是 PipelineOptions 扩展 CustomOptions。也使用这个,我得到一个错误:

Exception in thread "main" java.lang.IllegalArgumentException: No filesystem found for scheme gs
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:463)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:533)
    at org.apache.beam.sdk.io.FileBasedSink.convertToFileResourceIfPossible(FileBasedSink.java:215)
    at org.apache.beam.sdk.io.TextIO$TypedWrite.to(TextIO.java:734)
    at org.apache.beam.sdk.io.TextIO$Write.to(TextIO.java:1069)
    at WordCount.main(WordCount.java:15)

第二次试用还有一个问题,即不能使用 DirectRunner 和 DataflowRunner 执行相同的代码。因为在第二种情况下,“projectId”是一个强制参数,不会在 DirectRunner 中指定。

【问题讨论】:

在第一种情况下,您可以删除 --project= 只是为了澄清一下,当您看到“没有为方案 gs 找到文件系统”错误时,您是否扩展了 DataflowPipelineOptions 并在 DataflowRunner 上运行它?如果您扩展 DataflowPipelineOptions,我不希望发生该错误。您是否介意澄清 (1) 您使用了两个命令行中的哪一个,以及 (2) 当您看到该错误时您正在扩展哪个选项类? 我不能 100% 确定您是否可以将 DataflowPipelineOptions 与 DirectRunner 一起使用。如果它要求您在 DirectRunner 中传递 --project 之类的参数,则如果您传递未使用的占位符值,它可能会起作用。虽然我认为 --project 参数用于源和接收器,但如果它们读取/写入数据到 GCP 服务。在这种情况下,您需要指定一个有效值。如果失败,您可以有两个主要程序来交换选项类,用于 DataflowRunner 和 DirectRunner。 @JayadeepJayaraman 如果我删除 --project=,它会为键 --stagingLocation= 引发另一个异常。它指出 CustomOptions.java 没有键“stagingLocation”。 @AlexAmato 是的,你没看错,当我扩展 DataflowPipelineOptions 时出现“没有为方案 gs 找到文件系统”错误。我正在使用这两个命令,一个使用 DirectRunner,另一个使用 DataflowRunner。 【参考方案1】:

经过几次尝试和错误,我认为我做对了。 我正在使用与问题中提到的相同的 java 类,即使用 PipelineOptions 扩展 CustomOptions.java。我所做的唯一更改是在 pom.xml 中。

现在我使用 maven shade 插件和一些额外的配置,而不是 maven 程序集插件。有了这些我取得的成就: 1. DirectRunner 或 DataflowRunner 可以使用同一个 jar。 2. 说明我想从命令行执行哪个主类。

上一个'pom.xml':

<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.2.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id> <!-- this is used for inheritance merges -->
                    <phase>package</phase> <!-- bind to the packaging phase -->
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            <!-- add Main-Class to manifest file -->
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.dh.WordCount</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>

    </plugins>
</build>

<dependencies>
    <dependency>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.2.0</version>
        <type>maven-plugin</type>
    </dependency>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-core</artifactId>
        <version>2.16.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
        <version>2.16.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-direct-java</artifactId>
        <version>2.16.0</version>
    </dependency>

</dependencies>

新的'pom.xml':

<build>
    <plugins>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>

    </plugins>
</build>

<dependencies>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-core</artifactId>
        <version>2.16.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
        <version>2.16.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-direct-java</artifactId>
        <version>2.16.0</version>
    </dependency>

</dependencies>

当我读到这个答案时,这成为可能:Google Dataflow "No filesystem found for scheme gs"

【讨论】:

以上是关于为啥 Apache Beam 中的 CustomOptions 不继承 DataflowPipelineOptions 默认属性?的主要内容,如果未能解决你的问题,请参考以下文章

使用 apache beam 中的 beam.io.gcp.bigquery.WriteToBigQuery 模块写入日期分区的 Bigquery 表

Apache Beam 中的窗口函数

Apache Beam 处理文件

Apache Beam 管道中的连续状态

尝试从 Apache Beam 中的选项获取值时出现空指针异常

使用 Python 处理 Apache Beam 管道中的异常