在 AWS EMR 中使用 spark-submit 启动 Python 应用程序

Posted

技术标签:

【中文标题】在 AWS EMR 中使用 spark-submit 启动 Python 应用程序【英文标题】:launch Python app with spark-submit in AWS EMR 【发布时间】:2018-01-24 16:19:52 【问题描述】:

我是 Spark 的新手,无法复制 EMR 文档中的 example 以通过 AWS CLI 使用 spark-submit 提交基本用户应用程序。它似乎运行没有错误,但没有产生任何输出。在以下工作流程中,我的 add-steps 语法有问题吗?

示例脚本

目标是在 S3 中对文档中的单词进行计数,例如 lorem-ipsum 的 1000 个单词:

$ aws s3 cp s3://projects/wordcount/input/some_document.txt - | head -n1
Lorem ipsum dolor sit amet, consectetur adipiscing [... etc.]

从文档中复制,python 脚本如下所示:

$ aws s3 cp s3://projects/wordcount/wordcount.py -
from __future__ import print_function
from pyspark import SparkContext
import sys
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: wordcount  ", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="WordCount")
    text_file = sc.textFile(sys.argv[1])
    counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
    counts.saveAsTextFile(sys.argv[2])
    sc.stop()

目标文件夹(当前为空):

$ aws s3 ls s3://projects/wordcount/output
                           PRE output/

创建集群

在doc 工作,我有一个正在运行的带有日志记录的集群,由以下人员生成:

aws emr create-cluster --name TestSparkCluster \
--release-label emr-5.11.0 --applications Name=Spark \
--enable-debugging --log-uri s3://projects/wordcount/log \
--instance-type m3.xlarge --instance-count 3 --use-default-roles

返回消息显示创建"ClusterID": "j-XXXXXXXXXXXXX"

添加步骤

直接看example,我提交add-steps为:

