在 PySpark 的 DataFrame 列中存储 DenseVector
Posted
技术标签:
【中文标题】在 PySpark 的 DataFrame 列中存储 DenseVector【英文标题】:Store DenseVector in DataFrame column in PySpark 【发布时间】:2019-08-16 09:39:40 【问题描述】:我正在尝试将 DenseVector 存储到新列中的 DataFrame 中。
我尝试了以下代码,但得到了一个 AttributeError
说 'numpy.ndarray' object has no attribute '_get_object_id'
。
from pyspark.sql import functions
from pyspark.mllib.linalg import Vectors
df = spark.createDataFrame(['name': 'Alice', 'age': 1,
'name': 'Bob', 'age': 2])
vec = Vectors.dense([1.0, 3.0, 2.9])
df.withColumn('vector', functions.lit(vec))
我希望每行存储一个向量以用于计算目的。任何帮助表示赞赏。
[Python 3.7.3,Spark 版本 2.4.3,通过 Jupyter All-Spark-Notebook]
编辑
我尝试按照 Florian 的建议遵循here 的答案,但我无法调整 udf 以接受自定义的预构建向量。
conv = functions.udf(lambda x: DenseVector(x), VectorUDT())
# Same with
# conv = functions.udf(lambda x: x, VectorUDT())
df.withColumn('vector', conv(vec)).show()
我收到此错误:
TypeError: Invalid argument, not a string or column: [1.0,3.0,2.9] of type <class 'pyspark.mllib.linalg.DenseVector'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
【问题讨论】:
Adding a Vectors Column to a pyspark DataFrame 的可能重复项。那里的答案对你有帮助吗? 谢谢@Florian。知道如何修改 udf 以便我可以传入自己的向量吗?我按照那里的答案尝试了udf(lambda x: x, VectorUDT())
,但是没有用。
这似乎工作 - df.withColumn('vector', functions.array([functions.lit(k) for k in vec]))
【参考方案1】:
您可以将 udf
的创建包装在一个函数中,因此它会返回带有您的向量的 udf
。下面给出一个例子,希望对你有帮助!
import pyspark.sql.functions as F
from pyspark.ml.linalg import VectorUDT, DenseVector
df = spark.createDataFrame(['name': 'Alice', 'age': 1,
'name': 'Bob', 'age': 2])
def vector_column(x):
return F.udf(lambda: x, VectorUDT())()
vec = DenseVector([1.0, 3.0, 2.9])
df.withColumn("vector", vector_column(vec)).show()
输出:
+---+-----+-------------+
|age| name| vector|
+---+-----+-------------+
| 1|Alice|[1.0,3.0,2.9]|
| 2| Bob|[1.0,3.0,2.9]|
+---+-----+-------------+
【讨论】:
谢谢@Florian。可以确认这行得通。您能解释一下为什么有时需要在函数中包装 udf 吗?或者这是使用 udf 的推荐方式? 严格来说,您不需要在这里将 udf 包装在函数中,df.withColumn("vector", F.udf(lambda: DenseVector([1.0, 3.0, 2.9]), VectorUDT())())
也可以。然而,为了可重用性,我发现将此语句放在为我们返回 UDF 的函数中更简洁。以上是关于在 PySpark 的 DataFrame 列中存储 DenseVector的主要内容,如果未能解决你的问题,请参考以下文章
如何将日期转换为 PySpark Dataframe 列中的第一天?
正则表达式在 PySpark Dataframe 列中查找所有不包含 _(Underscore) 和 :(Colon) 的字符串