pyspark on emr with boto3, copy of s3 object result with Futures timed out after [100000 milliseconds]
【Posted】: 2019-10-09 16:41:35

【Question】: I have a pyspark application that converts CSV files to Parquet. Before that step, I copy some S3 objects from one bucket to another.
pyspark on Spark 2.4, EMR 5.27, with maximizeResourceAllocation set to true.
The CSV files range in size from 80 KB to 500 MB.
Nevertheless, my EMR cluster fails at about 70% completion on a 166 MB file (a 480 MB file succeeded just before it); the same job does not fail locally with spark-submit.
The job is simple:
import boto3
from pyspark.sql import SparkSession

def organise_adwords_csv():
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(S3_ORIGIN_RAW_BUCKET)
    for obj in bucket.objects.filter(Prefix=S3_ORIGIN_ADWORDS_RAW + "/"):
        key = obj.key
        copy_source = {
            'Bucket': S3_ORIGIN_RAW_BUCKET,
            'Key': key
        }
        key_tab = obj.key.split("/")
        if len(key_tab) < 5:
            print("continuing from length", obj)
            continue
        file_name = ''.join(key_tab[len(key_tab)-1:len(key_tab)])
        if file_name == '':
            print("continuing", obj)
            continue
        table = file_name.split("_")[1].replace("-", "_")
        new_path = "{0}/{1}/{2}".format(S3_DESTINATION_ORDERED_ADWORDS_RAW_PATH, table, file_name)
        print("new_path", new_path)  # <- the last print ends here
        try:
            s3.meta.client.copy(copy_source, S3_DESTINATION_RAW_BUCKET, new_path)
            print("copy done")
        except Exception as e:
            print(e)
            print("an exception occurred while copying")

if __name__ == '__main__':
    organise_adwords_csv()
    print("copy Final done")  # <- never printed
    spark = SparkSession.builder.appName("adwords_transform") \
    ...
However, nothing in stdout shows an error or exception.
In the stderr logs:
19/10/09 16:16:57 INFO ApplicationMaster: Waiting for spark context initialization...
19/10/09 16:18:37 ERROR ApplicationMaster: Uncaught exception:
java.util.concurrent.TimeoutException: Futures timed out after [100000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:220)
at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:468)
at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:305)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply$mcV$sp(ApplicationMaster.scala:245)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:779)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:778)
at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:244)
at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:803)
at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
19/10/09 16:18:37 INFO ApplicationMaster: Final app status: FAILED, exitCode: 13, (reason: Uncaught exception: java.util.concurrent.TimeoutException: Futures timed out after [100000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:220)
at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:468)
at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:305)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply$mcV$sp(ApplicationMaster.scala:245)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:779)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:778)
at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:244)
at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:803)
at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
)
19/10/09 16:18:37 INFO ShutdownHookManager: Shutdown hook called
I am completely in the dark here; I don't understand what is failing or why. How can I figure this out? Locally it works like a charm (but is, of course, extremely slow).
Edit: after many more attempts, I can confirm that this call:
s3.meta.client.copy(copy_source, S3_DESTINATION_RAW_BUCKET, new_path)
is what times out the EMR cluster, even after it has already processed 80% of the files. Does anyone have any suggestions?
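The stderr trace above fails inside `ApplicationMaster.runDriver` while still "Waiting for spark context initialization...", which is consistent with the driver spending more than 100 seconds in the boto3 copy loop before `SparkSession` is ever created: in YARN cluster mode the ApplicationMaster only waits a bounded time (`spark.yarn.am.waitTime`, 100 s by default) for the context to come up. A hedged, pure-Python sketch of the reordering idea (the `create_session` and `copy_objects` callables are hypothetical stand-ins for `SparkSession.builder...getOrCreate()` and `organise_adwords_csv()`):

```python
def run_job(create_session, copy_objects):
    """Initialize Spark before any slow S3 housekeeping, so the YARN
    ApplicationMaster sees the context register within its wait time."""
    spark = create_session()   # fast: context initialization happens first
    copy_objects()             # slow: boto3 copies no longer block startup
    return spark
```

Whether this ordering is the actual cause here is an inference from the log, not a certainty; moving the copies off the driver entirely would be another option.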
【Comments】:
Could you try launching spark-shell or pyspark on your EMR cluster to check that it runs correctly, and then try running the script from that shell? To me this looks like a problem with your Spark configuration.
I noticed that this command: s3.meta.client.copy(copy_source, S3_DESTINATION_BUCKET, new_path) is what times out. I assume that if the file to copy is too large, Spark times out..?
【Answer 1】:
s3.meta.client.copy(copy_source, S3_DESTINATION_RAW_BUCKET, new_path)
This will fail for any source object larger than 5 GB. Please use multipart upload in AWS. See https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#multipartupload
【Discussion】:
I don't have any objects larger than 3 GB. The only way I could "solve" it was to add a time.sleep(3) after every 60 files.
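The sleep-every-60-files workaround mentioned above can be factored into a small helper; a minimal sketch, where `copy_fn` is a hypothetical stand-in for the `s3.meta.client.copy` call:

```python
import time

def copy_with_throttle(keys, copy_fn, batch_size=60, pause_seconds=3):
    """Copy each key, pausing after every batch_size copies to avoid
    hammering S3; returns the number of pauses taken."""
    pauses = 0
    for i, key in enumerate(keys, start=1):
        copy_fn(key)
        if i % batch_size == 0:
            time.sleep(pause_seconds)
            pauses += 1
    return pauses
```

This only spaces the requests out; it does not address the underlying 100-second ApplicationMaster timeout.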