如何在数据流管道上的 TextIO.write 之后添加另一个进程
Posted
技术标签:
【中文标题】如何在数据流管道上的 TextIO.write 之后添加另一个进程【英文标题】:How to add another process after TextIO.write on dataflow pipeline 【发布时间】:2021-04-20 08:02:38 【问题描述】:我创建了一个简单的数据流管道,其中包含这个过程:
-
从 bigquery 获取/读取数据
将输出更改为 csv 格式
在 Google 存储上创建 CSV 文件
//TODO 发送 CSV 文件给第三方
pipeline.apply("ReadFromBigQuery",
BigQueryIO.read(new MyCustomObject1(input))
.fromQuery(myCustomQuery)
.usingStandardSql()
).apply("ConvertToCsv",
ParDo.of(new myCustomObject2())
).apply("WriteToCSV",
TextIO.write().to(fileLocation)
.withSuffix(".csv")
.withoutSharding()
.withDelimiter(new char[] '\r', '\n')
.withHeader(csvHeader)
);
但在第 3 步(写入 GS)之后,我无法向数据流添加另一个进程 我怎样才能做到这一点?
【问题讨论】:
【参考方案1】:因为 TextIO.write() 返回的是 PDone,而不是之前 PTransform 中的 PCollection。
步骤 2 中的一种可能解决方案,您可以使用带有标签的多输出来写入不同的位置。
final TupleTag<String> csvOutTag= new TupleTag<String>();
final TupleTag<String> furtherProcessingTag= new TupleTag<String>();
PCollectionTuple mixedCollection =
bigQueryReadCollection.apply(ParDo
.of(new DoFn<TableRow,String>()
@ProcessElement
public void processElement(ProcessContext c)
// Emit to main output, which is the output
c.output(c.element().toString());
// Emit to output with tag furtherProcessing
c.output(furtherProcessingTag, c.element());
).withOutputTags(csvOutTag,
TupleTagList.of(furtherProcessingTag)));
// Get output with tag csvOutTag.
mixedCollection.get(csvOutTag).apply("WriteToCSV",
TextIO.write().to(fileLocation)
.withSuffix(".csv")
.withoutSharding()
.withDelimiter(new char[] '\r', '\n')
.withHeader(csvHeader));
// Get output with tag furtherProcessingTag.
mixedCollection.get(furtherProcessingTag).apply(...);
请根据您的输出在 TupleTag 声明中添加适当的数据类型以进行进一步处理。
【讨论】:
以上是关于如何在数据流管道上的 TextIO.write 之后添加另一个进程的主要内容,如果未能解决你的问题,请参考以下文章