PySpark:如何将行转换为向量?

Posted

技术标签:

【中文标题】PySpark:如何将行转换为向量?【英文标题】:PySpark: How do I convert rows to Vectors? 【发布时间】:2017-10-17 13:24:42 【问题描述】:

我处理一个包含三列 colA、colB 和 colC 的数据框

+---+-----+-----+-----+
|id |colA |colB |colC |
+---+-----+-----+-----+
| 1 |  5  | 8   | 3   |
| 2 |  9  | 7   | 4   |
| 3 |  3  | 0   | 6   |
| 4 |  1  | 6   | 7   |
+---+-----+-----+-----+

我需要合并 colA、colB 和 colC 列以获得如下所示的新数据帧:

+---+--------------+
|id |     colD     |
+---+--------------+
| 1 |  [5, 8, 3]   |
| 2 |  [9, 7, 4]   |
| 3 |  [3, 0, 6]   |
| 4 |  [1, 6, 7]   |
+---+--------------+

即获取第一个DataFrame的pyspark代码:

l=[(1,5,8,3),(2,9,7,4), (3,3,0,6), (4,1,6,7)]
names=["id","colA","colB","colC"]
db=sqlContext.createDataFrame(l,names)
db.show() 

如何将行转换为向量?有人可以帮我吗? 谢谢

【问题讨论】:

【参考方案1】:

这实际上稍微取决于您想要colD 的数据类型。如果你想要一个VectorUDT 列,那么使用VectorAssembler 是正确的转换。如果您只想将字段组合成一个数组,则不需要 UDF。您可以使用内置的array 函数来组合列:

>>> from pyspark.sql.functions import array
>>> db.select('id',array('colA','colB','colC').alias('colD')).show()

+---+---------+
| id|     colD|
+---+---------+
|  1|[5, 8, 3]|
|  2|[9, 7, 4]|
|  3|[3, 0, 6]|
|  4|[1, 6, 7]|
+---+---------+

这实际上会比其他转换提高性能,因为 pyspark 不必序列化您的 udf。

【讨论】:

嗨,我也在尝试这样做,但问题是我有 262143 列。我想将我必须的 3 行转换为向量。我尝试了这个 cols = [c for c in centres_df.columns] 然后 test = center_df.select(array(cols).alias('colD')) 但这需要很长时间。我在数据块上运行它。 我不确定您要做什么,但我建议您转置您的数据,以便您拥有 262,143 行和 3 列。当您拥有相对较少的列和一堆行时,Spark 通常会更好地工作。我建议您发布一个问题,以便您可以更详细地描述您的问题。 谢谢你的建议,我会试试看的:)【参考方案2】:

您可以使用 pyspark.ml 中的矢量汇编器,

from pyspark.ml.feature import VectorAssembler
newdb = VectorAssembler(inputCols=["colA", "colB", "colC"], outputCol="colD").transform(db)
newdb.show()
+---+----+----+----+-------------+
| id|colA|colB|colC|         colD|
+---+----+----+----+-------------+
|  1|   5|   8|   3|[5.0,8.0,3.0]|
|  2|   9|   7|   4|[9.0,7.0,4.0]|
|  3|   3|   0|   6|[3.0,0.0,6.0]|
|  4|   1|   6|   7|[1.0,6.0,7.0]|
+---+----+----+----+-------------+

或者,如果你愿意,可以使用 udf 进行逐行组合,

from pyspark.sql import functions as F
from pyspark.sql.types import *
udf1 = F.udf(lambda x,y,z : [x,y,z],ArrayType(IntegerType()))
df.select("id",udf1("colA","colB","colC").alias("colD")).show()
+---+---------+
| id|     colD|
+---+---------+
|  1|[5, 8, 3]|
|  2|[9, 7, 4]|
|  3|[3, 0, 6]|
|  4|[1, 6, 7]|
+---+---------+

希望这会有所帮助。!

【讨论】:

以上是关于PySpark:如何将行转换为向量?的主要内容,如果未能解决你的问题,请参考以下文章

pyspark 将行转换为列

Pyspark 将行数据转换为键值对

Pyspark(Dataframes)逐行读取文件(将行转换为字符串)

如何将行传递到pyspark udf

python - 如何将密集向量的RDD转换为pyspark中的DataFrame?

将行列表保存到 pyspark 中的 Hive 表