预排序输入上的 Spark 特征向量变换
Posted
技术标签:
【中文标题】预排序输入上的 Spark 特征向量变换【英文标题】:Spark Feature Vector Transformation on Pre-Sorted Input 【发布时间】:2016-05-03 10:22:52 【问题描述】:我在 HDFS 上的制表符分隔文件中有一些数据,如下所示:
label | user_id | feature
------------------------------
pos | 111 | www.abc.com
pos | 111 | www.xyz.com
pos | 111 | Firefox
pos | 222 | www.example.com
pos | 222 | www.xyz.com
pos | 222 | IE
neg | 333 | www.jkl.com
neg | 333 | www.xyz.com
neg | 333 | Chrome
我需要对其进行转换,为每个 user_id 创建一个特征向量来训练一个org.apache.spark.ml.classification.NaiveBayes
模型。
我目前的方法基本上如下:
-
将原始数据加载到 DataFrame 中
使用 StringIndexer 索引特征
进入 RDD 并按 user_id 分组,并将特征索引映射到稀疏向量中。
关键是……数据已经按 user_id 进行了预排序。利用它的最佳方法是什么?想到可能发生了多少不必要的工作,我感到很痛苦。
如果一些代码有助于理解我目前的方法,这里是地图的精髓:
val featurization = (vals: (String,Iterable[Row])) =>
// create a Seq of all the feature indices
// Note: the indexing was done in a previous step not shown
val seq = vals._2.map(x => (x.getDouble(1).toInt,1.0D)).toSeq
// create the sparse vector
val featureVector = Vectors.sparse(maxIndex, seq)
// convert the string label into a Double
val label = if (vals._2.head.getString(2) == "pos") 1.0 else 0.0
(label, vals._1, featureVector)
d.rdd
.groupBy(_.getString(1))
.map(featurization)
.toDF("label","user_id","features")
【问题讨论】:
【参考方案1】:让我们从your other question开始
如果我的磁盘上的数据保证按将用于组聚合或归约的键进行预排序,Spark 有什么办法可以利用这一点?
这取决于。如果您应用的操作可以从地图端聚合中受益,那么您可以通过预先排序的数据获得很多收益,而无需进一步干预您的代码。共享同一个key的数据应该位于同一个partition上,并且可以在shuffle之前在本地聚合。
不幸的是,在这种特殊情况下它不会有太大帮助。即使您启用地图端聚合(groupBy(Key)
不使用 is 所以您需要自定义实现)或聚合特征向量(您会在我对 How to define a custom aggregation function to sum a column of Vectors? 的回答中找到一些示例),也没有太多收获.您可以在这里和那里节省一些工作,但您仍然需要在节点之间传输所有索引。
如果你想获得更多,你必须做更多的工作。我可以看到您可以利用现有订单的两种基本方式:
使用自定义 Hadoop 输入格式仅生成完整记录(标签、ID、所有特征),而不是逐行读取数据。如果您的数据每个 id 的行数固定,您甚至可以尝试使用 NLineInputFormat
并在之后应用 mapPartitions
来聚合记录。
这绝对是更冗长的解决方案,但不需要在 Spark 中进行额外的改组。
照常读取数据,但对groupBy
使用自定义分区器。据我所知,使用rangePartitioner
应该可以正常工作,但请确保您可以尝试以下步骤:
mapPartitionsWithIndex
查找每个分区的最小/最大ID。
创建分区器,在当前 (i-th) 分区上保持最小值 i + 1
将此分区程序用于groupBy(Key)
这可能是更友好的解决方案,但至少需要一些改组。如果预期要移动的记录数很少(mapPartitions 和broadcast
* 来处理这个问题,尽管分区在实践中可能更有用且更便宜。
* 你可以使用类似这样的方法:https://***.com/a/33072089/1560062
【讨论】:
非常有见地。感谢您的帮助。以上是关于预排序输入上的 Spark 特征向量变换的主要内容,如果未能解决你的问题,请参考以下文章