向 dataproc 集群提交 pyspark 作业时出错(找不到作业)
Posted
技术标签:
【中文标题】向 dataproc 集群提交 pyspark 作业时出错(找不到作业)【英文标题】:Error when submitting a pyspark job to dataproc cluster (job not found) 【发布时间】:2017-09-20 17:21:17 【问题描述】:我有一个基于 GCP 的 python 客户端库的脚本,用于配置集群并向它们提交作业。当我运行脚本时,它成功地将文件上传到谷歌存储,创建集群并提交作业。当它运行我的“wait_for_job()”函数时会出现错误,如下所示:
Waiting for job to finish...
Traceback (most recent call last):
File "/Users/cdastmalchi/WGS_automation_python_SDK.py", line
174, in <module> main()
File "/Users/cdastmalchi/WGS_automation_python_SDK.py", line
168, in main wait_for_job(dataproc, args.project_id,
region, args.cluster_name)
File "/Users/cdastmalchi/WGS_automation_python_SDK.py", line
132, in wait_for_job
jobId=job_id).execute()
File "/anaconda/lib/python2.7/site-
packages/oauth2client/util.py", line 137, in
positional_wrapper
return wrapped(*args, **kwargs)
File "/anaconda/lib/python2.7/site-
packages/googleapiclient/http.py", line 842, in execute
raise HttpError(resp, content, uri=self.uri)
googleapiclient.errors.HttpError: <HttpError 404 when requesting
https://dataproc.googleapis.com/v1/projects/my-
project/regions/us-east4/jobs/my-cluster?alt=json returned "Job
not found my-project/my-cluster">
这是我的wait_for_job()
函数:
def wait_for_job(dataproc, project, region, job_id):
print('Waiting for job to finish...')
while True:
result = dataproc.projects().regions().jobs().get(
projectId=project,
region=region,
jobId=job_id).execute()
# Handle exceptions
if result['status']['state'] == 'ERROR':
raise Exception(result['status']['details'])
elif result['status']['state'] == 'DONE':
print('Job finished.')
return result
这是我的create_cluster()
函数:
def create_cluster(dataproc, project, zone, region, cluster_name, master_type, worker_type):
print('Creating cluster...')
zone_uri = \
'https://www.googleapis.com/compute/v1/projects//zones/'.format(
project, zone)
cluster_data =
'projectId': project,
'clusterName': cluster_name,
'config':
'gceClusterConfig':
'zoneUri': zone_uri,
,
'masterConfig':
'machineTypeUri' : master_type,
,
'workerConfig':
'machineTypeUri' : worker_type,
,
result = dataproc.projects().regions().clusters().create(
projectId=project,
region=region,
body=cluster_data).execute()
return result
您认为问题与地区/专区有关吗?我的集群位于us-east4-b
,尝试提交的作业位于us-east4
。
【问题讨论】:
'jobs/my-cluster' 看起来很可疑。您确定您的工作 ID 是“我的集群”吗? @tix:是的,那部分看起来很可疑。我的工作 ID 不是“我的集群”。 “我的集群”是集群名称。我将包含create_cluster()
函数,以防有助于发现问题。
我会追踪 wait_for_job() 是如何被调用的,特别是“job_id”值的来源(可能记录所有参数)
【参考方案1】:
您的错误消息显示您的代码正在将args.cluster_name
传递给wait_for_job
,而wait_for_job
的方法签名在最后一个参数中需要一个jobid,而不是cluster_name:
File "/Users/cdastmalchi/WGS_automation_python_SDK.py", line
168, in main wait_for_job(dataproc, args.project_id,
region, args.cluster_name)
您需要将该参数更改为您的 jobid。
【讨论】:
谢谢@DennisHuo!以上是关于向 dataproc 集群提交 pyspark 作业时出错(找不到作业)的主要内容,如果未能解决你的问题,请参考以下文章
Google Cloud Dataproc 上的 Pyspark 作业失败
在 google-dataproc 的 Spark 集群中的 pyspark 作业中使用外部库
GCP Dataproc 节点中没有资源来启动新的 SparkSession
Dataproc 上的 PySpark 因 SocketTimeoutException 而停止