如何在不运行管道的情况下使用Apache Beam连接器

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何在不运行管道的情况下使用Apache Beam连接器相关的知识,希望对你有一定的参考价值。

我们正在kubernetes容器中运行程序,该容器正在侦听pubsub消息。根据消息数据类型,它将启动数据流作业。作业执行完成后,我们再次将pubsub消息发送到另一个系统。

管道以批处理模式启动,它从GCS读取并在处理后写入GCS。

Pipeline pipeline = Pipeline.create(options);
PCollection<String> read = pipeline
                .apply("Read from GCS",
                        TextIO.read().from("GCS_PATH").withCompression(Compression.GZIP));

//process 
// write to GCS
....
PipelineResult result = pipeline.run();
result.waitUntilFinish();

# send job completed message to Pubsub to other component
....
....

因为我必须将事件发送到系统中的其他组件。到目前为止,我正在使用Pubsbub Java客户端库将消息推送到pubsub。

有一种方法,我可以使用apache Pubsub连接器发送如下消息-或做同样的正确方法是什么

PubsubIO.writeMessages().to("topicName");
答案

要解决此用例,您可以使用Wait API。可以找到详细信息here

 PCollection<Void> firstWriteResults = data.apply(ParDo.of(...write to first database...));
 data.apply(Wait.on(firstWriteResults))
     // Windows of this intermediate PCollection will be processed no earlier than when
     // the respective window of firstWriteResults closes.
     .apply(ParDo.of(...write to second database...));

以上是关于如何在不运行管道的情况下使用Apache Beam连接器的主要内容,如果未能解决你的问题,请参考以下文章

在 Beam 管道中以编程方式生成 BigQuery 架构

在 Dataflow 上运行的 Apache Beam 管道无法从 KafkaIO 读取:SSL 握手失败

如何组合两个结果并将其传递到 apache-beam 管道中的下一步

Apache Beam 数据流:从 Azure 到 GCS 的文件传输

Apache Beam - 即使程序连续执行,也会捕获并抛出异常。如何停止该进程或在管道中处理

Apache Beam - 跳过管道步骤