PySpark:将 RDD 转换为数据框中的列

Posted

技术标签:

【中文标题】PySpark:将 RDD 转换为数据框中的列【英文标题】:PySpark: Convert RDD to column in dataframe 【发布时间】:2018-05-15 09:43:23 【问题描述】:

我有一个 spark 数据框,我使用它来计算一行和一组给定坐标之间的欧几里得距离。我在这里重新创建一个结构相似的数据框“df_vector”以便更好地解释。

from pyspark.ml.feature import VectorAssembler
arr = [[1,2,3], [4,5,6]]
df_example = spark.createDataFrame(arr, ['A','B','C'])
assembler = VectorAssembler(inputCols=[x for x in df_example.columns],outputCol='features')
df_vector = assembler.transform(df_example).select('features') 

>>> df_vector.show()
+-------------+
|     features|
+-------------+
|[1.0,2.0,3.0]|
|[4.0,5.0,6.0]|
+-------------+

>>> df_vector.dtypes
[('features', 'vector')]

如您所见,features 列是一个向量。在实践中,我将此向量列作为StandardScaler 的输出。无论如何,由于我需要计算欧几里得距离,所以我执行以下操作

rdd = df_vector.select('features').rdd.map(lambda r: np.linalg.norm(r-b))

在哪里

b = np.asarray([0.5,1.0,1.5])

我有我需要的所有计算,但我需要这个rdd 作为df_vector 中的一列。我该怎么办?

【问题讨论】:

Add PySpark RDD as new column to pyspark.sql.dataframe的可能重复 【参考方案1】:

您可以使用UDF,而不是创建新的rdd:

norm_udf = udf(lambda r: np.linalg.norm(r - b).tolist(), FloatType())
df_vector.withColumn("norm", norm_udf(df.features))

确保在工作节点上定义了numpy

【讨论】:

这行得通。谢谢。但是有没有一种方法可以在不使用 udf 的情况下做到这一点?我们必须处理更大的数据集(大约 1000 万条记录),而 udf 往往会降低性能。 @ClockSlave 我现在想不出其他好方法。我认为它的性能不应该比转换为 rdd,进行转换然后返回,所以至少是这样。【参考方案2】:

解决性能问题的一种方法可能是使用mapPartitions。这个想法是,在分区级别,将features 转换为数组,然后计算整个数组的范数(因此隐式使用 numpy 向量化)。然后做一些整理工作以获得您想要的表格。对于大型数据集,这可能会提高性能:

这是在分区级别计算范数的函数:

from pyspark.sql import Row
def getnorm(vectors):
    # convert vectors into numpy array
    vec_array=np.vstack([v['features'] for v in vectors])
    # calculate the norm
    norm=np.linalg.norm(vec_array-b, axis=1)
    # tidy up to get norm as a column
    output=[Row(features=x, norm=y) for x,y in zip(vec_array.tolist(), norm.tolist())]
    return(output)

使用mapPartitions 应用它会得到一个 RDD 行,然后可以将其转换为 DataFrame:

df_vector.rdd.mapPartitions(getnorm).toDF()

【讨论】:

以上是关于PySpark:将 RDD 转换为数据框中的列的主要内容,如果未能解决你的问题,请参考以下文章

使用 pyspark 将 Spark 数据框中的列转换为数组 [重复]

PySpark 将“map”类型的列转换为数据框中的多列

如何使用pyspark将具有多个可能值的Json数组列表转换为数据框中的列

如何更改pyspark中的列元数据?

将列表的列拆分为同一 PySpark 数据框中的多列

如何计算 pyspark RDD 中的列数?