PySpark:将 RDD 转换为数据框中的列
Posted
技术标签:
【中文标题】PySpark:将 RDD 转换为数据框中的列【英文标题】:PySpark: Convert RDD to column in dataframe 【发布时间】:2018-05-15 09:43:23 【问题描述】:我有一个 spark 数据框,我使用它来计算一行和一组给定坐标之间的欧几里得距离。我在这里重新创建一个结构相似的数据框“df_vector”以便更好地解释。
from pyspark.ml.feature import VectorAssembler
arr = [[1,2,3], [4,5,6]]
df_example = spark.createDataFrame(arr, ['A','B','C'])
assembler = VectorAssembler(inputCols=[x for x in df_example.columns],outputCol='features')
df_vector = assembler.transform(df_example).select('features')
>>> df_vector.show()
+-------------+
| features|
+-------------+
|[1.0,2.0,3.0]|
|[4.0,5.0,6.0]|
+-------------+
>>> df_vector.dtypes
[('features', 'vector')]
如您所见,features
列是一个向量。在实践中,我将此向量列作为StandardScaler
的输出。无论如何,由于我需要计算欧几里得距离,所以我执行以下操作
rdd = df_vector.select('features').rdd.map(lambda r: np.linalg.norm(r-b))
在哪里
b = np.asarray([0.5,1.0,1.5])
我有我需要的所有计算,但我需要这个rdd
作为df_vector
中的一列。我该怎么办?
【问题讨论】:
Add PySpark RDD as new column to pyspark.sql.dataframe的可能重复 【参考方案1】:您可以使用UDF
,而不是创建新的rdd:
norm_udf = udf(lambda r: np.linalg.norm(r - b).tolist(), FloatType())
df_vector.withColumn("norm", norm_udf(df.features))
确保在工作节点上定义了numpy
。
【讨论】:
这行得通。谢谢。但是有没有一种方法可以在不使用 udf 的情况下做到这一点?我们必须处理更大的数据集(大约 1000 万条记录),而 udf 往往会降低性能。 @ClockSlave 我现在想不出其他好方法。我认为它的性能不应该比转换为 rdd,进行转换然后返回,所以至少是这样。【参考方案2】:解决性能问题的一种方法可能是使用mapPartitions
。这个想法是,在分区级别,将features
转换为数组,然后计算整个数组的范数(因此隐式使用 numpy 向量化)。然后做一些整理工作以获得您想要的表格。对于大型数据集,这可能会提高性能:
这是在分区级别计算范数的函数:
from pyspark.sql import Row
def getnorm(vectors):
# convert vectors into numpy array
vec_array=np.vstack([v['features'] for v in vectors])
# calculate the norm
norm=np.linalg.norm(vec_array-b, axis=1)
# tidy up to get norm as a column
output=[Row(features=x, norm=y) for x,y in zip(vec_array.tolist(), norm.tolist())]
return(output)
使用mapPartitions
应用它会得到一个 RDD 行,然后可以将其转换为 DataFrame:
df_vector.rdd.mapPartitions(getnorm).toDF()
【讨论】:
以上是关于PySpark:将 RDD 转换为数据框中的列的主要内容,如果未能解决你的问题,请参考以下文章
使用 pyspark 将 Spark 数据框中的列转换为数组 [重复]