所有作业完成后自动关闭 Google Dataproc 集群

Posted

技术标签:

【中文标题】所有作业完成后自动关闭 Google Dataproc 集群【英文标题】:Automatically shutdown Google Dataproc cluster after all jobs are completed 【发布时间】:2017-10-06 03:02:24 【问题描述】:

如何在所有作业完成后以编程方式自动关闭 Google Dataproc 集群?

Dataproc provides 创建、监控和管理。但似乎我不知道如何删除集群。

【问题讨论】:

【参考方案1】:

gcloud dataproc CLI 界面提供了max-idle 选项。 这会在 x 数量的不活动(即没有正在运行的作业)后自动终止 Dataproc 集群。可以这样使用:

gcloud dataproc clusters create test-cluster \
    --project my-test-project \
    --zone europe-west1-b \
    --master-machine-type n1-standard-4 \
    --master-boot-disk-size 100 \
    --num-workers 2 \
    --worker-machine-type n1-standard-4 \
    --worker-boot-disk-size 100 \
    --max-idle 1h

【讨论】:

这个功能已经 GA 并且在gcloud dataproc ...(没有beta)命令中可用。 嗨 Martijn - 感谢您的回答,此命令将暂时停止集群还是将其永久删除? 在达到 max-idle 超时后会杀死整个集群。【参考方案2】:

这取决于语言。就个人而言,我使用 Python (pyspark),这里提供的代码对我来说效果很好:

https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/dataproc/submit_job_to_cluster.py

您可能需要根据您的目的调整代码并遵循 README 文件 (https://github.com/GoogleCloudPlatform/python-docs-samples/tree/master/dataproc) 中指定的先决条件步骤,例如启用 API 并在 requirements.txt 中安装软件包。

基本上,使用函数 wait_for_job 可以等到作业完成,而使用 delete_cluster ,顾名思义,您之前创建的集群会被删除。 我希望这可以帮助你。

【讨论】:

【参考方案3】:

要实现这一目标,您有三个选择:

    在集群创建期间设置--max-idle 属性(参见https://***.com/a/54239034/3227693)。

    使用Dataproc Workflow Templates 管理集群生命周期。它可以自动创建集群来执行作业,并在所有作业完成后删除集群。

    使用成熟的编排解决方案 Cloud Composer 来管理您的集群和作业生命周期。

【讨论】:

【参考方案4】:

有几种可编程的方式可以自动关闭集群:

    直接拨打REST api Use the gcloud CLI

在您的作业完成执行后,可以使用(调用)其中任何一个。

在此处查看更多信息: https://cloud.google.com/dataproc/docs/guides/manage-cluster#delete_a_cluster

【讨论】:

我想完全自动化这个任务。那么在这里我们将如何在作业完成执行时收到通知?一旦我们收到作业完成回调/通知,就可以使用 REST api 删除集群。 再次,使用 REST api。具体来说,作业资源上的 GET 并将其包装在轮询循环中 - cloud.google.com/dataproc/docs/reference/rest/v1/…。所以,提交 > 监控 > 关机 好的。因此外部脚本必须轮询作业状态,然后根据状态在集群上触发不同的操作。任何管理具有自动关闭和扩展功能的 DataProc 集群的工具/第三方软件?因为自动缩放也存在同样的问题。 DataFlow 自己处理自动缩放。 我不知道任何第三方工具。您需要自己手动滚动一些东西。 我们是否可以使用 REST api 监控集群运行状况并扩大/缩小规模?【参考方案5】:

您可以在 spark 应用程序完成后删除集群。这里有一些例子:

private SparkApplication(String[] args) throws
                                        org.apache.commons.cli.ParseException,
                                        IOException,
                                        InterruptedException 

    // Your spark code here

    if (profile != null && profile.equals("gcp")) 
        DataProcUtil.deleteCluster(clusterName);
    

这里是你如何通过 java 删除你的集群

 public static void deleteCluster(String clusterName) throws IOException, InterruptedException 

    logger.info("Try to delete cluster: ....", clusterName);

    Process process = new ProcessBuilder("gcloud",
                                         "dataproc",
                                         "clusters",
                                         "delete",
                                         clusterName,
                                         "--async",
                                         "--quiet").start();

    int errCode = process.waitFor();
    boolean hasError = (errCode == 0 ? false : true);
    logger.info("Command executed, any errors? ", hasError);
    String output;
    if (hasError) 
        output = output(process.getErrorStream());
    
    else 
        output = output(process.getInputStream());
    

    logger.info("Output: ", output);



private static String output(InputStream inputStream) throws IOException 
    StringBuilder sb = new StringBuilder();

    try (BufferedReader br = new BufferedReader(new InputStreamReader(inputStream))) 

        String line;
        while ((line = br.readLine()) != null) 

            sb.append(line)
              .append(System.getProperty("line.separator"));

        
    
    return sb.toString();


【讨论】:

【参考方案6】:

您可以使用 Scala 代码做到这一点:

创建集群 运行所有作业 作业终止时删除集群

为此,您可以使用 Scala Future。

如果您有许多作业,您可以并行运行它们:

val gcpJarBucket = "gs://test_dataproc/dataproc/Dataproc.jar"
val jobs = Seq("package.class1", "package.class2")
val projectName: String = "automat-dataproc"
val clusterName: String = "your-cluster-name"

val timeout = 180 minute

// Working directory
implicit val wd = pwd

val future = Future 
  println("Creating the spark cluster...")
  % gcloud ("dataproc", "clusters", "create", clusterName, "--subnet", "default", "--zone", "europe-west1-b", "--master-machine-type", "n1-standard-4", "--master-boot-disk-size", "50", "--num-workers", "3", "--worker-machine-type", "n1-standard-4", "--worker-boot-disk-size", "50", "--project", projectName)
  println("Creating the spark cluster...DONE")
.flatMap  _ =>
  
    Future.sequence 
      jobs.map  jobClass =>
        Future 
          println(s"Launching the spark job from the class $jobClass...")
          % gcloud ("dataproc", "jobs", "submit", "spark", s"--cluster=$clusterName", s"--class=$jobClass", "--region=global", s"--jars=$gcpJarBucket")
          println(s"Launching the spark job from the class $jobClass...DONE")
        
      
    

  


Try  Await.ready(future, timeout) .recover  case exp => println(exp) 
% bash ("-c", s"printf 'Y\n' | gcloud dataproc clusters delete $clusterName")

【讨论】:

以上是关于所有作业完成后自动关闭 Google Dataproc 集群的主要内容,如果未能解决你的问题,请参考以下文章

Google Places 自动完成的替代方案 [关闭]

如何关闭Google Chrome自动升级

如何限制 Google Bigquery 中的作业数量 [关闭]

个人作业

当视频从所有屏幕完成时自动关闭 AVPlayer

等待所有多处理作业完成后再继续