如何在不运行管道的情况下使用Apache Beam连接器
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何在不运行管道的情况下使用Apache Beam连接器相关的知识,希望对你有一定的参考价值。
我们正在kubernetes容器中运行程序,该容器正在侦听pubsub消息。根据消息数据类型,它将启动数据流作业。作业执行完成后,我们再次将pubsub消息发送到另一个系统。
管道以批处理模式启动,它从GCS读取并在处理后写入GCS。
Pipeline pipeline = Pipeline.create(options);
PCollection<String> read = pipeline
.apply("Read from GCS",
TextIO.read().from("GCS_PATH").withCompression(Compression.GZIP));
//process
// write to GCS
....
PipelineResult result = pipeline.run();
result.waitUntilFinish();
# send job completed message to Pubsub to other component
....
....
因为我必须将事件发送到系统中的其他组件。到目前为止,我正在使用Pubsbub Java客户端库将消息推送到pubsub。
有一种方法,我可以使用apache Pubsub连接器发送如下消息-或做同样的正确方法是什么
PubsubIO.writeMessages().to("topicName");
答案
要解决此用例,您可以使用Wait API。可以找到详细信息here
PCollection<Void> firstWriteResults = data.apply(ParDo.of(...write to first database...));
data.apply(Wait.on(firstWriteResults))
// Windows of this intermediate PCollection will be processed no earlier than when
// the respective window of firstWriteResults closes.
.apply(ParDo.of(...write to second database...));
以上是关于如何在不运行管道的情况下使用Apache Beam连接器的主要内容,如果未能解决你的问题,请参考以下文章
在 Dataflow 上运行的 Apache Beam 管道无法从 KafkaIO 读取:SSL 握手失败
如何组合两个结果并将其传递到 apache-beam 管道中的下一步
Apache Beam 数据流:从 Azure 到 GCS 的文件传输