如何将向量拆分为列 - 使用 PySpark

Posted

技术标签:

【中文标题】如何将向量拆分为列 - 使用 PySpark【英文标题】:How to split Vector into columns - using PySpark 【发布时间】:2016-07-14 21:12:03 【问题描述】:

上下文:我有一个DataFrame,有 2 列:单词和向量。其中“向量”的列类型为VectorUDT

一个例子:

word    |  vector
assert  | [435,323,324,212...]

我想得到这个:

word   |  v1 | v2  | v3 | v4 | v5 | v6 ......
assert | 435 | 5435| 698| 356|....

问题:

如何使用 PySpark 将包含向量的列拆分为每个维度的多个列?

提前致谢

【问题讨论】:

请参阅How to Access Element of a VectorUDT column in a Spark Dataframe 以获得性能更好的解决方案。 (我对两者都做了计时。如果我有声望,我会将此标记为重复。) @hwrd 你能分享你用过的基准测试代码吗? TIA。 @user10465355 添加为下面的“解决方案”,因为它对于 cmets 来说太大了。 (这个组织有点古怪,因为我把它从 Jupyter 笔记本中拉出来,然后替换了 %%timeit 单元魔法。) 【参考方案1】:

火花 >= 3.0.0

从 Spark 3.0.0 开始,这可以在不使用 UDF 的情况下完成。

from pyspark.ml.functions import vector_to_array

(df
    .withColumn("xs", vector_to_array("vector")))
    .select(["word"] + [col("xs")[i] for i in range(3)]))

## +-------+-----+-----+-----+
## |   word|xs[0]|xs[1]|xs[2]|
## +-------+-----+-----+-----+
## | assert|  1.0|  2.0|  3.0|
## |require|  0.0|  2.0|  0.0|
## +-------+-----+-----+-----+

火花

一种可能的方法是与 RDD 相互转换:

from pyspark.ml.linalg import Vectors

df = sc.parallelize([
    ("assert", Vectors.dense([1, 2, 3])),
    ("require", Vectors.sparse(3, 1: 2))
]).toDF(["word", "vector"])

def extract(row):
    return (row.word, ) + tuple(row.vector.toArray().tolist())

df.rdd.map(extract).toDF(["word"])  # Vector values will be named _2, _3, ...

## +-------+---+---+---+
## |   word| _2| _3| _4|
## +-------+---+---+---+
## | assert|1.0|2.0|3.0|
## |require|0.0|2.0|0.0|
## +-------+---+---+---+

另一种解决方案是创建 UDF:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, DoubleType

def to_array(col):
    def to_array_(v):
        return v.toArray().tolist()
    # Important: asNondeterministic requires Spark 2.3 or later
    # It can be safely removed i.e.
    # return udf(to_array_, ArrayType(DoubleType()))(col)
    # but at the cost of decreased performance
    return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)

(df
    .withColumn("xs", to_array(col("vector")))
    .select(["word"] + [col("xs")[i] for i in range(3)]))

## +-------+-----+-----+-----+
## |   word|xs[0]|xs[1]|xs[2]|
## +-------+-----+-----+-----+
## | assert|  1.0|  2.0|  3.0|
## |require|  0.0|  2.0|  0.0|
## +-------+-----+-----+-----+

对于 Scala 等价物,请参阅 Spark Scala: How to convert Dataframe[vector] to DataFrame[f1:Double, ..., fn: Double)]。

【讨论】:

性能方面,使用 .map/.toDF 函数要聪明得多,因为它们几乎总是比 UDF 实现更快。 [除非您使用 spark 2.2+ 中的 vectorized udf 定义] 感谢您的评论。 RDD API 不会在某个时候被弃用吗?所以我认为后者是推荐的,还是我在拧? 请注意,从 spark 2.3 开始,vectorized UDF 要求输入和输出冷却都是 java 原语,因此它不适用于此应用程序。 相关的 JIRA 票证以改进这一点:issues.apache.org/jira/browse/SPARK-19217 @zero323 - 对我来说,UDF 方式似乎奏效了。如何将结果保存为 DF?当我最后做一个 .show 时,它会以我想要的方式向我展示 DF。只是似乎找不到命名/保存它的方法。【参考方案2】:

要将训练 PySpark ML 模型后生成的 rawPredictionprobability 列拆分为 Pandas 列,您可以这样拆分:

your_pandas_df['probability'].apply(lambda x: pd.Series(x.toArray()))

【讨论】:

我收到此错误:TypeError: 'Column' object is not callable【参考方案3】:

使用how-to-access-element-of-a-vectorudt-column-in-a-spark-dataframe中的i_th udf要快得多

上面zero323的解决方案中给出的extract函数使用了toList,它创建了一个Python list对象,用Python float对象填充它,通过遍历list找到想要的元素,然后需要转换回java double;对每一行重复。使用 rdd 比 to_array udf 慢得多,后者也调用 toList,但两者都比让 SparkSQL 处理大部分工作的 udf 慢得多。

