调用地图后的pyspark EOFError
Posted
技术标签:
【中文标题】调用地图后的pyspark EOFError【英文标题】:pyspark EOFError after calling map 【发布时间】:2016-04-13 08:26:38 【问题描述】:我是 spark 和 pyspark 的新手。
我正在将一个小的 csv 文件(约 40k)读入数据帧。
from pyspark.sql import functions as F
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('/tmp/sm.csv')
df = df.withColumn('verified', F.when(df['verified'] == 'Y', 1).otherwise(0))
df2 = df.map(lambda x: Row(label=float(x[0]), features=Vectors.dense(x[1:]))).toDF()
我遇到了一些奇怪的错误,不是每次都发生,但确实经常发生
>>> df2.show(1)
+--------------------+---------+
| features| label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row
>>> df2.count()
41999
>>> df2.show(1)
+--------------------+---------+
| features| label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row
>>> df2.count()
41999
>>> df2.show(1)
Traceback (most recent call last):
File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 157, in manager
File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 61, in worker
File "spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 136, in main
if read_int(infile) == SpecialLengths.END_OF_STREAM:
File "spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", line 545, in read_int
raise EOFError
EOFError
+--------------------+---------+
| features| label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row
一旦引发了 EOFError,我将不会再看到它,直到我做一些需要与 spark 服务器交互的事情
当我调用 df2.count() 时,它会显示 [Stage xxx] 提示符,这就是我所说的进入 spark 服务器的意思。当我用 df2 做某事时,任何触发似乎最终都会再次给出 EOFError 的东西。
df (vs. df2) 似乎没有发生这种情况,所以似乎它一定是 df.map() 行发生的事情。
【问题讨论】:
我从 spark-users 列表中听说这条消息有点过于冗长,可以忽略。 Pete,你能指点我们的档案吗? 我搜索了 spark-user 列表,但找不到任何关于 EOFError 的信息 :( 我认为问题在于数据帧类型,rdd.collect() 或 df.toJSON().collect() 不会抛出错误,我忽略了这个 ***.com/questions/36561804/… 【参考方案1】:你可以在将数据帧转换为rdd之后尝试做地图吗?您正在对数据框应用地图函数,然后再次从中创建数据框。语法类似于
df.rdd.map().toDF()
请让我知道它是否有效。谢谢。
【讨论】:
【参考方案2】:我相信您正在运行 Spark 2.x 及更高版本。下面的代码应该从 csv 创建您的数据框:
df = spark.read.format("csv").option("header", "true").load("csvfile.csv")
然后你可以有以下代码:
df = df.withColumn('verified', F.when(df['verified'] == 'Y', 1).otherwise(0))
然后你可以在没有 Row 和 toDF() 的情况下创建 df2
让我知道这是否可行,或者您是否正在使用 Spark 1.6...谢谢。
【讨论】:
以上是关于调用地图后的pyspark EOFError的主要内容,如果未能解决你的问题,请参考以下文章
将 udf 调用移动到新函数后的 azure pyspark udf 属性 nonetype