在 PySpark 的 DataFrame 列中存储 DenseVector

Posted

技术标签:

【中文标题】在 PySpark 的 DataFrame 列中存储 DenseVector【英文标题】:Store DenseVector in DataFrame column in PySpark 【发布时间】:2019-08-16 09:39:40 【问题描述】:

我正在尝试将 DenseVector 存储到新列中的 DataFrame 中。

我尝试了以下代码,但得到了一个 AttributeError'numpy.ndarray' object has no attribute '_get_object_id'

from pyspark.sql import functions
from pyspark.mllib.linalg import Vectors

df = spark.createDataFrame(['name': 'Alice', 'age': 1,
                            'name': 'Bob', 'age': 2])

vec = Vectors.dense([1.0, 3.0, 2.9])

df.withColumn('vector', functions.lit(vec))

我希望每行存储一个向量以用于计算目的。任何帮助表示赞赏。

[Python 3.7.3,Spark 版本 2.4.3,通过 Jupyter All-Spark-Notebook]

编辑

我尝试按照 Florian 的建议遵循here 的答案,但我无法调整 udf 以接受自定义的预构建向量。

conv = functions.udf(lambda x: DenseVector(x), VectorUDT())
# Same with
# conv = functions.udf(lambda x: x, VectorUDT())

df.withColumn('vector', conv(vec)).show()

我收到此错误:

TypeError: Invalid argument, not a string or column: [1.0,3.0,2.9] of type <class 'pyspark.mllib.linalg.DenseVector'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

【问题讨论】:

Adding a Vectors Column to a pyspark DataFrame 的可能重复项。那里的答案对你有帮助吗? 谢谢@Florian。知道如何修改 udf 以便我可以传入自己的向量吗?我按照那里的答案尝试了udf(lambda x: x, VectorUDT()),但是没有用。 这似乎工作 - df.withColumn('vector', functions.array([functions.lit(k) for k in vec])) 【参考方案1】:

您可以将 udf 的创建包装在一个函数中,因此它会返回带有您的向量的 udf。下面给出一个例子,希望对你有帮助!

import pyspark.sql.functions as F
from pyspark.ml.linalg import VectorUDT, DenseVector

df = spark.createDataFrame(['name': 'Alice', 'age': 1,
                            'name': 'Bob', 'age': 2])

def vector_column(x): 
    return F.udf(lambda: x, VectorUDT())()

vec = DenseVector([1.0, 3.0, 2.9])
df.withColumn("vector", vector_column(vec)).show()

输出:

+---+-----+-------------+
|age| name|       vector|
+---+-----+-------------+
|  1|Alice|[1.0,3.0,2.9]|
|  2|  Bob|[1.0,3.0,2.9]|
+---+-----+-------------+

【讨论】:

谢谢@Florian。可以确认这行得通。您能解释一下为什么有时需要在函数中包装 udf 吗?或者这是使用 udf 的推荐方式? 严格来说,您不需要在这里将 udf 包装在函数中,df.withColumn("vector", F.udf(lambda: DenseVector([1.0, 3.0, 2.9]), VectorUDT())()) 也可以。然而,为了可重用性,我发现将此语句放在为我们返回 UDF 的函数中更简洁。

以上是关于在 PySpark 的 DataFrame 列中存储 DenseVector的主要内容,如果未能解决你的问题,请参考以下文章

如何将日期转换为 PySpark Dataframe 列中的第一天?

正则表达式在 PySpark Dataframe 列中查找所有不包含 _(Underscore) 和 :(Colon) 的字符串

PySpark DataFrame的逐行聚合

从 pyspark 中的数据框数组类型列中获取“名称”元素

在 groupby 操作 PySpark 中聚合列中的稀疏向量

PySpark查找另一列中是否存在一列中的模式