所有作业完成后自动关闭 Google Dataproc 集群
Posted
技术标签:
【中文标题】所有作业完成后自动关闭 Google Dataproc 集群【英文标题】:Automatically shutdown Google Dataproc cluster after all jobs are completed 【发布时间】:2017-10-06 03:02:24 【问题描述】:如何在所有作业完成后以编程方式自动关闭 Google Dataproc 集群?
Dataproc provides 创建、监控和管理。但似乎我不知道如何删除集群。
【问题讨论】:
【参考方案1】:gcloud dataproc
CLI 界面提供了max-idle
选项。
这会在 x 数量的不活动(即没有正在运行的作业)后自动终止 Dataproc 集群。可以这样使用:
gcloud dataproc clusters create test-cluster \
--project my-test-project \
--zone europe-west1-b \
--master-machine-type n1-standard-4 \
--master-boot-disk-size 100 \
--num-workers 2 \
--worker-machine-type n1-standard-4 \
--worker-boot-disk-size 100 \
--max-idle 1h
【讨论】:
这个功能已经 GA 并且在gcloud dataproc ...
(没有beta
)命令中可用。
嗨 Martijn - 感谢您的回答,此命令将暂时停止集群还是将其永久删除?
在达到 max-idle 超时后会杀死整个集群。【参考方案2】:
这取决于语言。就个人而言,我使用 Python (pyspark),这里提供的代码对我来说效果很好:
https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/dataproc/submit_job_to_cluster.py
您可能需要根据您的目的调整代码并遵循 README 文件 (https://github.com/GoogleCloudPlatform/python-docs-samples/tree/master/dataproc) 中指定的先决条件步骤,例如启用 API 并在 requirements.txt
中安装软件包。
基本上,使用函数 wait_for_job
可以等到作业完成,而使用 delete_cluster
,顾名思义,您之前创建的集群会被删除。
我希望这可以帮助你。
【讨论】:
【参考方案3】:要实现这一目标,您有三个选择:
在集群创建期间设置--max-idle
属性(参见https://***.com/a/54239034/3227693)。
使用Dataproc Workflow Templates 管理集群生命周期。它可以自动创建集群来执行作业,并在所有作业完成后删除集群。
使用成熟的编排解决方案 Cloud Composer 来管理您的集群和作业生命周期。
【讨论】:
【参考方案4】:有几种可编程的方式可以自动关闭集群:
-
直接拨打REST api
Use the
gcloud
CLI
在您的作业完成执行后,可以使用(调用)其中任何一个。
在此处查看更多信息: https://cloud.google.com/dataproc/docs/guides/manage-cluster#delete_a_cluster
【讨论】:
我想完全自动化这个任务。那么在这里我们将如何在作业完成执行时收到通知?一旦我们收到作业完成回调/通知,就可以使用 REST api 删除集群。 再次,使用 REST api。具体来说,作业资源上的GET
并将其包装在轮询循环中 - cloud.google.com/dataproc/docs/reference/rest/v1/…。所以,提交 > 监控 > 关机
好的。因此外部脚本必须轮询作业状态,然后根据状态在集群上触发不同的操作。任何管理具有自动关闭和扩展功能的 DataProc 集群的工具/第三方软件?因为自动缩放也存在同样的问题。 DataFlow 自己处理自动缩放。
我不知道任何第三方工具。您需要自己手动滚动一些东西。
我们是否可以使用 REST api 监控集群运行状况并扩大/缩小规模?【参考方案5】:
您可以在 spark 应用程序完成后删除集群。这里有一些例子:
private SparkApplication(String[] args) throws
org.apache.commons.cli.ParseException,
IOException,
InterruptedException
// Your spark code here
if (profile != null && profile.equals("gcp"))
DataProcUtil.deleteCluster(clusterName);
这里是你如何通过 java 删除你的集群
public static void deleteCluster(String clusterName) throws IOException, InterruptedException
logger.info("Try to delete cluster: ....", clusterName);
Process process = new ProcessBuilder("gcloud",
"dataproc",
"clusters",
"delete",
clusterName,
"--async",
"--quiet").start();
int errCode = process.waitFor();
boolean hasError = (errCode == 0 ? false : true);
logger.info("Command executed, any errors? ", hasError);
String output;
if (hasError)
output = output(process.getErrorStream());
else
output = output(process.getInputStream());
logger.info("Output: ", output);
private static String output(InputStream inputStream) throws IOException
StringBuilder sb = new StringBuilder();
try (BufferedReader br = new BufferedReader(new InputStreamReader(inputStream)))
String line;
while ((line = br.readLine()) != null)
sb.append(line)
.append(System.getProperty("line.separator"));
return sb.toString();
【讨论】:
【参考方案6】:您可以使用 Scala 代码做到这一点:
创建集群 运行所有作业 作业终止时删除集群为此,您可以使用 Scala Future。
如果您有许多作业,您可以并行运行它们:
val gcpJarBucket = "gs://test_dataproc/dataproc/Dataproc.jar"
val jobs = Seq("package.class1", "package.class2")
val projectName: String = "automat-dataproc"
val clusterName: String = "your-cluster-name"
val timeout = 180 minute
// Working directory
implicit val wd = pwd
val future = Future
println("Creating the spark cluster...")
% gcloud ("dataproc", "clusters", "create", clusterName, "--subnet", "default", "--zone", "europe-west1-b", "--master-machine-type", "n1-standard-4", "--master-boot-disk-size", "50", "--num-workers", "3", "--worker-machine-type", "n1-standard-4", "--worker-boot-disk-size", "50", "--project", projectName)
println("Creating the spark cluster...DONE")
.flatMap _ =>
Future.sequence
jobs.map jobClass =>
Future
println(s"Launching the spark job from the class $jobClass...")
% gcloud ("dataproc", "jobs", "submit", "spark", s"--cluster=$clusterName", s"--class=$jobClass", "--region=global", s"--jars=$gcpJarBucket")
println(s"Launching the spark job from the class $jobClass...DONE")
Try Await.ready(future, timeout) .recover case exp => println(exp)
% bash ("-c", s"printf 'Y\n' | gcloud dataproc clusters delete $clusterName")
【讨论】:
以上是关于所有作业完成后自动关闭 Google Dataproc 集群的主要内容,如果未能解决你的问题,请参考以下文章