从 org.apache.spark.sql.Dataset 转换为 CoordinateMatrix

Posted

技术标签:

【中文标题】从 org.apache.spark.sql.Dataset 转换为 CoordinateMatrix【英文标题】:Converting from org.apache.spark.sql.Dataset to CoordinateMatrix 【发布时间】:2018-06-20 10:53:40 【问题描述】:

我有一个 Spark SQL 数据集,其架构定义如下,

User_id <String> | Item_id <String> | Bought_Status <Boolean>

我想将其转换为稀疏矩阵以应用推荐系统算法。这是非常庞大的 RDD 数据集,所以我读到 CoordinateMatrix 是从中创建稀疏矩阵的正确方法。

但是我被困在 API 文档说RDD[MatrixEntry] 是创建CoordinateMatrix 的必要条件。同样MatrixEntry 需要intintlong 的格式。

我无法将我的数据方案转换为这种格式。您能帮我了解如何将这些数据转换为 Spark 中的稀疏矩阵吗?我目前正在使用 scala 进行编程

【问题讨论】:

【参考方案1】:

请注意,矩阵实体的类型为 long、long、double

参考:https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.mllib.linalg.distributed.MatrixEntry

此外,由于用户/项目列是字符串,因此需要在处理之前对其进行索引。以下是使用 scala 创建坐标矩阵的方法:

//Imports needed
scala> import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix

scala> import org.apache.spark.mllib.linalg.distributed.MatrixEntry
import org.apache.spark.mllib.linalg.distributed.MatrixEntry

scala> import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.StringIndexer

//Let's create a dummy dataframe
scala> val df = spark.sparkContext.parallelize(List(
     | ("u1","i1" ,true),
     | ("u1","i2" ,true),
     | ("u2","i3" ,false),
     | ("u2","i4" ,false),
     | ("u3","i1" ,true),
     | ("u3","i3" ,true),
     | ("u4","i3" ,false),
     | ("u4","i4" ,false))).toDF("user","item","bought")
df: org.apache.spark.sql.DataFrame = [user: string, item: string ... 1 more field]

scala> df.show
+----+----+------+
|user|item|bought|
+----+----+------+
|  u1|  i1|  true|
|  u1|  i2|  true|
|  u2|  i3| false|
|  u2|  i4| false|
|  u3|  i1|  true|
|  u3|  i3|  true|
|  u4|  i3| false|
|  u4|  i4| false|
+----+----+------+

//Index user/ item columns
scala> val indexer1 = new StringIndexer().setInputCol("user").setOutputCol("userIndex")
indexer1: org.apache.spark.ml.feature.StringIndexer = strIdx_2de8d35b8301

scala> val indexed1 = indexer1.fit(df).transform(df)
indexed1: org.apache.spark.sql.DataFrame = [user: string, item: string ... 2 more fields]

scala> val indexer2 = new StringIndexer().setInputCol("item").setOutputCol("itemIndex")
indexer2: org.apache.spark.ml.feature.StringIndexer = strIdx_493ce45dbec3

scala> val indexed2 = indexer2.fit(indexed1).transform(indexed1)
indexed2: org.apache.spark.sql.DataFrame = [user: string, item: string ... 3 more fields]

scala> val tempDF = indexed2.withColumn("userIndex",indexed2("userIndex").cast("long")).withColumn("itemIndex",indexed2("itemIndex").cast("long")).withColumn("bought",indexed2("bought").cast("double")).select("userIndex","itemIndex","bought")
tempDF: org.apache.spark.sql.DataFrame = [userIndex: bigint, itemIndex: bigint ... 1 more field]

scala> tempDF.show
+---------+---------+------+
|userIndex|itemIndex|bought|
+---------+---------+------+
|        0|        1|   1.0|
|        0|        3|   1.0|
|        1|        0|   0.0|
|        1|        2|   0.0|
|        2|        1|   1.0|
|        2|        0|   1.0|
|        3|        0|   0.0|
|        3|        2|   0.0|
+---------+---------+------+

//Create coordinate matrix of size 4*4
scala> val corMat = new CoordinateMatrix(tempDF.rdd.map(m => MatrixEntry(m.getLong(0),m.getLong(1),m.getDouble(2))), 4, 4)
corMat: org.apache.spark.mllib.linalg.distributed.CoordinateMatrix = org.apache.spark.mllib.linalg.distributed.CoordinateMatrix@16be6b36

//Check the content of coordinate matrix
scala> corMat.entries.collect
res2: Array[org.apache.spark.mllib.linalg.distributed.MatrixEntry] = Array(MatrixEntry(0,1,1.0), MatrixEntry(0,3,1.0), MatrixEntry(1,0,0.0), MatrixEntry(1,2,0.0), MatrixEntry(2,1,1.0), MatrixEntry(2,0,1.0), MatrixEntry(3,0,0.0), MatrixEntry(3,2,0.0))

希望,这会有所帮助!

【讨论】:

感谢您的回答 hadooper。

以上是关于从 org.apache.spark.sql.Dataset 转换为 CoordinateMatrix的主要内容,如果未能解决你的问题,请参考以下文章

如何从其他面板从 JTextField 获取输入

从PRISM开始学WPFMVVMViewModel?

在 python 中,为啥从数组读取比从列表读取慢?

从图库中挑选或从相机捕获的高质量图像

从PRISM开始学WPFMVVMCommand?

从PRISM开始学WPFPrism?