将 numpy 数组的 rdd 转换为 pyspark 数据帧
Posted
技术标签:
【中文标题】将 numpy 数组的 rdd 转换为 pyspark 数据帧【英文标题】:Converting rdd of numpy arrays to pyspark dataframe 【发布时间】:2018-01-29 10:31:46 【问题描述】:尝试将由 numpy 数组组成的 rdd 转换为 pyspark 中的数据帧时出现以下错误:
下面是导致这个错误的一段代码,我什至不确定我是否能得到错误的实际位置,即使阅读跟踪......
有谁知道如何绕过?
非常感谢!
In [111]: rddUser.take(5)
Out[111]:
[array([u'1008798262000292538', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'],
dtype='<U32'),
array([u'102254941859441333', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'],
dtype='<U32'),
array([u'1035609083097069747', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'],
dtype='<U32'),
array([u'10363297284472000', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'],
dtype='<U32'),
array([u'1059178934871294116', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'],
dtype='<U32')]
那么麻烦就来了:
In [110]: rddUser.toDF(schema=None).show()
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-110-073037afd70e> in <module>()
----> 1 rddUser.toDF(schema=None).show()
62 [Row(name=u'Alice', age=1)]
63 """
---> 64 return sqlContext.createDataFrame(self, schema, sampleRatio)
65
66 RDD.toDF = toDF
421
422 if isinstance(data, RDD):
--> 423 rdd, schema = self._createFromRDD(data, schema, samplingRatio)
424 else:
425 rdd, schema = self._createFromLocal(data, schema)
308 """
309 if schema is None or isinstance(schema, (list, tuple)):
--> 310 struct = self._inferSchema(rdd, samplingRatio)
311 converter = _create_converter(struct)
312 rdd = rdd.map(converter)
253 """
254 first = rdd.first()
--> 255 if not first:
256 raise ValueError("The first row in RDD is empty, "
257 "can not infer schema")
ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()
【问题讨论】:
嗨,我想获得一个 pyspark 数据框,在我的初始数组中每个元素都有一个字段。第一个字段是 user_id,然后是 first_feature,second_feature... 直到最后一个。为了进一步了解这个概念,我发现这是一种(可能很丑陋)在我拥有的数据集上手动计算一个热编码的方法。我想对生成的 pyspark 数据框计算逻辑回归... 【参考方案1】:如果 RDD 仅定义为 map
和 tolist
import numpy as np
rdd = spark.sparkContext.parallelize([
np.array([u'1059178934871294116', u'1.0', u'0.0', u'0.0', u'0.0', u'1.0']),
np.array([u'102254941859441333', u'1.0', u'0.0', u'0.0', u'0.0', u'1.0'])
])
df = rdd.map(lambda x: x.tolist()).toDF(["user_id"])
# +-------------------+---+---+---+---+---+
# | user_id| _2| _3| _4| _5| _6|
# +-------------------+---+---+---+---+---+
# |1059178934871294116|1.0|0.0|0.0|0.0|1.0|
# | 102254941859441333|1.0|0.0|0.0|0.0|1.0|
# +-------------------+---+---+---+---+---+
但考虑到您的评论,我假设您想将它与 ml
一起使用。那么这可能会更好:
from pyspark.ml.linalg import DenseVector
(rdd
.map(lambda x: (x[0].tolist(), DenseVector(x[1:])))
.toDF(["user_id", "features"])
.show(2, False))
# +-------------------+---------------------+
# |user_id |features |
# +-------------------+---------------------+
# |1059178934871294116|[1.0,0.0,0.0,0.0,1.0]|
# |102254941859441333 |[1.0,0.0,0.0,0.0,1.0]|
# +-------------------+---------------------+
你也应该看看pyspark.ml.feature.OneHotEncoder
。
【讨论】:
以上是关于将 numpy 数组的 rdd 转换为 pyspark 数据帧的主要内容,如果未能解决你的问题,请参考以下文章