Triggering Classes That Extend PTransform One After Another?
Posted: 2018-03-27 23:31:34
Question: I have two classes that extend PTransform, called CompositeCall2 and CompositeCall. I must call CompositeCall first and, only after the work done in CompositeCall has finished, call CompositeCall2 in my Dataflow program. I am building a template of my Dataflow job, so the steps are processed in parallel, which makes this difficult.
Code: Tester.java
package Testing2;

import java.util.List;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
import com.google.api.services.bigquery.model.TableRow;

public class Tester {

    // Template parameters supplied at run time through ValueProvider.
    public interface FileData extends PipelineOptions {
        @Description("name Of the File")
        @Validation.Required
        ValueProvider<String> getInputFile();
        void setInputFile(ValueProvider<String> value);

        @Description("Path Of File From Where We need To Read Data")
        @Validation.Required
        ValueProvider<String> getOutputFile();
        void setOutputFile(ValueProvider<String> value);
    }

    public static void main(String[] args) throws InterruptedException {
        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setProject("testing1-180111");
        options.setTempLocation("gs://kishan-bucket/staging");
        options.setTemplateLocation("gs://kiss-bucket/templates/Test1");
        options.setRunner(DataflowRunner.class);
        Pipeline p = Pipeline.create(options);

        // The two branches below share no data dependency, so the runner may execute them in parallel.
        PDone dta = p.begin().apply("Creating File", Create.of("Kishan")).apply(new CompositeCall(p));
        p.apply("Creating File", Create.of("Kishan")).apply(new CompositeCall2(p));
        p.run().waitUntilFinish();
    }
}
This is how I call them. Both classes do the same job: they just log the data and write it to a file.
package Testing2;

import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CompositeCall2 extends PTransform<PCollection<String>, PCollection<String>> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(CompositeCall2.class);

    // Kept static as in the original code.
    static Pipeline p;

    public CompositeCall2(Pipeline p1) {
        p = p1;
    }

    @Override
    public PCollection<String> expand(PCollection<String> input) {
        PCollection<String> data;
        input.apply(ParDo.of(new testing())).apply(TextIO.write().to("gs://kiss-bucket/test1.txt"));
        // expand() runs while the pipeline graph is being built, so this is logged at construction time.
        LOG.info("Enter Second Stage Called");
        return input;
    }

    static class testing extends DoFn<String, String> {
        @ProcessElement
        public void processElement(ProcessContext c) throws InterruptedException {
            LOG.info("Enter Second Stage");
            c.output("Data Is" + c.element());
        }
    }
}
How can I synchronize the flow so that one transform runs only after the other has finished?
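Answer 1: This is a bit old, but I came across it today and thought I would share it for others who hit a similar problem: there is a Wait.on transform that lets you wait for an earlier transform to finish before applying later operations.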
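As a rough illustration of the Wait.on approach, here is a minimal sketch rather than the asker's actual code. The class name WaitOnSketch, the method buildSequenced, and the two MapElements stages are hypothetical stand-ins for CompositeCall and CompositeCall2. It assumes a Beam SDK version that includes org.apache.beam.sdk.transforms.Wait. The signal passed to Wait.on has to be a PCollection, so a first stage that currently returns PDone would need to return a PCollection instead.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class WaitOnSketch {

    // Builds two branches and holds the second one back until the first has finished.
    static PCollection<String> buildSequenced(Pipeline p) {
        // Hypothetical stand-in for CompositeCall: its output PCollection is the "signal".
        PCollection<String> firstStageDone = p
            .apply("CreateFirstInput", Create.of("Kishan"))
            .apply("FirstStage",
                MapElements.into(TypeDescriptors.strings()).via((String s) -> "First: " + s));

        // Hypothetical stand-in for CompositeCall2: Wait.on delays the elements of this
        // branch until the signal PCollection is complete for the corresponding window.
        return p
            .apply("CreateSecondInput", Create.of("Kishan"))
            .apply("WaitForFirstStage", Wait.<String>on(firstStageDone))
            .apply("SecondStage",
                MapElements.into(TypeDescriptors.strings()).via((String s) -> "Second: " + s));
    }

    public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        buildSequenced(p);
        p.run().waitUntilFinish();
    }
}
```

In the asker's setup, the same idea would mean applying Wait.on(signal) to the input of CompositeCall2, where signal is a PCollection produced by CompositeCall. Each transform also gets a distinct name here, which keeps the steps easy to tell apart in the job graph.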