一个接一个扩展 PTransforms 的触发类?

Posted

技术标签:

【中文标题】一个接一个扩展 PTransforms 的触发类?【英文标题】:Triggering Class Which Extends PTransforms one After One? 【发布时间】:2018-03-27 23:31:34 【问题描述】:

我有 2 个类扩展 PTransform,称为 CompositeCall2CompositeCall

我必须先调用CompositeCall,然后在完成CompositeCall 中完成的工作后,我必须在我的数据流程序中调用CompositeCall2。我正在制作我的数据流作业的模板,因此正在进行并行处理,这让我的工作变得困难。

代码.:Tester.java

package Testing2;

import java.util.List;

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
import com.google.api.services.bigquery.model.TableRow;

public class Tester 

    public interface FileData extends PipelineOptions 
         @Description("name Of the File")
         @Validation.Required
         ValueProvider<String> getInputFile();
         void setInputFile(ValueProvider<String> value);

         @Description("Path Of File From Where We need To Read Data")
         @Validation.Required
         ValueProvider<String> getOutputFile();
         void setOutputFile(ValueProvider<String> value);

    

    public static void main(String[] args) throws InterruptedException 

        DataflowPipelineOptions options=PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setProject("testing1-180111");
        options.setTempLocation("gs://kishan-bucket/staging");
        options.setTemplateLocation("gs://kiss-bucket/templates/Test1");
        options.setRunner(DataflowRunner.class);
        Pipeline p = Pipeline.create(options);

        PDone dta = p.begin().apply("Creating File",Create.of("Kishan")).apply(new CompositeCall(p));
        p.apply("Creating File",Create.of("Kishan")).apply(new CompositeCall2(p));
        p.run().waitUntilFinish();  
    

这就是我打电话的方式。 两个类都在做同样的工作,只是在文件中打印数据并写入数据。

打包测试2;

import java.util.List;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CompositeCall2 extends PTransform <PCollection<String>,PCollection<String>> 


    private static final long serialVersionUID = 1L;
    static Pipeline p;

    public CompositeCall2(Pipeline p1) 
        this.p = p1;
    
    private static final Logger LOG = LoggerFactory.getLogger(CompositeCall2.class);

    @Override
    public PCollection<String> expand(PCollection<String> input) 
        PCollection<String> data;
        input.apply(ParDo.of(new testing())).apply(TextIO.write().to("gs://kiss-bucket/test1.txt"));
        LOG.info("Enter Second Stage Called");
        return input;           
    

    static class testing extends DoFn<String,String>

        @ProcessElement
        public void processElement(ProcessContext c) throws InterruptedException
            LOG.info("Enter Second Stage");
            c.output("Data Is"+c.element());
        
    

如何同步流,以便在一次转换后运行另一次转换?

【问题讨论】:

【参考方案1】:

这有点老了,但我今天遇到了它,我想我可以分享给遇到类似问题的其他人 - 有一个 Wait.on 转换允许您等待先前的转换完成,然后再应用以后的操作。

【讨论】:

以上是关于一个接一个扩展 PTransforms 的触发类?的主要内容,如果未能解决你的问题,请参考以下文章

arduino扩展板5v怎么接

arcpy脚本使用多接图表图斑对对应多幅影像进行裁边处理

选择所有复选框数据表触发器

黑莓 Application.activate() 未触发

扩展角度框架类RouteReuseStrategy时出错

行为篇-命令模式