aws emr add-steps --cluster-id j-XXXXXXXXXXXXX \
--steps Type=spark,Name=SparkWordCountApp,\
Args=[--deploy-mode,cluster,--master,yarn,\
--conf,spark.yarn.submit.waitAppCompletion=false,\
--num-executors,2,--executor-cores,2,--executor-memory,1g,\
s3://projects/wordcount/wordcount.py,\
s3://projects/wordcount/input/some_document.txt,\
s3://projects/wordcount/output/],\
ActionOnFailure=CONTINUE

启动 "StepIds":["s-YYYYYYYYYYY"]

问题

输出文件夹是空的——为什么?

我验证 ID 为 s-YYYYYYYYYYY 的步骤 SparkWordCountApp 在 EMR 控制台中具有 Status:Completed

在控制台中,我检查了控制器日志文件和标准错误输出(如下),以验证该步骤是否以退出状态 0 完成。

在Spark documentation 中,使用了稍微不同的语法。它没有将脚本名称作为参数列表的第一个位置,而是说:

对于 Python 应用程序,只需传递一个 .py 文件来代替 而不是 JAR,并添加 Python .zip、.egg 或 .py 使用 --py-files 将文件添加到搜索路径。

但是,该示例使用 sys.argv,其中 sys.argv[0] 为 wordcount.py

附加信息:日志

控制器日志文件:

2018-01-24T15:54:05.945Z INFO Ensure step 3 jar file command-runner.jar
2018-01-24T15:54:05.945Z INFO StepRunner: Created Runner for step 3
INFO startExec 'hadoop jar /var/lib/aws/emr/step-runner/hadoop-jars/command-runner.jar spark-submit --deploy-mode cluster --master yarn --conf spark.yarn.submit.waitAppCompletion=false --num-executors 2 --executor-cores 2 --executor-memory 1g s3://projects/wordcount/wordcount.py s3://projects/wordcount/input/some_document.txt s3://projects/wordcount/output/'
INFO Environment:
  PATH=/sbin:/usr/sbin:/bin:/usr/bin:/usr/local/sbin:/opt/aws/bin
  LESS_TERMCAP_md=[01;38;5;208m
  LESS_TERMCAP_me=[0m
  HISTCONTROL=ignoredups
  LESS_TERMCAP_mb=[01;31m
  AWS_AUTO_SCALING_HOME=/opt/aws/apitools/as
  UPSTART_JOB=rc
  LESS_TERMCAP_se=[0m
  HISTSIZE=1000
  HADOOP_ROOT_LOGGER=INFO,DRFA
  JAVA_HOME=/etc/alternatives/jre
  AWS_DEFAULT_REGION=us-west-2
  AWS_ELB_HOME=/opt/aws/apitools/elb
  LESS_TERMCAP_us=[04;38;5;111m
  EC2_HOME=/opt/aws/apitools/ec2
  TERM=linux
  XFILESEARCHPATH=/usr/dt/app-defaults/%L/Dt
  runlevel=3
  LANG=en_US.UTF-8
  AWS_CLOUDWATCH_HOME=/opt/aws/apitools/mon
  MAIL=/var/spool/mail/hadoop
  LESS_TERMCAP_ue=[0m
  LOGNAME=hadoop
  PWD=/
  LANGSH_SOURCED=1
  HADOOP_CLIENT_OPTS=-Djava.io.tmpdir=/mnt/var/lib/hadoop/steps/s-29XVS3IGMSK1/tmp
  _=/etc/alternatives/jre/bin/java
  CONSOLETYPE=serial
  RUNLEVEL=3
  LESSOPEN=||/usr/bin/lesspipe.sh %s
  previous=N
  UPSTART_EVENTS=runlevel
  AWS_PATH=/opt/aws
  USER=hadoop
  UPSTART_INSTANCE=
  PREVLEVEL=N
  HADOOP_LOGFILE=syslog
  PYTHON_INSTALL_LAYOUT=amzn
  HOSTNAME=ip-172-31-12-232
  NLSPATH=/usr/dt/lib/nls/msg/%L/%N.cat
  HADOOP_LOG_DIR=/mnt/var/log/hadoop/steps/s-29XVS3IGMSK1
  EC2_AMITOOL_HOME=/opt/aws/amitools/ec2
  SHLVL=5
  HOME=/home/hadoop
  HADOOP_IDENT_STRING=hadoop
INFO redirectOutput to /mnt/var/log/hadoop/steps/s-29XVS3IGMSK1/stdout
INFO redirectError to /mnt/var/log/hadoop/steps/s-29XVS3IGMSK1/stderr
INFO Working dir /mnt/var/lib/hadoop/steps/s-29XVS3IGMSK1
INFO ProcessRunner started child process 20797 :
hadoop   20797  3347  0 15:54 ?        00:00:00 bash /usr/lib/hadoop/bin/hadoop jar /var/lib/aws/emr/step-runner/hadoop-jars/command-runner.jar spark-submit --deploy-mode cluster --master yarn --conf spark.yarn.submit.waitAppCompletion=false --num-executors 2 --executor-cores 2 --executor-memory 1g s3://projects/wordcount/wordcount.py s3://projects/wordcount/input/some_document.txt s3://projects/wordcount/output/
2018-01-24T15:54:09.956Z INFO HadoopJarStepRunner.Runner: startRun() called for s-29XVS3IGMSK1 Child Pid: 20797
INFO Synchronously wait child process to complete : hadoop jar /var/lib/aws/emr/step-runner/hadoop-...
INFO waitProcessCompletion ended with exit code 0 : hadoop jar /var/lib/aws/emr/step-runner/hadoop-...
INFO total process run time: 16 seconds
2018-01-24T15:54:24.072Z INFO Step created jobs: 
2018-01-24T15:54:24.072Z INFO Step succeeded with exitCode 0 and took 16 seconds

Stderr 日志文件:

18/01/24 15:54:12 INFO RMProxy: Connecting to ResourceManager at ip-XXX-YY-YY-ZZZ.us-west-2.compute.internal/XXX.YY.YY.ZZZ:8032
18/01/24 15:54:12 INFO Client: Requesting a new application from cluster with 2 NodeManagers
18/01/24 15:54:12 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (11520 MB per container)
18/01/24 15:54:12 INFO Client: Will allocate AM container, with 1408 MB memory including 384 MB overhead
18/01/24 15:54:12 INFO Client: Setting up container launch context for our AM
18/01/24 15:54:12 INFO Client: Setting up the launch environment for our AM container
18/01/24 15:54:12 INFO Client: Preparing resources for our AM container
18/01/24 15:54:14 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
18/01/24 15:54:18 INFO Client: Uploading resource file:/mnt/tmp/spark-89654b91-c4db-4847-aa4b-22f27240daf7/__spark_libs__8429498492477236801.zip -> hdfs://ip-XXX-YY-YY-ZZZ.us-west-2.compute.internal:8020/user/hadoop/.sparkStaging/application_1516806627838_0002/__spark_libs__8429498492477236801.zip
18/01/24 15:54:22 INFO Client: Uploading resource s3://projects/wordcount/wordcount.py -> hdfs://ip-XXX-YY-YY-ZZZ.us-west-2.compute.internal:8020/user/hadoop/.sparkStaging/application_1516806627838_0002/wordcount.py
18/01/24 15:54:22 INFO S3NativeFileSystem: Opening 's3://projects/wordcount/wordcount.py' for reading
18/01/24 15:54:22 INFO Client: Uploading resource file:/usr/lib/spark/python/lib/pyspark.zip -> hdfs://ip-XXX-YY-YY-ZZZ.us-west-2.compute.internal:8020/user/hadoop/.sparkStaging/application_1516806627838_0002/pyspark.zip
18/01/24 15:54:22 INFO Client: Uploading resource file:/usr/lib/spark/python/lib/py4j-0.10.4-src.zip -> hdfs://ip-XXX-YY-YY-ZZZ.us-west-2.compute.internal:8020/user/hadoop/.sparkStaging/application_1516806627838_0002/py4j-0.10.4-src.zip
18/01/24 15:54:22 INFO Client: Uploading resource file:/mnt/tmp/spark-89654b91-c4db-4847-aa4b-22f27240daf7/__spark_conf__8267377904454396581.zip -> hdfs://ip-XXX-YY-YY-ZZZ.us-west-2.compute.internal:8020/user/hadoop/.sparkStaging/application_1516806627838_0002/__spark_conf__.zip
18/01/24 15:54:22 INFO SecurityManager: Changing view acls to: hadoop
18/01/24 15:54:22 INFO SecurityManager: Changing modify acls to: hadoop
18/01/24 15:54:22 INFO SecurityManager: Changing view acls groups to: 
18/01/24 15:54:22 INFO SecurityManager: Changing modify acls groups to: 
18/01/24 15:54:22 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(hadoop); groups with view permissions: Set(); users  with modify permissions: Set(hadoop); groups with modify permissions: Set()
18/01/24 15:54:22 INFO Client: Submitting application application_1516806627838_0002 to ResourceManager
18/01/24 15:54:23 INFO YarnClientImpl: Submitted application application_1516806627838_0002
18/01/24 15:54:23 INFO Client: Application report for application_1516806627838_0002 (state: ACCEPTED)
18/01/24 15:54:23 INFO Client: 
   client token: N/A
   diagnostics: N/A
   ApplicationMaster host: N/A
   ApplicationMaster RPC port: -1
   queue: default
   start time: 1516809262990
   final status: UNDEFINED
   tracking URL: http://ip-XXX-YY-YY-ZZZ.us-west-2.compute.internal:20888/proxy/application_1516806627838_0002/
   user: hadoop
18/01/24 15:54:23 INFO ShutdownHookManager: Shutdown hook called
18/01/24 15:54:23 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-89654b91-c4db-4847-aa4b-22f27240daf7
Command exiting with ret '0'

【问题讨论】:

【参考方案1】:

原来问题是由已经存在的目标文件夹引起的(即使是空的)。删除输出文件夹使示例工作。

我通过阅读 S3 中的 step 日志而不是 EMR 控制台中的 instance 日志发现了这一点——在这些日志中我看到了一个 org.apache.hadoop.mapred.FileAlreadyExistsException 线索我进来了。

预先存在的 S3 文件夹对于我完成的其他写作任务(例如,PigStorage)来说不是问题,所以我没想到会这样。

我将把这个问题留在(不太可能)其他人遇到这个问题的事件中。

【讨论】:

相关:***.com/questions/27033823/…【参考方案2】:

以下添加步骤对我有用,从主节点运行:

aws emr add-steps --cluster-id yourclusterid --steps Type=spark,Name=SparkWordCountApp,Args=[--deploy-mode,cluster,--master,yarn, --conf,spark.yarn.submit.waitAppCompletion=false,--num-executors,2,--executor-cores,2,--executor-memory,1g,s3://yourbucketname/yourcode.py,s3: //yourbucketname/yourinputfile.txt,s3://yourbucketname/youroutputfile.out], ActionOnFailure=继续

【讨论】:

以上是关于在 AWS EMR 中使用 spark-submit 启动 Python 应用程序的主要内容,如果未能解决你的问题,请参考以下文章

Matplotlib 使用 AWS-EMR jupyter notebook 绘图

Apache Hudi在AWS Glue和AWS EMR上同步元数据的异同

AWS EMR 上的持续集成

在插入语句中使用语句 HIVE EMR AWS

使用 AWS EMR 的 ETL

AWS Data Pipeline 在 emr 活动步骤部分中转义逗号