来自现有 RDD 的数据框 - Python Spark
Posted
技术标签:
【中文标题】来自现有 RDD 的数据框 - Python Spark【英文标题】:Data Frame from an existing RDD - Python Spark 【发布时间】:2017-06-09 18:17:44 【问题描述】:我正在尝试通过指定列标签和数据类型从现有 RDD 创建一个数据框,但我得到了这个类型错误:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc)
yFieldTypes = [FloatType()]
ySchemaString = "Predictor"
fy_data = [StructField(field_name, field_type, True) \
for field_name, field_type in zip(ySchemaString.split(), yFieldTypes)]
schema_y = StructType(fy_data)
所以架构如下:
StructType(List(StructField(Predictor,FloatType,true)))
而我的 RDD datay.take(10) 输出如下:
[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
当我想创建我的数据框时:
dfy = sqlContext.createDataFrame(datay, schema_y)
我得到这个类型错误:
TypeError: StructType(List(StructField(Predictor,FloatType,true))) can not accept object in type <type 'float'>
【问题讨论】:
你试过在你的 rdd 上调用 toDF() 吗? 【参考方案1】:那是因为它不是正确的架构。既然你有原子类型,你应该直接使用FloatType
:
dfy = sqlContext.createDataFrame(datay, FloatType())
要使用当前模式,您应该使用元组:
dfy = sqlContext.createDataFrame(datay.map(lambda x: (x, )), schema_y)
【讨论】:
【参考方案2】:问题是因为RDD
属于Double
类型,而schema
定义为FloatType
。您必须找到一种方法来将 Double rdds 转换为 Float。
我没有与pyspark
合作过,但我将在scala
代码下方发布以帮助您。
val datay = sc.parallelize(Seq(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0))
val r = datay.map(d => Row.fromSeq(Seq(d.toFloat)))
val schema_y = StructType(List(StructField("Predictor", FloatType, true)))
val dfy = sqlContext.createDataFrame(r, schema_y)
我希望这将帮助您找到所需的解决方案
【讨论】:
以上是关于来自现有 RDD 的数据框 - Python Spark的主要内容,如果未能解决你的问题,请参考以下文章
映射 dict(来自 rdd)以递归方式更改 Python/PySpark 中的列名
pyspark中的RDD到DataFrame(来自rdd的第一个元素的列)