pyspark on emr with boto3, copy of s3 object result with Futures timed out after [100000 milliseconds]


Posted: 2019-10-09 16:41:35

Question:

I have a pyspark application that converts CSV files to Parquet; before that step, I copy some S3 objects from one bucket to another.

pyspark with Spark 2.4, EMR 5.27, maximizeResourceAllocation set to true.

My CSV files range in size from 80 KB to 500 MB.

Nevertheless, my EMR cluster fails at about 70% completion, on a 166 MB file (a 480 MB file just before it succeeded). The same job does not fail locally with spark-submit.

The job is simple:

def organise_adwords_csv():
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(S3_ORIGIN_RAW_BUCKET)
    for obj in bucket.objects.filter(Prefix=S3_ORIGIN_ADWORDS_RAW + "/"):
        key = obj.key

        copy_source = {
            'Bucket': S3_ORIGIN_RAW_BUCKET,
            'Key': key
        }

        key_tab = obj.key.split("/")
        if len(key_tab) < 5:
            print("continuing from length", obj)
            continue
        file_name = key_tab[-1]
        if file_name == '':
            print("continuing", obj)
            continue

        table = file_name.split("_")[1].replace("-", "_")
        new_path = "{0}/{1}/{2}".format(S3_DESTINATION_ORDERED_ADWORDS_RAW_PATH, table, file_name)
        print("new_path", new_path)  # <- execution stops after this print
        try:
            s3.meta.client.copy(copy_source, S3_DESTINATION_RAW_BUCKET, new_path)
            print("copy done")
        except Exception as e:
            print(e)
            print("an exception occurred while copying")

if __name__=='__main__':
    organise_adwords_csv()

    print("copy Final done")  # <- never printed

    spark = SparkSession.builder.appName("adwords_transform") \
    ...

However, nothing in stdout shows an error or exception.

In the stderr logs:

19/10/09 16:16:57 INFO ApplicationMaster: Waiting for spark context initialization...
19/10/09 16:18:37 ERROR ApplicationMaster: Uncaught exception: 
java.util.concurrent.TimeoutException: Futures timed out after [100000 milliseconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:220)
    at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:468)
    at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:305)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply$mcV$sp(ApplicationMaster.scala:245)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:779)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
    at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:778)
    at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:244)
    at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:803)
    at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
19/10/09 16:18:37 INFO ApplicationMaster: Final app status: FAILED, exitCode: 13, (reason: Uncaught exception: java.util.concurrent.TimeoutException: Futures timed out after [100000 milliseconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:220)
    at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:468)
    at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:305)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply$mcV$sp(ApplicationMaster.scala:245)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:779)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
    at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:778)
    at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:244)
    at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:803)
    at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
)
19/10/09 16:18:37 INFO ShutdownHookManager: Shutdown hook called

I am completely in the dark here; I don't understand what is failing or why. How can I figure it out? Locally it works like a charm (but is of course very slow).

Edit: After many more attempts, I can confirm that the call:

s3.meta.client.copy(copy_source, S3_DESTINATION_RAW_BUCKET, new_path)

times out the EMR cluster, even though it has already processed 80% of the files. Does anyone have any suggestions about this?
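For context on the timeout itself: the stack trace shows ApplicationMaster.runDriver giving up while "Waiting for spark context initialization". In YARN cluster mode the ApplicationMaster only waits spark.yarn.am.waitTime — 100s by default, matching the 100000 ms in the log — for the driver to create a SparkContext, and here the entire boto3 copy loop runs before SparkSession.builder is ever reached. A minimal workaround sketch, assuming cluster mode (my_job.py is a placeholder name); the alternative is simply to create the SparkSession before the copy loop:

```shell
# Give the driver more time to reach SparkContext creation,
# or (better) move SparkSession creation before the long copy loop.
spark-submit \
  --deploy-mode cluster \
  --conf spark.yarn.am.waitTime=600s \
  my_job.py
```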

Comments:

Could you try launching spark-shell or pyspark on your EMR cluster and check that it runs normally, then try running the script from that shell? To me it looks like something is wrong with your Spark configuration.

I noticed that this command: s3.meta.client.copy(copy_source, S3_DESTINATION_BUCKET, new_path) is the one that times out. I assume Spark times out if the file to copy is too big..?

Answer 1:
s3.meta.client.copy(copy_source, S3_DESTINATION_RAW_BUCKET, new_path)

This will fail for any source object larger than 5 GB. Use AWS multipart upload instead; see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#multipartupload

Comments:

I don't have any objects larger than 3 GB. The only way I could "solve" it was to add a time.sleep(3) after every 60 files.
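The sleep workaround from that comment can be sketched as a small wrapper. Here copy_one is a hypothetical callable that performs one s3.meta.client.copy for a given key, and the default numbers simply mirror the comment:

```python
import time

def copy_objects_throttled(keys, copy_one, pause_every=60, pause_s=3.0):
    """Copy objects one by one, pausing briefly every `pause_every` copies.

    Workaround sketch: the pause keeps managed-transfer work from piling
    up when many copies run back to back.
    """
    for i, key in enumerate(keys, start=1):
        copy_one(key)  # e.g. s3.meta.client.copy(...) for this key
        if i % pause_every == 0:
            time.sleep(pause_s)
```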
