toPandas() works from a Jupyter iPython Notebook but fails on submit - AWS EMR

Posted: 2016-10-17 09:07:59

Question:

I have a program that:

1. Reads some data
2. Performs some operations
3. Saves a CSV file
4. Transfers that file to an FTP server

I am using an Amazon EMR cluster with PySpark for this task.

For step 4, I need to save the CSV to local storage rather than HDFS. To do that, I convert the Spark dataframe to a Pandas dataframe.

The snippet looks like this:

from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import HiveContext
from pyspark.sql.types import StructType, StructField, LongType, StringType
from pyspark.mllib.evaluation import *
from pyspark.sql.functions import *
from pyspark.sql import Row
from time import time
import timeit
from datetime import datetime, timedelta
import numpy as np
import random as rand
import pandas as pd
from itertools import combinations, permutations
from collections import defaultdict
from ftplib import FTP
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("Recommendation").set('spark.driver.memory', '8G').set('spark.executor.memory', '4G')
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)


readRdd = sqlContext.read.format('com.databricks.spark.csv').load('s3n://my-bucket/myfile' + path)

df = readRdd.toPandas() # <---------- PROBLEM
print('toPandas() completed')

df.to_csv('./myFile')
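Step 4 (the FTP transfer) is not shown in the snippet. A minimal sketch of that step using the standard library's ftplib might look like the following; the hostname, credentials, and remote file name here are placeholders, not values from the original post:

```python
from ftplib import FTP


def upload_csv(local_path, host, user, password, remote_name):
    """Upload a local file to an FTP server in binary mode."""
    with FTP(host) as ftp:  # connects to the server on construction
        ftp.login(user=user, passwd=password)
        with open(local_path, 'rb') as f:
            # STOR streams the file to the server under the given name
            ftp.storbinary('STOR ' + remote_name, f)


# The CSV produced by df.to_csv('./myFile') would then be sent with
# something like:
# upload_csv('./myFile', 'ftp.example.com', 'user', 'secret', 'myFile.csv')
```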

The problem:

When I run this code from a Jupyter iPython notebook on the same cluster, it works like a charm. But when I run it via spark-submit, or add it as a step to EMR, the code fails on the following line:

df = readRdd.toPandas() 

'toPandas() completed' never gets printed.

In the Spark job monitor, I can see that the toPandas() call finishes, but right after that I get the error:

16/10/10 13:17:47 INFO YarnAllocator: Driver requested a total number of 1 executor(s).
16/10/10 13:17:47 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:17:47 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:17:47 INFO TaskSetManager: Finished task 1462.0 in stage 17.0 (TID 10624) in 2089 ms on ip-172-31-38-70.eu-west-1.compute.internal (1515/1516)
16/10/10 13:17:47 INFO TaskSetManager: Finished task 1485.0 in stage 17.0 (TID 10647) in 2059 ms on ip-172-31-38-70.eu-west-1.compute.internal (1516/1516)
16/10/10 13:17:47 INFO YarnClusterScheduler: Removed TaskSet 17.0, whose tasks have all completed, from pool 
16/10/10 13:17:47 INFO DAGScheduler: ResultStage 17 (toPandas at 20161007_RecPipeline.py:182) finished in 12.609 s
16/10/10 13:17:47 INFO DAGScheduler: Job 4 finished: toPandas at 20161007_RecPipeline.py:182, took 14.646644 s
16/10/10 13:17:47 INFO YarnAllocator: Driver requested a total number of 0 executor(s).
16/10/10 13:17:47 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:17:47 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:17:50 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:17:50 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:17:53 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:17:53 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:17:56 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:17:56 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:17:59 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:17:59 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:02 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:02 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:05 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:05 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:08 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:08 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:11 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:11 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:14 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:14 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:17 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:17 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:20 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:20 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:23 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:23 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:26 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:26 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:29 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:29 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:32 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:32 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:35 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:35 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:36 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
16/10/10 13:18:36 INFO SparkContext: Invoking stop() from shutdown hook
16/10/10 13:18:36 INFO SparkUI: Stopped Spark web UI at http://172.31.37.28:45777
16/10/10 13:18:36 INFO YarnClusterSchedulerBackend: Shutting down all executors
16/10/10 13:18:36 INFO YarnClusterSchedulerBackend: Asking each executor to shut down
16/10/10 13:18:36 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/10/10 13:18:36 ERROR PythonRDD: Error while sending iterator
java.net.SocketException: Connection reset
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:118)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:440)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:648)
    at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:648)
    at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:648)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250)
    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:649)
16/10/10 13:18:36 ERROR ApplicationMaster: User application exited with status 143
16/10/10 13:18:36 INFO ApplicationMaster: Final app status: FAILED, exitCode: 143, (reason: User application exited with status 143)
16/10/10 13:18:36 INFO MemoryStore: MemoryStore cleared
16/10/10 13:18:36 INFO BlockManager: BlockManager stopped
16/10/10 13:18:36 INFO BlockManagerMaster: BlockManagerMaster stopped
16/10/10 13:18:36 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/10/10 13:18:36 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/10/10 13:18:36 INFO SparkContext: Successfully stopped SparkContext
16/10/10 13:18:36 INFO ShutdownHookManager: Shutdown hook called
16/10/10 13:18:36 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/10/10 13:18:36 INFO ShutdownHookManager: Deleting directory /mnt3/yarn/usercache/hadoop/appcache/application_1476100925559_0002/spark-eab43d4e-7201-4bcb-8ee7-0e7b546e8fd8
16/10/10 13:18:36 INFO ShutdownHookManager: Deleting directory /mnt/yarn/usercache/hadoop/appcache/application_1476100925559_0002/spark-1d88398f-ecd5-4d94-a42a-a406b3d566af/pyspark-34bec23c-a686-475d-85c9-9e9228b23239
16/10/10 13:18:36 INFO ShutdownHookManager: Deleting directory /mnt/yarn/usercache/hadoop/appcache/application_1476100925559_0002/spark-1d88398f-ecd5-4d94-a42a-a406b3d566af
16/10/10 13:18:36 INFO ShutdownHookManager: Deleting directory /mnt3/yarn/usercache/hadoop/appcache/application_1476100925559_0002/container_1476100925559_0002_01_000001/tmp/spark-96cdee47-e3f3-45f4-8bc7-0df5928ef53c
16/10/10 13:18:36 INFO ShutdownHookManager: Deleting directory /mnt2/yarn/usercache/hadoop/appcache/application_1476100925559_0002/spark-f6821ea1-6f37-4cc6-8bba-049ac0215786
16/10/10 13:18:36 INFO ShutdownHookManager: Deleting directory /mnt1/yarn/usercache/hadoop/appcache/application_1476100925559_0002/spark-1827cae8-8a60-4b29-a4e5-368a8e1856fd

My cluster configuration is as follows:

spark-defaults  spark.driver.maxResultSize  8G
spark-defaults  spark.driver.memory 8G
spark-defaults  spark.executor.memory   4G

The spark-submit command looks like this:

spark-submit --deploy-mode cluster s3://my-bucket/myPython.py

This is killing me! Can someone please give me a pointer on what direction to look in?

Comments:

Answer 1:

This was the problem:

spark-submit --deploy-mode cluster s3://my-bucket/myPython.py

In the command above, the deploy mode is set to cluster, which means one of the core nodes is chosen to run the driver. Since the allowed driver memory is 8G and the core nodes were smaller physical instances, the driver always ran out of the memory it needed.

The solution was to deploy in client mode, where the driver always runs on the master node (in my case a larger physical instance with more resources), which does not run out of the memory required by the whole process.

Because this was a dedicated cluster, that solution worked in my case.

On a shared cluster where the deploy mode has to be cluster, using larger core instances should work.
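Concretely, the fix amounts to changing the deploy-mode flag on the original command (the script path is the one from the question):

```shell
spark-submit --deploy-mode client s3://my-bucket/myPython.py
```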

Discussion:
