PySpark:如何将具有 SparseVector 类型的列的 Spark 数据帧写入 CSV 文件?

Posted

技术标签:

【中文标题】PySpark:如何将具有 SparseVector 类型的列的 Spark 数据帧写入 CSV 文件?【英文标题】:PySpark: How to write a Spark dataframe having a column with type SparseVector into CSV file? 【发布时间】:2016-10-12 19:35:18 【问题描述】:

我有一个 spark 数据框,其中有一列类型为 spark.mllib.linalg.SparseVector:

1) 如何将其写入 csv 文件?

2) 如何打印所有向量?

【问题讨论】:

【参考方案1】:

要将数据帧写入 csv 文件,您可以使用标准 df.write.csv(output_path)

但是,如果您只使用上述方法,您可能会在 SparseVector 类型的列中收到 java.lang.UnsupportedOperationException: CSV data source does not support struct<type:tinyint,size:int,indices:array<int>,values:array<double>> data type 错误。

有两种方法可以打印 SparseVector 并避免该错误:稀疏格式或密集格式。

如果要打印稀疏格式,可以这样定义udf:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.functions import col

sparse_format_udf = udf(lambda x: ','.join([str(elem) for elem in x], StringType())

df = df.withColumn('column_name', sparse_format_udf(col('column_name')))

df.write.option("delimiter", "\t").csv(output_path)

该列以密集格式输出如下:1.0,0.0,5.0,0.0

如果您想以密集格式打印,您可以使用the OOB __str__ function of the SparseVector class,或者有创意并定义自己的输出格式。这里我要使用OOB功能。

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.functions import col

sparse_format_udf = udf(lambda x: str(x), StringType())

df = df.withColumn('column_name', sparse_format_udf(col('column_name')))

df.write.option("delimiter", "\t").csv(output_path)

该列以稀疏格式(4,[0,2],[1.0,5.0]) 打印成类似这样的内容

注意,我之前尝试过这种方法:df = df.withColumn("column_name", col("column_name").cast("string")),但该列只打印到类似 [0,5,org.apache.spark.sql.catalyst.expressions.UnsafeArrayData@6988050,org.apache.spark.sql.catalyst.expressions.UnsafeArrayData@ec4ae6ab] 的内容,这是不可取的。

【讨论】:

谢谢,这很有用,但你没有在 udf 名称中混淆稀疏和密集吗? IE。 1.0,0.0,5.0,0.0 应该是dense_format_udf 密集的,而(4,[0,2],[1.0,5.0]) 应该是稀疏格式和sparse_format_udf?【参考方案2】:
    https://github.com/databricks/spark-csv

    df2 = df1.map(lambda row: row.yourVectorCol)

    df1.map(lambda row: row[1])

    你要么有一个命名的列,要么只是通过它在行中的位置来引用该列。

    然后,要打印它,你可以df2.collect()

如果没有更多信息,这可能对您有帮助,或者对您没有足够的帮助。请详细说明一下。

【讨论】:

以上是关于PySpark:如何将具有 SparseVector 类型的列的 Spark 数据帧写入 CSV 文件?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用pyspark将具有多个可能值的Json数组列表转换为数据框中的列

如何有效地将大型 .tsv 文件上传到 pyspark 中具有拆分列的 Hive 表?

如何使用具有多个源列的 pandas_udf 将多个列添加到 pyspark DF?

Pyspark:将df写入具有特定名称的文件,绘制df

如何在pyspark中连接具有相同名称的列的值

如何使用具有不同列号pyspark的两个数据帧的并集