如何在数据流管道上的 TextIO.write 之后添加另一个进程

Posted

技术标签:

【中文标题】如何在数据流管道上的 TextIO.write 之后添加另一个进程【英文标题】:How to add another process after TextIO.write on dataflow pipeline 【发布时间】:2021-04-20 08:02:38 【问题描述】:

我创建了一个简单的数据流管道,其中包含这个过程:

    从 bigquery 获取/读取数据 将输出更改为 csv 格式 在 Google 存储上创建 CSV 文件 //TODO 发送 CSV 文件给第三方
pipeline.apply("ReadFromBigQuery",
      BigQueryIO.read(new MyCustomObject1(input))
          .fromQuery(myCustomQuery)
          .usingStandardSql()
  ).apply("ConvertToCsv",
      ParDo.of(new myCustomObject2())
  ).apply("WriteToCSV",
      TextIO.write().to(fileLocation)
          .withSuffix(".csv")
          .withoutSharding()
          .withDelimiter(new char[] '\r', '\n')
          .withHeader(csvHeader)
  );

但在第 3 步(写入 GS)之后,我无法向数据流添加另一个进程 我怎样才能做到这一点?

【问题讨论】:

【参考方案1】:

因为 TextIO.write() 返回的是 PDone,而不是之前 PTransform 中的 PCollection。

步骤 2 中的一种可能解决方案,您可以使用带有标签的多输出来写入不同的位置。

final TupleTag<String> csvOutTag= new TupleTag<String>();
final TupleTag<String> furtherProcessingTag= new TupleTag<String>();

PCollectionTuple mixedCollection =    
bigQueryReadCollection.apply(ParDo
            .of(new DoFn<TableRow,String>() 
              @ProcessElement
              public void processElement(ProcessContext c) 
                  // Emit to main output, which is the output
                 c.output(c.element().toString());
      
                  // Emit to output with tag furtherProcessing
                  c.output(furtherProcessingTag, c.element());
                
              
            ).withOutputTags(csvOutTag,
                    TupleTagList.of(furtherProcessingTag)));

// Get output with tag csvOutTag.
mixedCollection.get(csvOutTag).apply("WriteToCSV",
  TextIO.write().to(fileLocation)
      .withSuffix(".csv")
      .withoutSharding()
      .withDelimiter(new char[] '\r', '\n')
      .withHeader(csvHeader));

// Get output with tag furtherProcessingTag.
mixedCollection.get(furtherProcessingTag).apply(...);

请根据您的输出在 TupleTag 声明中添加适当的数据类型以进行进一步处理。

【讨论】:

以上是关于如何在数据流管道上的 TextIO.write 之后添加另一个进程的主要内容,如果未能解决你的问题,请参考以下文章

在 PDone 之后执行处理

go语言快速入门 IPC之Socket 9

Linux IPC之管道和FIFO

MongoDB开发之 管道操作符

linux之进程间通信——管道

go语言快速入门 IPC之管道通信 8