将 Pandas 转换为 Spark 时出现 TypeError
Posted
技术标签:
【中文标题】将 Pandas 转换为 Spark 时出现 TypeError【英文标题】:TypeError when converting Pandas to Spark 【发布时间】:2016-10-04 21:40:38 【问题描述】:所以我在这里查找了这个问题,但以前的解决方案对我不起作用。我有一个这种格式的DataFrame
mdf.head()
dbn boro bus
0 17K548 ***lyn B41, B43, B44-SBS, B45, B48, B49, B69
1 09X543 Bronx Bx13, Bx15, Bx17, Bx21, Bx35, Bx4, Bx41, Bx4A,...
4 28Q680 Queens Q25, Q46, Q65
6 14K474 ***lyn B24, B43, B48, B60, Q54, Q59
还有几列,但我已将它们排除在外(地铁线路和考试成绩)。当我尝试将此 DataFrame 转换为 Spark DataFrame 时,我收到一个错误,就是这个。
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-30-1721be5c2987> in <module>()
----> 1 sparkdf = sqlc.createDataFrame(mdf)
/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/context.pyc in createDataFrame(self, data, schema, samplingRatio)
423 rdd, schema = self._createFromRDD(data, schema, samplingRatio)
424 else:
--> 425 rdd, schema = self._createFromLocal(data, schema)
426 jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
427 jdf = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/context.pyc in _createFromLocal(self, data, schema)
339
340 if schema is None or isinstance(schema, (list, tuple)):
--> 341 struct = self._inferSchemaFromList(data)
342 if isinstance(schema, (list, tuple)):
343 for i, name in enumerate(schema):
/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/context.pyc in _inferSchemaFromList(self, data)
239 warnings.warn("inferring schema from dict is deprecated,"
240 "please use pyspark.sql.Row instead")
--> 241 schema = reduce(_merge_type, map(_infer_schema, data))
242 if _has_nulltype(schema):
243 raise ValueError("Some of types cannot be determined after inferring")
/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/types.pyc in _merge_type(a, b)
860 nfs = dict((f.name, f.dataType) for f in b.fields)
861 fields = [StructField(f.name, _merge_type(f.dataType, nfs.get(f.name, NullType())))
--> 862 for f in a.fields]
863 names = set([f.name for f in fields])
864 for n in nfs:
/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/types.pyc in _merge_type(a, b)
854 elif type(a) is not type(b):
855 # TODO: type cast (such as int -> long)
--> 856 raise TypeError("Can not merge type %s and %s" % (type(a), type(b)))
857
858 # same type
TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>
根据我的阅读,这可能是标头被视为数据的问题。据我了解,您无法从 DataFrame 中删除标头,那么我将如何解决此错误并将此 DataFrame 转换为 Spark 呢?
编辑:这是我如何创建 Pandas DF 并解决问题的代码。
sqlc = SQLContext(sc)
df = pd.DataFrame(pd.read_csv('hsdir.csv', encoding = 'utf_8_sig'))
df = df[['dbn', 'boro', 'bus', 'subway', 'total_students']]
df1 = pd.DataFrame(pd.read_csv('sat_r.csv', encoding = 'utf_8_sig'))
df1 = df1.rename(columns = 'Num of SAT Test Takers': 'num_test_takers', 'SAT Critical Reading Avg. Score': 'read_avg', 'SAT Math Avg. Score' : 'math_avg', 'SAT Writing Avg. Score' : 'write_avg')
mdf = pd.merge(df, df1, left_on = 'dbn', right_on = 'DBN', how = 'left')
mdf = mdf[pd.notnull(mdf['DBN'])]
mdf.to_csv('merged.csv', encoding = 'utf-8')
ndf = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("merged.csv")
这段代码的最后一行,从我的本地机器加载它最终允许我将 CSV 正确转换为数据框,但我的问题仍然存在。为什么它一开始就不起作用?
【问题讨论】:
【参考方案1】:您可以使用反射从 Row
对象的 RDD 中推断架构,例如,
from pyspark.sql import Row
mdfRows = mdf.map(lambda p: Row(dbn=p[0], boro=p[1], bus=p[2]))
dfOut = sqlContext.createDataFrame(mdfRows)
这是否达到了预期的效果?
【讨论】:
我收到一个错误AttributeError: 'DataFrame' object has no attribute 'map'
哦。 mdf
是熊猫数据框吗?我错误地认为它是 Spark RDD。你需要使用熊猫吗?或者您可以创建一个 Spark RDD,然后将其转换为如上所述的 Spark DataFrame?
所以这是我面临的问题。如果我使用com.databricks.spark.csv
将其加载为RDD 以将其读取为CSV,它会完全忽略dbn 列并将所有内容向左移动一列。我不确定如何避免这个问题,所以我通过 Pandas read_csv
加载它,它保留了原始 CSV 的格式。
你的意思是你试过spark.read.csv("/path/to/file.csv", header=True)
,但没用?
老实说,我不太确定问题出在哪里...我已经根据您提供的示例数据制作了一个 pandas DataFrame,并毫无问题地执行了sparkDF = spark.createDataFrame(df)
。我还从示例数据制作了一个 CSV 文件并运行sparkDF = spark.read.csv("sample.csv", header=True)
,也没有问题。也许你可以在你的问题中加入一些关于你是如何创建 pandas DataFrame 的?【参考方案2】:
我遇到了同样的问题,并且能够将其追踪到长度为 0(或为空)的单个条目。 _inferScheme
命令在数据帧的每一行上运行并确定类型。默认情况下,空值是 Double 而另一个是 String。这两种类型不能通过_merge_type
命令合并。该问题已提交https://issues.apache.org/jira/browse/SPARK-18178,但最好的解决方法可能是为createDataFrame
命令提供架构。
下面的代码重现了 PySpark 2.0 中的问题
import pandas as pd
from io import StringIO
test_df = pd.read_csv(StringIO(',Scan Options\n15,SAT2\n16,\n'))
sqlContext.createDataFrame(test_df).registerTempTable('Test')
o_qry = sqlContext.sql("SELECT * FROM Test LIMIT 1")
o_qry.first()
【讨论】:
【参考方案3】:你也可以试试这个:
def create_spark_dataframe(file_name):
"""
will return the spark dataframe input pandas dataframe
"""
pandas_data_frame = pd.read_csv(file_name, converters= "PRODUCT": str)
for col in pandas_data_frame.columns:
if ((pandas_data_frame[col].dtypes != np.int64) &
(pandas_data_frame[col].dtypes != np.float64)):
pandas_data_frame[col] = pandas_data_frame[col].fillna('')
spark_data_frame = sqlContext.createDataFrame(pandas_data_frame)
return spark_data_frame
这将解决您的问题。
【讨论】:
【参考方案4】:这里的问题是 pandas 默认的 np.nan
(Not a number) 空字符串值,这在转换为 spark.df 时会在 Schema 中造成混乱。
基本方法是将 np.nan 转换为 None,这将使其能够工作
不幸的是,pandas 不允许你用 None 填充。因为,np.nan 不遵循自平等条件,所以你可以做这个漂亮的把戏。
new_series = new_series.apply(lambda x: None if x != x else x)
那么,display(sqlContext.createDataFrame(new_df_1))
就可以正常工作了
【讨论】:
如果有人能建议我将 nan 转换为 None 的直接方法,我会很高兴 我用 0 填充了 NaN 并没有解决错误。 @LePuppy,你的列的数据类型是什么,另外,检查我更新的解决方案,它独立于列的数据类型,应该可以工作 我有字符串和双精度类型。我发现将所有转换为字符串使我能够创建火花数据框。然后,我仍然可以使用 cast 来转换列类型。 @LePuppy 工作量太大了,这个不是更系统地处理所有这些以上是关于将 Pandas 转换为 Spark 时出现 TypeError的主要内容,如果未能解决你的问题,请参考以下文章
尝试将 Dictionary 转换为 DataFrame Pandas 时出现 ValueError
将 RDD 转换为 DataFrame 时出现 java.lang.***Error
将 Python UDF 应用于 Spark 数据帧时出现 java.lang.IllegalArgumentException
将 pyspark pandas_udf 与 AWS EMR 一起使用时出现“没有名为‘pandas’的模块”错误