Launching a Python application with spark-submit in AWS EMR
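【Title】Launch Python app with spark-submit in AWS EMR 【Posted】2018-01-24 16:19:52 【Question】I am new to Spark and cannot reproduce the example in the EMR documentation for submitting a basic user application with spark-submit through the AWS CLI. The step appears to run without errors but produces no output. Is something wrong with my add-steps syntax in the workflow below?
Example script
The goal is to count the words in a document stored in S3, for example 1000 words of lorem ipsum: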
$ aws s3 cp s3://projects/wordcount/input/some_document.txt - | head -n1
Lorem ipsum dolor sit amet, consectetur adipiscing [... etc.]
Copied from the documentation, the Python script looks like this:
$ aws s3 cp s3://projects/wordcount/wordcount.py -
from __future__ import print_function
import sys

from pyspark import SparkContext

if __name__ == "__main__":
    # Expect exactly two arguments after the script name: input path and output path.
    if len(sys.argv) != 3:
        print("Usage: wordcount ", file=sys.stderr)
        sys.exit(-1)
    sc = SparkContext(appName="WordCount")
    text_file = sc.textFile(sys.argv[1])
    # Split each line on spaces, emit (word, 1) pairs, then sum the counts per word.
    counts = (text_file.flatMap(lambda line: line.split(" "))
              .map(lambda word: (word, 1))
              .reduceByKey(lambda a, b: a + b))
    counts.saveAsTextFile(sys.argv[2])
    sc.stop()
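To make the transformation chain easier to follow without a cluster, here is a minimal pure-Python sketch of the same counting logic. The function name `count_words` and the sample lines are illustrative assumptions, not part of the EMR example.

```python
from collections import Counter


def count_words(lines):
    """Count words the way the Spark job does: split each line on single spaces."""
    counts = Counter()
    for line in lines:
        # Corresponds to flatMap(line.split(" ")) followed by map(word -> (word, 1)).
        for word in line.split(" "):
            # Corresponds to reduceByKey(lambda a, b: a + b).
            counts[word] += 1
    return dict(counts)


sample = ["Lorem ipsum dolor", "ipsum dolor", "dolor"]
result = count_words(sample)
print(result)
```

Note that splitting on a single space keeps empty strings when a line contains consecutive spaces, so the Spark job would count those as a "word" too.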
Destination folder (currently empty):
$ aws s3 ls s3://projects/wordcount/output
PRE output/
Creating the cluster
Working from the doc, I have a running cluster with logging enabled, created by:
aws emr create-cluster --name TestSparkCluster \
--release-label emr-5.11.0 --applications Name=Spark \
--enable-debugging --log-uri s3://projects/wordcount/log \
--instance-type m3.xlarge --instance-count 3 --use-default-roles
The return message shows the created "ClusterID": "j-XXXXXXXXXXXXX"
Adding the step
Following the example directly, I submitted add-steps as:
aws emr add-steps --cluster-id j-XXXXXXXXXXXXX \
--steps Type=spark,Name=SparkWordCountApp,\
Args=[--deploy-mode,cluster,--master,yarn,\
--conf,spark.yarn.submit.waitAppCompletion=false,\
--num-executors,2,--executor-cores,2,--executor-memory,1g,\
s3://projects/wordcount/wordcount.py,\
s3://projects/wordcount/input/some_document.txt,\
s3://projects/wordcount/output/],\
ActionOnFailure=CONTINUE
which launched "StepIds":["s-YYYYYYYYYYY"]
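For readers who prefer submitting the step from Python, the same step can be described as a dictionary in the shape that boto3's `add_job_flow_steps` accepts. This is a sketch under assumptions: the helper name `build_spark_step` is hypothetical, and the `command-runner.jar` plus `spark-submit` layout is taken from the controller log shown further down, not from the original question's workflow.

```python
def build_spark_step(script_uri, input_uri, output_uri, name="SparkWordCountApp"):
    """Return a step definition mirroring the add-steps command above."""
    return {
        "Name": name,
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            # EMR runs Spark steps through command-runner.jar (see the controller log).
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode", "cluster",
                "--master", "yarn",
                "--conf", "spark.yarn.submit.waitAppCompletion=false",
                "--num-executors", "2",
                "--executor-cores", "2",
                "--executor-memory", "1g",
                script_uri,   # becomes sys.argv[0] in the script
                input_uri,    # becomes sys.argv[1]
                output_uri,   # becomes sys.argv[2]
            ],
        },
    }


step = build_spark_step(
    "s3://projects/wordcount/wordcount.py",
    "s3://projects/wordcount/input/some_document.txt",
    "s3://projects/wordcount/output/",
)
print(step["HadoopJarStep"]["Args"])

# To submit (assumes boto3 is installed and AWS credentials are configured):
# import boto3
# emr = boto3.client("emr")
# emr.add_job_flow_steps(JobFlowId="j-XXXXXXXXXXXXX", Steps=[step])
```

Passing the arguments as a list avoids the comma-separated shorthand, where a stray space inside `Args=[...]` can split the value in the shell.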
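Problem
The output folder is empty. Why?
I verified in the EMR console that the step SparkWordCountApp with ID s-YYYYYYYYYYY has Status:Completed.
In the console I checked the controller log file and the stderr output (both below) to confirm that the step finished with exit status 0.
The Spark documentation uses a slightly different syntax. Instead of giving the script name as the first position of the argument list, it says:
For Python applications, simply pass a .py file in the place of <application-jar> instead of a JAR, and add Python .zip, .egg or .py files to the search path with --py-files.
However, the example uses sys.argv, where sys.argv[0] is wordcount.py.
Additional information: logs
Controller log file: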
2018-01-24T15:54:05.945Z INFO Ensure step 3 jar file command-runner.jar
2018-01-24T15:54:05.945Z INFO StepRunner: Created Runner for step 3
INFO startExec 'hadoop jar /var/lib/aws/emr/step-runner/hadoop-jars/command-runner.jar spark-submit --deploy-mode cluster --master yarn --conf spark.yarn.submit.waitAppCompletion=false --num-executors 2 --executor-cores 2 --executor-memory 1g s3://projects/wordcount/wordcount.py s3://projects/wordcount/input/some_document.txt s3://projects/wordcount/output/'
INFO Environment:
PATH=/sbin:/usr/sbin:/bin:/usr/bin:/usr/local/sbin:/opt/aws/bin
LESS_TERMCAP_md=[01;38;5;208m
LESS_TERMCAP_me=[0m
HISTCONTROL=ignoredups
LESS_TERMCAP_mb=[01;31m
AWS_AUTO_SCALING_HOME=/opt/aws/apitools/as
UPSTART_JOB=rc
LESS_TERMCAP_se=[0m
HISTSIZE=1000
HADOOP_ROOT_LOGGER=INFO,DRFA
JAVA_HOME=/etc/alternatives/jre
AWS_DEFAULT_REGION=us-west-2
AWS_ELB_HOME=/opt/aws/apitools/elb
LESS_TERMCAP_us=[04;38;5;111m
EC2_HOME=/opt/aws/apitools/ec2
TERM=linux
XFILESEARCHPATH=/usr/dt/app-defaults/%L/Dt
runlevel=3
LANG=en_US.UTF-8
AWS_CLOUDWATCH_HOME=/opt/aws/apitools/mon
MAIL=/var/spool/mail/hadoop
LESS_TERMCAP_ue=[0m
LOGNAME=hadoop
PWD=/
LANGSH_SOURCED=1
HADOOP_CLIENT_OPTS=-Djava.io.tmpdir=/mnt/var/lib/hadoop/steps/s-29XVS3IGMSK1/tmp
_=/etc/alternatives/jre/bin/java
CONSOLETYPE=serial
RUNLEVEL=3
LESSOPEN=||/usr/bin/lesspipe.sh %s
previous=N
UPSTART_EVENTS=runlevel
AWS_PATH=/opt/aws
USER=hadoop
UPSTART_INSTANCE=
PREVLEVEL=N
HADOOP_LOGFILE=syslog
PYTHON_INSTALL_LAYOUT=amzn
HOSTNAME=ip-172-31-12-232
NLSPATH=/usr/dt/lib/nls/msg/%L/%N.cat
HADOOP_LOG_DIR=/mnt/var/log/hadoop/steps/s-29XVS3IGMSK1
EC2_AMITOOL_HOME=/opt/aws/amitools/ec2
SHLVL=5
HOME=/home/hadoop
HADOOP_IDENT_STRING=hadoop
INFO redirectOutput to /mnt/var/log/hadoop/steps/s-29XVS3IGMSK1/stdout
INFO redirectError to /mnt/var/log/hadoop/steps/s-29XVS3IGMSK1/stderr
INFO Working dir /mnt/var/lib/hadoop/steps/s-29XVS3IGMSK1
INFO ProcessRunner started child process 20797 :
hadoop 20797 3347 0 15:54 ? 00:00:00 bash /usr/lib/hadoop/bin/hadoop jar /var/lib/aws/emr/step-runner/hadoop-jars/command-runner.jar spark-submit --deploy-mode cluster --master yarn --conf spark.yarn.submit.waitAppCompletion=false --num-executors 2 --executor-cores 2 --executor-memory 1g s3://projects/wordcount/wordcount.py s3://projects/wordcount/input/some_document.txt s3://projects/wordcount/output/
2018-01-24T15:54:09.956Z INFO HadoopJarStepRunner.Runner: startRun() called for s-29XVS3IGMSK1 Child Pid: 20797
INFO Synchronously wait child process to complete : hadoop jar /var/lib/aws/emr/step-runner/hadoop-...
INFO waitProcessCompletion ended with exit code 0 : hadoop jar /var/lib/aws/emr/step-runner/hadoop-...
INFO total process run time: 16 seconds
2018-01-24T15:54:24.072Z INFO Step created jobs:
2018-01-24T15:54:24.072Z INFO Step succeeded with exitCode 0 and took 16 seconds
Stderr log file:
18/01/24 15:54:12 INFO RMProxy: Connecting to ResourceManager at ip-XXX-YY-YY-ZZZ.us-west-2.compute.internal/XXX.YY.YY.ZZZ:8032
18/01/24 15:54:12 INFO Client: Requesting a new application from cluster with 2 NodeManagers
18/01/24 15:54:12 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (11520 MB per container)
18/01/24 15:54:12 INFO Client: Will allocate AM container, with 1408 MB memory including 384 MB overhead
18/01/24 15:54:12 INFO Client: Setting up container launch context for our AM
18/01/24 15:54:12 INFO Client: Setting up the launch environment for our AM container
18/01/24 15:54:12 INFO Client: Preparing resources for our AM container
18/01/24 15:54:14 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
18/01/24 15:54:18 INFO Client: Uploading resource file:/mnt/tmp/spark-89654b91-c4db-4847-aa4b-22f27240daf7/__spark_libs__8429498492477236801.zip -> hdfs://ip-XXX-YY-YY-ZZZ.us-west-2.compute.internal:8020/user/hadoop/.sparkStaging/application_1516806627838_0002/__spark_libs__8429498492477236801.zip
18/01/24 15:54:22 INFO Client: Uploading resource s3://projects/wordcount/wordcount.py -> hdfs://ip-XXX-YY-YY-ZZZ.us-west-2.compute.internal:8020/user/hadoop/.sparkStaging/application_1516806627838_0002/wordcount.py
18/01/24 15:54:22 INFO S3NativeFileSystem: Opening 's3://projects/wordcount/wordcount.py' for reading
18/01/24 15:54:22 INFO Client: Uploading resource file:/usr/lib/spark/python/lib/pyspark.zip -> hdfs://ip-XXX-YY-YY-ZZZ.us-west-2.compute.internal:8020/user/hadoop/.sparkStaging/application_1516806627838_0002/pyspark.zip
18/01/24 15:54:22 INFO Client: Uploading resource file:/usr/lib/spark/python/lib/py4j-0.10.4-src.zip -> hdfs://ip-XXX-YY-YY-ZZZ.us-west-2.compute.internal:8020/user/hadoop/.sparkStaging/application_1516806627838_0002/py4j-0.10.4-src.zip
18/01/24 15:54:22 INFO Client: Uploading resource file:/mnt/tmp/spark-89654b91-c4db-4847-aa4b-22f27240daf7/__spark_conf__8267377904454396581.zip -> hdfs://ip-XXX-YY-YY-ZZZ.us-west-2.compute.internal:8020/user/hadoop/.sparkStaging/application_1516806627838_0002/__spark_conf__.zip
18/01/24 15:54:22 INFO SecurityManager: Changing view acls to: hadoop
18/01/24 15:54:22 INFO SecurityManager: Changing modify acls to: hadoop
18/01/24 15:54:22 INFO SecurityManager: Changing view acls groups to:
18/01/24 15:54:22 INFO SecurityManager: Changing modify acls groups to:
18/01/24 15:54:22 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); groups with view permissions: Set(); users with modify permissions: Set(hadoop); groups with modify permissions: Set()
18/01/24 15:54:22 INFO Client: Submitting application application_1516806627838_0002 to ResourceManager
18/01/24 15:54:23 INFO YarnClientImpl: Submitted application application_1516806627838_0002
18/01/24 15:54:23 INFO Client: Application report for application_1516806627838_0002 (state: ACCEPTED)
18/01/24 15:54:23 INFO Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1516809262990
final status: UNDEFINED
tracking URL: http://ip-XXX-YY-YY-ZZZ.us-west-2.compute.internal:20888/proxy/application_1516806627838_0002/
user: hadoop
18/01/24 15:54:23 INFO ShutdownHookManager: Shutdown hook called
18/01/24 15:54:23 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-89654b91-c4db-4847-aa4b-22f27240daf7
Command exiting with ret '0'
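The stderr log above ends right after the application is reported as ACCEPTED. With spark.yarn.submit.waitAppCompletion=false in cluster mode, the client process exits after submission, so the step's exit code 0 only says that the submission succeeded, not that the job did. A small sketch for pulling the application ID and the last reported state out of such a log follows; the function name `last_reported_state` is an illustrative assumption.

```python
import re

# Matches lines such as:
#   INFO Client: Application report for application_1516806627838_0002 (state: ACCEPTED)
REPORT_RE = re.compile(r"Application report for (application_\d+_\d+) \(state: (\w+)\)")


def last_reported_state(stderr_text):
    """Return (application_id, state) from the last application report, or None."""
    matches = REPORT_RE.findall(stderr_text)
    return matches[-1] if matches else None


sample = (
    "18/01/24 15:54:23 INFO YarnClientImpl: Submitted application application_1516806627838_0002\n"
    "18/01/24 15:54:23 INFO Client: Application report for application_1516806627838_0002 (state: ACCEPTED)\n"
    "18/01/24 15:54:23 INFO ShutdownHookManager: Shutdown hook called\n"
)
app_id, state = last_reported_state(sample)
print(app_id, state)
```

If the last state is not a terminal one, the actual outcome has to be read from the YARN application logs for that ID rather than from the step's stderr.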
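【Answer 1】It turned out that the problem was caused by the destination folder already existing (even though it was empty). Deleting the output folder made the example work.
I found this by reading the step logs in S3 rather than the instance logs in the EMR console. In those logs I saw an org.apache.hadoop.mapred.FileAlreadyExistsException, which clued me in.
A pre-existing S3 folder had not been a problem for other write tasks I had done (for example, PigStorage), so I was not expecting this.
I will leave this question up in the (unlikely) event that someone else runs into the same problem.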
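One way to sidestep the exception, as an alternative to deleting the folder before each run, is to write every run to a path that does not exist yet. The sketch below is an assumption-laden illustration, not part of the original answer: the helper name `fresh_output_uri` and the `run-<timestamp>` naming scheme are made up for this example.

```python
from datetime import datetime, timezone


def fresh_output_uri(base_uri, now=None):
    """Append a UTC timestamp so each run gets an output path that is new."""
    now = now or datetime.now(timezone.utc)
    stamp = now.strftime("%Y%m%dT%H%M%SZ")
    # Only the final output path must be absent; the parent prefix may already exist.
    return "{}/run-{}/".format(base_uri.rstrip("/"), stamp)


print(fresh_output_uri("s3://projects/wordcount/output/"))
```

The returned URI would then be passed as the last argument of the step in place of s3://projects/wordcount/output/.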
【Comments】
Related: ***.com/questions/27033823/…
【Answer 2】The following add-steps command worked for me, run from the master node:
aws emr add-steps --cluster-id yourclusterid --steps Type=spark,Name=SparkWordCountApp,Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=false,--num-executors,2,--executor-cores,2,--executor-memory,1g,s3://yourbucketname/yourcode.py,s3://yourbucketname/yourinputfile.txt,s3://yourbucketname/youroutputfile.out],ActionOnFailure=CONTINUE