预排序输入上的 Spark 特征向量变换

Posted

技术标签:

【中文标题】预排序输入上的 Spark 特征向量变换【英文标题】:Spark Feature Vector Transformation on Pre-Sorted Input 【发布时间】:2016-05-03 10:22:52 【问题描述】:

我在 HDFS 上的制表符分隔文件中有一些数据,如下所示:

label | user_id | feature
------------------------------
  pos | 111     | www.abc.com
  pos | 111     | www.xyz.com
  pos | 111     | Firefox
  pos | 222     | www.example.com
  pos | 222     | www.xyz.com
  pos | 222     | IE
  neg | 333     | www.jkl.com
  neg | 333     | www.xyz.com
  neg | 333     | Chrome

我需要对其进行转换,为每个 user_id 创建一个特征向量来训练一个org.apache.spark.ml.classification.NaiveBayes 模型。

我目前的方法基本上如下:

    将原始数据加载到 DataFrame 中 使用 StringIndexer 索引特征 进入 RDD 并按 user_id 分组,并将特征索引映射到稀疏向量中。

关键是……数据已经按 user_id 进行了预排序。利用它的最佳方法是什么?想到可能发生了多少不必要的工作,我感到很痛苦。

如果一些代码有助于理解我目前的方法,这里是地图的精髓:

val featurization = (vals: (String,Iterable[Row])) => 
  // create a Seq of all the feature indices
  // Note: the indexing was done in a previous step not shown
  val seq = vals._2.map(x => (x.getDouble(1).toInt,1.0D)).toSeq

  // create the sparse vector
  val featureVector = Vectors.sparse(maxIndex, seq)

  // convert the string label into a Double
  val label = if (vals._2.head.getString(2) == "pos") 1.0 else 0.0

  (label, vals._1, featureVector)


d.rdd
  .groupBy(_.getString(1))
  .map(featurization)
  .toDF("label","user_id","features")

【问题讨论】:

【参考方案1】:

让我们从your other question开始

如果我的磁盘上的数据保证按将用于组聚合或归约的键进行预排序,Spark 有什么办法可以利用这一点?

这取决于。如果您应用的操作可以从地图端聚合中受益,那么您可以通过预先排序的数据获得很多收益,而无需进一步干预您的代码。共享同一个key的数据应该位于同一个partition上,并且可以在shuffle之前在本地聚合。

不幸的是,在这种特殊情况下它不会有太大帮助。即使您启用地图端聚合(groupBy(Key) 不使用 is 所以您需要自定义实现)或聚合特征向量(您会在我对 How to define a custom aggregation function to sum a column of Vectors? 的回答中找到一些示例),也没有太多收获.您可以在这里和那里节省一些工作,但您仍然需要在节点之间传输所有索引。

如果你想获得更多,你必须做更多的工作。我可以看到您可以利用现有订单的两种基本方式:

    使用自定义 Hadoop 输入格式仅生成完整记录(标签、ID、所有特征),而不是逐行读取数据。如果您的数据每个 id 的行数固定,您甚至可以尝试使用 NLineInputFormat 并在之后应用 mapPartitions 来聚合记录。

    这绝对是更冗长的解决方案,但不需要在 Spark 中进行额外的改组。

    照常读取数据,但对groupBy 使用自定义分区器。据我所知,使用rangePartitioner 应该可以正常工作,但请确保您可以尝试以下步骤:

    使用mapPartitionsWithIndex 查找每个分区的最小/最大ID。 创建分区器,在当前 (i-th) 分区上保持最小值 i + 1 将此分区程序用于groupBy(Key)

    这可能是更友好的解决方案,但至少需要一些改组。如果预期要移动的记录数很少(mapPartitions 和broadcast* 来处理这个问题,尽管分区在实践中可能更有用且更便宜。


* 你可以使用类似这样的方法:https://***.com/a/33072089/1560062

【讨论】:

非常有见地。感谢您的帮助。

以上是关于预排序输入上的 Spark 特征向量变换的主要内容,如果未能解决你的问题,请参考以下文章

什么是特征值和特征向量

特征向量的意义

将特征的 Spark 向量转换为数组

MLlib特征变换方法

特征值和特征向量

Matlab 矩阵特征值排序问题