Spark 中的任务是啥? Spark worker 是如何执行 jar 文件的?

Posted

技术标签:

【中文标题】Spark 中的任务是啥? Spark worker 是如何执行 jar 文件的?【英文标题】:What is a task in Spark? How does the Spark worker execute the jar file?Spark 中的任务是什么? Spark worker 是如何执行 jar 文件的? 【发布时间】:2014-10-06 05:41:34 【问题描述】:

在阅读了http://spark.apache.org/docs/0.8.0/cluster-overview.html 上的一些文档后,我有一些问题想澄清一下。

以 Spark 为例:

JavaSparkContext spark = new JavaSparkContext(
  new SparkConf().setJars("...").setSparkHome....);
JavaRDD<String> file = spark.textFile("hdfs://...");

// step1
JavaRDD<String> words =
  file.flatMap(new FlatMapFunction<String, String>() 
    public Iterable<String> call(String s) 
      return Arrays.asList(s.split(" "));
    
  );

// step2
JavaPairRDD<String, Integer> pairs =
  words.map(new PairFunction<String, String, Integer>() 
    public Tuple2<String, Integer> call(String s) 
      return new Tuple2<String, Integer>(s, 1);
    
  );

// step3
JavaPairRDD<String, Integer> counts =
  pairs.reduceByKey(new Function2<Integer, Integer>() 
    public Integer call(Integer a, Integer b) 
      return a + b;
    
  );

counts.saveAsTextFile("hdfs://...");

假设我有 3 个节点集群,节点 1 作为主节点运行,并且上面的驱动程序已经正确 jared(比如 application-test.jar)。所以现在我在主节点上运行这段代码,我相信在创建SparkContext 之后,application-test.jar 文件将被复制到工作节点(每个工作人员将为该应用程序创建一个目录) .

所以现在我的问题是: 示例任务中的 step1、step2 和 step3 是否会发送给工作人员?如果是,那么工作人员如何执行该操作?比如java -cp "application-test.jar" step1之类的?

【问题讨论】:

【参考方案1】:

当您创建SparkContext 时,每个工作人员都会启动一个执行程序。这是一个单独的进程 (JVM),它也会加载您的 jar。执行器连接回您的驱动程序。现在驱动程序可以向它们发送命令,例如您的示例中的flatMapmapreduceByKey。当驱动程序退出时,执行程序关闭。

RDD 有点像被分成多个分区的大数组,每个 executor 都可以保存其中的一些分区。

task 是通过序列化您的Function 对象从驱动程序发送到执行程序的命令。执行器反序列化命令(这是可能的,因为它已经加载了您的 jar),并在分区上执行它。

(这是一个概念性的概述。我在掩饰一些细节,但希望对您有所帮助。)


回答您的具体问题:不,每个步骤都不会启动新流程。当SparkContext 被构造时,每个worker 上都会启动一个新进程。

【讨论】:

有一个重要的转折。一切都以懒惰的方式发生。所以rdd.map 在需要之前不会做任何事情。如果您执行rdd.filter(...).map(...).collect(),则filtermap 函数仅在您调用collect 时在worker 上运行。但大多数时候你不需要考虑这个。 所以执行者实际上堆叠了 rdd 转换任务并且不执行它们,除非在 rdd 上调用了一个动作。这就是为什么它被称为弹性?它记录了转换任务,当一些失败时,执行者只是拿起任务并再次运行?谢谢 那么 jar 文件真的被洗牌到工作节点了吗?或者它停留在驱动应用程序的节点,当驱动将任务发送给工作人员时,工作人员将返回驱动程序并将jar作为依赖项来执行命令?那么在整个计算过程中,jar文件只有一份吗?它只停留在驱动节点?谢谢 你用SparkContext.addJar指定的jar会被复制到所有的worker节点。 这是一个如此……美丽的……对火花背后的概念的叙述,它让我几乎要流泪了。也许误导的是 RDD 就像大数组的概念。它们是关于如何实现这些数组以及如何对它们进行分区的指令容器,而不是这些数组本身。【参考方案2】:

要清楚了解如何创建和安排任务,我们必须了解 Spark 中的执行模型是如何工作的。简而言之,spark 中的应用分三步执行:

    创建 RDD 图 根据RDD图创建执行计划。在此步骤中创建阶段 根据计划生成任务并在工作人员之间安排它们

在您的字数统计示例中,RDD 图相当简单,如下所示:

文件 -> 行 -> 单词 -> 每字数 -> 全局字数 -> 输出

基于此图,创建了两个阶段。阶段创建规则基于管道尽可能多的狭窄转换的想法。在您的示例中,窄转换以每个字数结束。因此,你得到两个阶段

    文件 -> 行 -> 单词 -> 每个单词计数 全局字数 -> 输出

一旦找到阶段,Spark 就会从阶段生成任务。第一阶段将创建 ShuffleMapTasks,最后阶段将创建 ResultTasks,因为在最后阶段,包含一个动作操作来产生结果。

要生成的任务数量取决于文件的分布方式。假设您在三个不同的节点中有 3 个三个不同的文件,第一阶段将生成 3 个任务:每个分区一个任务。

因此,您不应将步骤直接映射到任务。一个task属于一个stage,并且和一个partition相关。

通常,为一个阶段运行的任务数正是最终 RDD 的分区数,但由于 RDD 可以共享(因此ShuffleMapStages),它们的数量取决于 RDD/阶段共享。请参考How DAG works under the covers in RDD?

【讨论】:

谢谢惠。现在为什么我有近 20 个单阶段任务是有道理的。我的 RDD 跨集群有 20 个分区。一个问题是如何强制每个执行者只处理本地数据。我看到执行程序日志说“storage.BlockManager:远程找到块rdd_2_2”。大多数块似乎是在本地找到的,但有些被标记为远程。当我观察到每个步骤是 NODE_LOCAL 或 PROCESS_LOCAL @nir,我不知道有关执行程序如何工作的所有详细信息,但您所指的可能是数据必须被打乱的情况:从一个节点移动到另一个节点。如果是这样,你不能强制它,因为它确实需要那些远程数据。 我认为 RDD 图是指 DAG 图。

以上是关于Spark 中的任务是啥? Spark worker 是如何执行 jar 文件的?的主要内容,如果未能解决你的问题,请参考以下文章

Spark的启动进程详解

spark work目录处理 And HDFS空间都去哪了?

Spark 的四种模式

spark中的RDD是啥

Spark的“One stack to rule them all”是啥意思?

Spark中的“RDD可以存储在内存中”是啥意思?