一个数据流作业内的并行管道

Posted

技术标签:

【中文标题】一个数据流作业内的并行管道【英文标题】:Parallel pipeline inside one Dataflow Job 【发布时间】:2019-11-28 05:32:30 【问题描述】:

我想在 GCP 上的一个数据流作业中运行两个并行管道。我已经创建了一个管道,它工作得很好,但我想要另一个而不创建另一个工作。

我已经搜索了很多答案,但找不到任何代码示例:(

如果我这样运行它不起作用:

pipe1.run();
pipe2.run();

它给了我“已经有一个活动的工作名称...如果您想提交第二个工作,请再次尝试使用 --jobName 设置不同的名称”

【问题讨论】:

【参考方案1】:

您可以将其他输入应用到管道,这将在一个作业中产生一个单独的管道。例如:

public class ExamplePipeline 

public static void main(String[] args) 
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    options.setRunner(DirectRunner.class);

    Pipeline pipeline = Pipeline.create(options);

    PCollection<String> linesForPipelineOne = pipeline.apply(Create.of("A1", "B1"));
    PCollection<String> linesToWriteFromPipelineOne = linesForPipelineOne.apply("Pipeline 1 transform",
            ParDo.of(new DoFn<String, String>() 

        @ProcessElement
        public void processElement(ProcessContext c) 
            System.out.println("Pipeline one:" + c.element());
            c.output(c.element() + " extra message.");
        

    ));
    linesToWriteFromPipelineOne.apply((TextIO.write().to("file.txt")));

    PCollection<String> linesForPipelineTwo = pipeline.apply(Create.of("A2", "B2"));
    linesForPipelineTwo.apply("Pipeline 2 transoform",
            ParDo.of(new DoFn<String, String>() 

        @ProcessElement
        public void processElement(ProcessContext c) 
            System.out.println("Pipeline two:" + c.element());
        

    ));

    pipeline.run();

如您所见,您也可以将两个(或更多)分离的 PBegin 应用于具有多个 PDone/Sink 的管道。在此示例中,"pipeline 1" 将输出转储并写入文件,"pipeline 2" 仅将其转储到屏幕。

如果您在 GCP 上使用 DataflowRunner 运行此程序,GUI 将显示 2 个未连接的“管道”。

【讨论】:

有没有办法以编程方式做到这一点?例如,而不是定义每个?对于每个: .... 每个 = pipeline.apply() 是的,你可以像你说的那样做。

以上是关于一个数据流作业内的并行管道的主要内容,如果未能解决你的问题,请参考以下文章

在 .gitlab.ci.yml 中并行运行作业

尝试在单台 Mac 机器上使用并行 Jenkins 管道构建 iOS 应用程序时出现缓存问题

.gitlab-ci.yml配置参数

访问数据流管道内的文件

如何从管道内或管道运行之前获取数据流作业 ID?

如何顺序运行Azure管道生成