Pyspark module not found

Posted: 2015-09-01 16:21:20

Question:

I'm trying to run a simple PySpark job on YARN. Here is the code:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
         .setMaster("yarn-client")
         .setAppName("HDFS Filter")
         .set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)

inputFile = sc.textFile("hdfs://myserver:9000/1436304078054.json.gz").cache()
matchTerm = "spark"
numMatches = inputFile.filter(lambda line: matchTerm in line).count()
print(numMatches, "lines contain", matchTerm)

I don't know whether the code actually works; that's not the point. The problem is that when I run it from the Spark directory with the command ./bin/pyspark ../job.py, I get the following error (just a small part of the whole output):

15/09/01 17:57:02 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on hadoop-05:44841 (size: 3.8 KB, free: 534.5 MB)
15/09/01 17:57:02 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, hadoop-05): org.apache.spark.SparkException: 
Error from python worker:
  /usr/bin/python2.7: No module named pyspark
PYTHONPATH was:
  /usr/local/hadoop_store/tmp/nm-local-dir/usercache/hduser/filecache/16/spark-assembly-1.4.1-hadoop2.2.0.jar
java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:163)
    at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:86)
    at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:62)
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:130)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:73)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

15/09/01 17:57:02 INFO scheduler.TaskSetManager: Starting task 0.1 in stage 0.0 (TID 1, hadoop-03, RACK_LOCAL, 1475 bytes)
15/09/01 17:57:04 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on hadoop-03:33268 (size: 3.8 KB, free: 534.5 MB)
15/09/01 17:57:05 WARN scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 1, hadoop-03): org.apache.spark.SparkException: 
Error from python worker:
  /usr/bin/python2.7: No module named pyspark
PYTHONPATH was:
  /usr/local/hadoop_store/tmp/nm-local-dir/usercache/hduser/filecache/21/spark-assembly-1.4.1-hadoop2.2.0.jar
java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:163)
    at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:86)
    at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:62)
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:130)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:73)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

15/09/01 17:57:05 INFO scheduler.TaskSetManager: Starting task 0.2 in stage 0.0 (TID 2, hadoop-05, RACK_LOCAL, 1475 bytes)
15/09/01 17:57:05 INFO scheduler.TaskSetManager: Lost task 0.2 in stage 0.0 (TID 2) on executor hadoop-05: org.apache.spark.SparkException (
Error from python worker:
  /usr/bin/python2.7: No module named pyspark
PYTHONPATH was:
  /usr/local/hadoop_store/tmp/nm-local-dir/usercache/hduser/filecache/16/spark-assembly-1.4.1-hadoop2.2.0.jar
java.io.EOFException) [duplicate 1]
15/09/01 17:57:05 INFO scheduler.TaskSetManager: Starting task 0.3 in stage 0.0 (TID 3, hadoop-05, RACK_LOCAL, 1475 bytes)
15/09/01 17:57:05 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 0.0 (TID 3) on executor hadoop-05: org.apache.spark.SparkException (
Error from python worker:
  /usr/bin/python2.7: No module named pyspark
PYTHONPATH was:
  /usr/local/hadoop_store/tmp/nm-local-dir/usercache/hduser/filecache/16/spark-assembly-1.4.1-hadoop2.2.0.jar
java.io.EOFException) [duplicate 2]
15/09/01 17:57:05 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
15/09/01 17:57:05 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool 
15/09/01 17:57:05 INFO cluster.YarnScheduler: Cancelling stage 0
15/09/01 17:57:05 INFO scheduler.DAGScheduler: ResultStage 0 (count at /home/hduser/spark-1.4.1-bin-without-hadoop/../test.py:11) failed in 5.093 s
15/09/01 17:57:05 INFO scheduler.DAGScheduler: Job 0 failed: count at /home/hduser/spark-1.4.1-bin-without-hadoop/../test.py:11, took 5.238381 s
Traceback (most recent call last):
  File "/home/hduser/spark-1.4.1-bin-without-hadoop/../test.py", line 11, in <module>
    numMatches = inputFile.filter(lambda line: matchTerm in line).count()
  File "/home/hduser/spark-1.4.1-bin-without-hadoop/python/lib/pyspark.zip/pyspark/rdd.py", line 984, in count
  File "/home/hduser/spark-1.4.1-bin-without-hadoop/python/lib/pyspark.zip/pyspark/rdd.py", line 975, in sum
  File "/home/hduser/spark-1.4.1-bin-without-hadoop/python/lib/pyspark.zip/pyspark/rdd.py", line 852, in fold
  File "/home/hduser/spark-1.4.1-bin-without-hadoop/python/lib/pyspark.zip/pyspark/rdd.py", line 757, in collect
  File "/home/hduser/spark-1.4.1-bin-without-hadoop/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/home/hduser/spark-1.4.1-bin-without-hadoop/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, hadoop-05): org.apache.spark.SparkException: 
Error from python worker:
  /usr/bin/python2.7: No module named pyspark
PYTHONPATH was:
  /usr/local/hadoop_store/tmp/nm-local-dir/usercache/hduser/filecache/16/spark-assembly-1.4.1-hadoop2.2.0.jar
java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:163)
    at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:86)
    at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:62)
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:130)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:73)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

15/09/01 17:57:06 INFO spark.SparkContext: Invoking stop() from shutdown hook

Finally, this is my spark-env.sh configuration file:

export SPARK_DIST_CLASSPATH=$(hadoop classpath)
export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop

Any idea what I'm doing wrong?

Answer 1:

What fixed this was adding some extra settings to the SparkConf, which seems to ensure that the workers can access the PySpark and Py4J modules:

conf = (SparkConf()
        .setMaster("yarn-client")
        .setAppName("HDFS Filter")
        .set("spark.executor.memory", "1g")
        # Ship the PySpark and Py4J zips to the YARN workers:
        .set('spark.yarn.dist.files',
             'file:/usr/hdp/2.3.2.0-2950/spark/python/lib/pyspark.zip,'
             'file:/usr/hdp/2.3.2.0-2950/spark/python/lib/py4j-0.8.2.1-src.zip')
        # Point the executors' PYTHONPATH at the shipped zips:
        .setExecutorEnv('PYTHONPATH', 'pyspark.zip:py4j-0.8.2.1-src.zip'))

You will need to edit the paths to match your system.
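
As a quick sanity check, here is a minimal sketch (not part of the original answer) that runs a trivial one-partition task after creating a context from the conf above; if the zips were shipped correctly, it prints the pyspark module's path as seen by a worker instead of failing with "No module named pyspark":

from pyspark import SparkContext

sc = SparkContext(conf=conf)  # conf built with the extra settings above

def probe(_):
    # Runs on a YARN worker; raises ImportError there if pyspark is still missing.
    import pyspark
    return pyspark.__file__

# A single task on one partition is enough to exercise a worker.
print(sc.parallelize([0], 1).map(probe).collect())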

Answer 2:

I think you need to set the PYSPARK_PYTHON environment variable to point to whichever Python installation you're using. It looks like you aren't launching the job with /usr/bin/python2.7.

I usually call this function before importing and running pyspark to make sure things are set up correctly:

import os
import sys

def configure_spark(spark_home=None, pyspark_python=None):
    spark_home = spark_home or "/path/to/default/spark/home"
    os.environ['SPARK_HOME'] = spark_home

    # Add the PySpark directories to the Python path:
    sys.path.insert(1, os.path.join(spark_home, 'python'))
    sys.path.insert(1, os.path.join(spark_home, 'python', 'pyspark'))
    sys.path.insert(1, os.path.join(spark_home, 'python', 'build'))

    # If no Python binary is specified, use the currently running one:
    pyspark_python = pyspark_python or sys.executable
    os.environ['PYSPARK_PYTHON'] = pyspark_python
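
Usage might look like this (a sketch; the spark_home path below is taken from the question and will differ on your system):

# Configure the environment first, then import pyspark:
configure_spark(spark_home="/home/hduser/spark-1.4.1-bin-without-hadoop")

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("yarn-client").setAppName("HDFS Filter")
sc = SparkContext(conf=conf)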
