如何在谷歌云数据流中停止流式传输管道

Posted

技术标签:

【中文标题】如何在谷歌云数据流中停止流式传输管道【英文标题】:How to stop a streaming pipeline in google cloud dataflow 【发布时间】:2015-08-31 19:32:16 【问题描述】:

我有一个流式数据流正在运行以读取 PUB/SUB 订阅。

经过一段时间或可能在处理一定数量的数据后,我希望管道自行停止。我不希望我的计算引擎实例无限期地运行。

当我通过数据流控制台取消作业时,它显示为失败的作业。

有没有办法做到这一点?我错过了什么吗?或者 API 中缺少该功能。

【问题讨论】:

听起来你不应该在流模式下运行,而是在批处理模式下运行。您需要在流模式下运行的用例是什么? 我必须进入流媒体模式,因为我的输入是通过 PUB/SUB。由于流媒体作业一直在运行,我想停止它 听起来很奇怪,您选择使用 pub/sub 和流式运行程序来设计您的应用程序,当您希望它在处理 X 数量的数据后停止。听起来像经典批次。无论如何,我在 API/SDK 中看不到任何当前取消作业的内容。您可以停止/删除管道工作池中的虚拟机。那时它可能会失败/取消。这样能行吗? 我们已经在考虑添加 pub/sub 源的变体以用于批处理模式,类似于 Bharathi 的建议(“读取一段时间”或“读取一定量的数据” ) - 这是一个有效的用例,非常符合 Dataflow 统一流和批处理的想法。 【参考方案1】:

你能做这样的事吗?

Pipeline pipeline = ...;
... (construct the streaming pipeline) ...
final DataflowPipelineJob job =
    DataflowPipelineRunner.fromOptions(pipelineOptions)
                          .run(pipeline);
Thread.sleep(your timeout);
job.cancel();

【讨论】:

啊,你就是这样取消的。我天真地试图在 Pipeline 类中找到 cancel()。很高兴知道。 你能告诉我 Pipeline.run() 和 DataflowPipelineRunner.fromOptions(pipelineOptions) .run(pipeline); 之间的区别吗?它们之间有什么区别吗? 数据流管道可以与不同的运行器一起运行,使用 Pipeline.setRunner - 例如使用 DirectPipelineRunner、[Blocking]DataflowPipelineRunner,目前在 Spark 和 Flink 上存在运行器。不同的跑步者提供不同的能力。如果您只想运行管道,请调用 pipeline.run()。如果您想要特定于运行器的功能(例如 DataflowPipelineRunner 可以取消管道),请直接配置/调用运行器,如本例所示。 新的 SDK 2.x 无法使用此解决方案,因为 API 已更改。除此之外,在此 SDK 版本中,Pub/Sub 源似乎必须使用流模式。【参考方案2】:

我能够使用 Rest API 在数据流上耗尽(取消作业而不丢失数据)正在运行的流式作业。

See my answer

使用 Rest Update 方法,与此主体:

"requestedState": "JOB_STATE_DRAINING"

【讨论】:

以上是关于如何在谷歌云数据流中停止流式传输管道的主要内容,如果未能解决你的问题,请参考以下文章

Bigquery 如何使用存储在谷歌云中的数据?

Python:在谷歌云数据存储模拟器中保存数据

如何在谷歌云存储中启用实时对象访问分析?

当 AMQ 主题中没有数据可读取时如何停止流式传输

如何在谷歌大查询中从谷歌云存储上传表格

在谷歌应用引擎中将数据流式传输到 bigquery - java