将此处提出的 rdd extract 和 to_array udf 与来自3955864 的 i_th udf 进行比较的时序代码:

from pyspark.context import SparkContext
from pyspark.sql import Row, SQLContext, SparkSession
from pyspark.sql.functions import lit, udf, col
from pyspark.sql.types import ArrayType, DoubleType
import pyspark.sql.dataframe
from pyspark.sql.functions import pandas_udf, PandasUDFType

sc = SparkContext('local[4]', 'FlatTestTime')

spark = SparkSession(sc)
spark.conf.set("spark.sql.execution.arrow.enabled", True)

from pyspark.ml.linalg import Vectors

# copy the two rows in the test dataframe a bunch of times,
# make this small enough for testing, or go for "big data" and be prepared to wait
REPS = 20000

df = sc.parallelize([
    ("assert", Vectors.dense([1, 2, 3]), 1, Vectors.dense([4.1, 5.1])),
    ("require", Vectors.sparse(3, 1: 2), 2, Vectors.dense([6.2, 7.2])),
] * REPS).toDF(["word", "vector", "more", "vorpal"])

def extract(row):
    return (row.word, ) + tuple(row.vector.toArray().tolist(),) + (row.more,) + tuple(row.vorpal.toArray().tolist(),)

def test_extract():
    return df.rdd.map(extract).toDF(['word', 'vector__0', 'vector__1', 'vector__2', 'more', 'vorpal__0', 'vorpal__1'])

def to_array(col):
    def to_array_(v):
        return v.toArray().tolist()
    return udf(to_array_, ArrayType(DoubleType()))(col)

def test_to_array():
    df_to_array = df.withColumn("xs", to_array(col("vector"))) \
        .select(["word"] + [col("xs")[i] for i in range(3)] + ["more", "vorpal"]) \
        .withColumn("xx", to_array(col("vorpal"))) \
        .select(["word"] + ["xs[]".format(i) for i in range(3)] + ["more"] + [col("xx")[i] for i in range(2)])
    return df_to_array

# pack up to_array into a tidy function
def flatten(df, vector, vlen):
    fieldNames = df.schema.fieldNames()
    if vector in fieldNames:
        names = []
        for fieldname in fieldNames:
            if fieldname == vector:
                names.extend([col(vector)[i] for i in range(vlen)])
            else:
                names.append(col(fieldname))
        return df.withColumn(vector, to_array(col(vector)))\
                 .select(names)
    else:
        return df

def test_flatten():
    dflat = flatten(df, "vector", 3)
    dflat2 = flatten(dflat, "vorpal", 2)
    return dflat2

def ith_(v, i):
    try:
        return float(v[i])
    except ValueError:
        return None

ith = udf(ith_, DoubleType())

select = ["word"]
select.extend([ith("vector", lit(i)) for i in range(3)])
select.append("more")
select.extend([ith("vorpal", lit(i)) for i in range(2)])

# %% timeit ...
def test_ith():
    return df.select(select)

if __name__ == '__main__':
    import timeit

    # make sure these work as intended
    test_ith().show(4)
    test_flatten().show(4)
    test_to_array().show(4)
    test_extract().show(4)

    print("i_th\t\t",
          timeit.timeit("test_ith()",
                       setup="from __main__ import test_ith",
                       number=7)
         )
    print("flatten\t\t",
          timeit.timeit("test_flatten()",
                       setup="from __main__ import test_flatten",
                       number=7)
         )
    print("to_array\t",
          timeit.timeit("test_to_array()",
                       setup="from __main__ import test_to_array",
                       number=7)
         )
    print("extract\t\t",
          timeit.timeit("test_extract()",
                       setup="from __main__ import test_extract",
                       number=7)
         )

结果:

i_th         0.05964796099999958
flatten      0.4842299350000001
to_array     0.42978780299999997
extract      2.9254476840000017

【讨论】:

那是一些看起来很棒的代码,但它有什么作用呢?你能对使用的不同方法添加一些解释吗? @***.com/users/2254228/chuck 代码将 Spark ***.com/questions/39555864/… 中提出的 ith 函数相乘,以表明后一种解决方案更优越。 您是在比较 DAG 结构,而不是实际的转换。【参考方案4】:
def splitVecotr(df, new_features=['f1','f2']):
schema = df.schema
cols = df.columns

for col in new_features: # new_features should be the same length as vector column length
    schema = schema.add(col,DoubleType(),True)

return spark.createDataFrame(df.rdd.map(lambda row: [row[i] for i in cols]+row.features.tolist()), schema)

函数将特征向量列变成单独的列

【讨论】:

以上是关于如何将向量拆分为列 - 使用 PySpark的主要内容,如果未能解决你的问题,请参考以下文章

如何将向量重新排列为列而不是行?

如何将逗号分隔的值拆分为列

如何在 yii 视图中将内容拆分为列-单独的 div

如何在初始化期间将 std::initializer_list 转换为列矩阵或列向量?

使用 Oracle SQL 将可变长度分隔字符串拆分为列

在 SQL 中将分隔的行拆分为列