在 Spark 2.0 中访问向量列时出现 MatchError

Posted

技术标签:

【中文标题】在 Spark 2.0 中访问向量列时出现 MatchError【英文标题】:MatchError while accessing vector column in Spark 2.0 【发布时间】:2017-08-12 22:29:51 【问题描述】:

我正在尝试在 JSON 文件上创建 LDA 模型。

使用 JSON 文件创建 Spark 上下文:

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder
  .master("local")
  .appName("my-spark-app")
  .config("spark.some.config.option", "config-value")
  .getOrCreate()

 val df = spark.read.json("dbfs:/mnt/JSON6/JSON/sampleDoc.txt")

显示df 应该显示DataFrame

display(df)

标记文本

import org.apache.spark.ml.feature.RegexTokenizer

// Set params for RegexTokenizer
val tokenizer = new RegexTokenizer()
                .setPattern("[\\W_]+")
                .setMinTokenLength(4) // Filter away tokens with length < 4
                .setInputCol("text")
                .setOutputCol("tokens")

// Tokenize document
val tokenized_df = tokenizer.transform(df)

这应该显示tokenized_df

display(tokenized_df)

获取stopwords

%sh wget http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words > -O /tmp/stopwords

可选:将停用词复制到 tmp 文件夹

%fs cp file:/tmp/stopwords dbfs:/tmp/stopwords

收集所有stopwords

val stopwords = sc.textFile("/tmp/stopwords").collect()

过滤掉stopwords

 import org.apache.spark.ml.feature.StopWordsRemover

 // Set params for StopWordsRemover
 val remover = new StopWordsRemover()
                   .setStopWords(stopwords) // This parameter is optional
                   .setInputCol("tokens")
                   .setOutputCol("filtered")

 // Create new DF with Stopwords removed
 val filtered_df = remover.transform(tokenized_df)

显示过滤后的df 应该验证stopwords 已被删除

 display(filtered_df)

向量化单词出现的频率

 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.sql.Row
 import org.apache.spark.ml.feature.CountVectorizer

 // Set params for CountVectorizer
 val vectorizer = new CountVectorizer()
               .setInputCol("filtered")
               .setOutputCol("features")
               .fit(filtered_df)

验证vectorizer

 vectorizer.transform(filtered_df)
           .select("id", "text","features","filtered").show()

在此之后,我发现在 LDA 中安装此 vectorizer 时出现问题。我认为CountVectorizer 的问题是给出稀疏向量,但 LDA 需要密集向量。仍在尝试找出问题所在。

这是地图无法转换的例外情况。

import org.apache.spark.mllib.linalg.Vector
val ldaDF = countVectors.map  
             case Row(id: String, countVector: Vector) => (id, countVector) 
            
display(ldaDF)

例外:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4083.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4083.0 (TID 15331, 10.209.240.17): scala.MatchError: [0,(1252,[13,17,18,20,30,37,45,50,51,53,63,64,96,101,108,125,174,189,214,221,224,227,238,268,291,309,328,357,362,437,441,455,492,493,511,528,561,613,619,674,764,823,839,980,1098,1143],[1.0,1.0,2.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,3.0,1.0,2.0,1.0,5.0,1.0,2.0,2.0,1.0,4.0,1.0,2.0,3.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,2.0,1.0,1.0,1.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)

有一个 LDA 的工作示例,它没有引发任何问题

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.clustering.DistributedLDAModel, LDA

val a = Vectors.dense(Array(1.0,2.0,3.0))
val b = Vectors.dense(Array(3.0,4.0,5.0))
val df = Seq((1L,a),(2L,b),(2L,a)).toDF

val ldaDF = df.map  case Row(id: Long, countVector: Vector) => (id, countVector)  

val model = new LDA().setK(3).run(ldaDF.javaRDD)
display(df)

唯一的区别是第二个 sn-p 我们有一个密集矩阵。

【问题讨论】:

【参考方案1】:

这与稀疏无关。由于 Spark 2.0.0 ML Transformers 不再生成 o.a.s.mllib.linalg.VectorUDT 而是 o.a.s.ml.linalg.VectorUDT 并在本地映射到 o.a.s.ml.linalg.Vector 的子类。这些与旧的 MLLib API 不兼容,后者在 Spark 2.0.0 中将被弃用。

您可以使用Vectors.fromML 将其转换为“旧”:

import org.apache.spark.mllib.linalg.Vectors => OldVectors
import org.apache.spark.ml.linalg.Vectors => NewVectors

OldVectors.fromML(NewVectors.dense(1.0, 2.0, 3.0))
OldVectors.fromML(NewVectors.sparse(5, Seq(0 -> 1.0, 2 -> 2.0, 4 -> 3.0)))

但如果您已经使用 ML 转换器,则使用 LDA 的 ML 实现更有意义。

为方便起见,您可以使用隐式转换:

import scala.languageFeature.implicitConversions

object VectorConversions 
  import org.apache.spark.mllib.linalg => mllib
  import org.apache.spark.ml.linalg => ml

  implicit def toNewVector(v: mllib.Vector) = v.asML
  implicit def toOldVector(v: ml.Vector) = mllib.Vectors.fromML(v)

【讨论】:

此外,与此类型不匹配相关的错误消息也非常令人困惑。例如Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(VecFunction)' due to data type mismatch: argument 1 requires vector type, however, 'VecFunction' is of vector type.; 请注意如何将参数和预期输入都称为向量类型。 不应该是org.apache.spark.mllib.*linalg*.Vectors.fromML 吗?哦,顺便说一句,这很有帮助;)【参考方案2】:

解决方案很简单,伙计们。在下面找到

//import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.ml.linalg.Vector

【讨论】:

【参考方案3】:

我变了:

val ldaDF = countVectors.map  
             case Row(id: String, countVector: Vector) => (id, countVector) 
            

到:

val ldaDF = countVectors.map  case Row(docId: String, features: MLVector) => 
                               (docId.toLong, Vectors.fromML(features)) 

它就像一个魅力!它与@zero323 所写的一致。

进口清单:

import org.apache.spark.ml.feature.CountVectorizer, RegexTokenizer, StopWordsRemover
import org.apache.spark.ml.linalg.Vector => MLVector
import org.apache.spark.mllib.clustering.LDA, OnlineLDAOptimizer
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row, SparkSession

【讨论】:

以上是关于在 Spark 2.0 中访问向量列时出现 MatchError的主要内容,如果未能解决你的问题,请参考以下文章

OpenCV - 迭代向量时出现“无效参数”

尝试访问向量中对象中的字段时出现段错误

从逆序 C++ 访问向量时出现运行时错误 [关闭]

尝试访问二维向量元素时出现段错误

访问向量类成员时出现分段错误

在远程 Yarn 集群上使用 spark 从 S3 访问文件时出现问题