Spark + Scala: NaiveBayes.train - 异常是 java.util.NoSuchElementException: next on empty iterator
Posted
技术标签:
【中文标题】Spark + Scala: NaiveBayes.train - 异常是 java.util.NoSuchElementException: next on empty iterator【英文标题】:Spark + Scala: NaiveBayes.train - exception is java.util.NoSuchElementException: next on empty iterator 【发布时间】:2017-07-11 14:02:23 【问题描述】:我正在尝试使用带有 Spark MLlib 的推文进行情绪分析。在预处理数据并将其转换为适当的格式后,我调用 NaiveBayes 的 train 方法来获取模型,但它失败并出现异常。这是堆栈跟踪:
java.util.NoSuchElementException: next on empty iterator
at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:64)
at scala.collection.IterableLike$class.head(IterableLike.scala:91)
at scala.collection.mutable.ArrayOps$ofRef.scala$collection$IndexedSeqOptimized$$super$head(ArrayOps.scala:108)
at scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:120)
at scala.collection.mutable.ArrayOps$ofRef.head(ArrayOps.scala:108)
at org.apache.spark.mllib.classification.NaiveBayes.run(NaiveBayes.scala:408)
at org.apache.spark.mllib.classification.NaiveBayes$.train(NaiveBayes.scala:467)
at org.jc.sparknaivebayes.main.NaiveBayesTrain$delayedInit$body.apply(NaiveBayesTrain.scala:53)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.App$class.main(App.scala:71)
at org.jc.sparknaivebayes.main.NaiveBayesTrain$.main(NaiveBayesTrain.scala:12)
at org.jc.sparknaivebayes.main.NaiveBayesTrain.main(NaiveBayesTrain.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
这是我的主要方法:
val csvFiles = args(0).split(",")
val modelStore = args(1)
val docs = TweetParser.parseAll(csvFiles, sc)
val termDocs = Tokenizer.tokenizeAll(docs)
val termDocsRdd = sc.parallelize[TermDoc](termDocs.toSeq)
val numDocs = termDocsRdd.count()
//val terms = termDocsRdd.flatMap(_.terms).distinct().collect().sortBy(identity)
val terms = termDocsRdd.flatMap(_.terms).distinct().sortBy(identity)
val termDict = new Dictionary(terms)
//val labels = termDocsRdd.flatMap(_.labels).distinct().collect()
val labels = termDocsRdd.flatMap(_.labels).distinct()
val labelDict = new Dictionary(labels)
val idfs = (termDocsRdd.flatMap(termDoc => termDoc.terms.map((termDoc.doc, _))).distinct().groupBy(_._2) collect
case (term, docs) if docs.size > 3 =>
term -> (numDocs.toDouble / docs.size.toDouble)
).collect.toMap
val tfidfs = termDocsRdd flatMap
termDoc =>
val termPairs: Seq[(Int, Double)] = termDict.tfIdfs(termDoc.terms, idfs)
termDoc.labels.headOption.map
label =>
val labelId = labelDict.indexOf(label).toDouble
val vector = Vectors.sparse(termDict.count.toInt, termPairs)
LabeledPoint(labelId, vector)
val model = NaiveBayes.train(tfidfs)
字典类在这里:
class Dictionary(dict: RDD[String]) extends Serializable
//val builder = ImmutableBiMap.builder[String, Long]()
//dict.zipWithIndex.foreach(e => builder.put(e._1, e._2))
//val termToIndex = builder.build()
val termToIndex = dict.zipWithIndex()
//@transient
//lazy val indexToTerm = termToIndex.inverse()
lazy val indexToTerm = dict.zipWithIndex().map
case (k, v) => (v, k)
//converts from (a, 0),(b, 1),(c, 2) to (0, a),(1, b),(2, c)
val count = termToIndex.count().toInt
def indexOf(term: String): Int = termToIndex.lookup(term).headOption.getOrElse[Long](-1).toInt
def valueOf(index: Int): String = indexToTerm.lookup(index).headOption.getOrElse("")
def tfIdfs (terms: Seq[String], idfs: Map[String, Double]): Seq[(Int, Double)] =
val filteredTerms = terms.filter(idfs contains)
(filteredTerms.groupBy(identity).map
case (term, instances) =>
val indexOfTerm: Int = indexOf(term)
if (indexOfTerm < 0) (-1, 0.0) else (indexOf(term), (instances.size.toDouble / filteredTerms.size.toDouble) * idfs(term))
).filter(p => p._1.toInt >= 0).toSeq.sortBy(_._1)
def vectorize(tfIdfs: Iterable[(Int, Double)]) =
Vectors.sparse(dict.count().toInt, tfIdfs.toSeq)
文档类如下所示:
case class Document(docId: String, body: String = "", labels: Set[String] = Set.empty)
TermDoc 类:
case class TermDoc(doc: String, labels: Set[String], terms: Seq[String])
我被困在这一步,我真的需要完成这项工作,但我很难找到有关它的有用信息。提前致谢。
P.S:这是基于 chimpler 的博客:https://github.com/chimpler/blog-spark-naive-bayes-reuters/blob/master/src/main/scala/com/chimpler/sparknaivebayesreuters/NaiveBayes.scala
更新:CSV 解析器和文档生成器的新代码。
import org.apache.spark.SparkContext
import scala.io.Source
/**
* Created by cespedjo on 14/02/2017.
*/
object TweetParser extends Serializable
val headerPart = "polarity"
val mentionRegex = """@(.)+?\s""".r
val fullRegex = """(\d+),(.+?),(N|P|NEU|NONE)(,\w+|;\w+)*""".r
def parseAll(csvFiles: Iterable[String], sc: SparkContext) = csvFiles flatMap(csv => parse(csv, sc))
def parse(csvFile: String, sc: SparkContext) =
val csv = sc.textFile(csvFile)
val docs = scala.collection.mutable.ArrayBuffer.empty[Document]
csv.foreach(
line => if (!line.contains(headerPart)) docs += buildDocument(line)
)
docs
//docs.filter(!_.docId.equals("INVALID"))
def buildDocument(line: String): Document =
val fullRegex(id, txt, snt, opt) = line
if (id != null && txt != null && snt != null)
new Document(id, mentionRegex.replaceAllIn(txt, ""), Set(snt))
else
new Document("INVALID")
case class Document(docId: String, body: String = "", labels: Set[String] = Set.empty)
【问题讨论】:
我认为您的错误来自val vector = Vectors.sparse
中的空向量,您需要找到/发布所有错误消息,这些错误消息指向您的应用程序中损坏的代码,以便您可以确定,我有类似的问题并通过将更多数据推送到向量来解决,顺便说一句,可能会查找 sparse vector
类,并在其上应用操作以获取更多详细信息
感谢您的评论 Karol,我是 spark 和 scala 的新手,您能否详细说明一下您的建议?我无法理解“将更多数据推送到向量”部分,因为我相信它是由 RDD 中已经包含的数据填充的,所以缺少多少数据?
顺便说一句,我去了 Vector 的文档,上面写着 LOCAL vector... 这是否意味着它不能在分布式模式下使用?在分布式模式下运行时,我需要使用什么进行监督学习?
我使用 ML 管道,利用交叉验证器和参数转换功能:https://databricks.com/blog/2015/10/20/audience-modeling-with-apache-spark-ml-pipelines.html
、http://spark.apache.org/docs/latest/ml-pipeline.html
、重新向量:在对数据应用模型/函数之前查看向量中的数据, 错误说没有数据/缺少一些必需的数据
@KarolSudol 任何想法为什么问题中的代码更新可能会产生空 RDD?我已经在 CSV 文件的几行中测试了代码,它可以识别指定的模式,但是,没有文档附加到可变数组。
【参考方案1】:
我认为问题在于某些文档不包含任何术语对。您不能在空数据点上进行训练。尝试将您的代码更改为:
val tfidfs = termDocsRdd flatMap
termDoc =>
val termPairs: Seq[(Int, Double)] = termDict.tfIdfs(termDoc.terms, idfs)
if (termPairs.nonEmpty)
termDoc.labels.headOption.map
label =>
val labelId = labelDict.indexOf(label).toDouble
val vector = Vectors.sparse(termDict.count.toInt, termPairs)
LabeledPoint(labelId, vector)
else
None
【讨论】:
感谢您的回答帕斯卡,任何想法为什么问题中的代码更新可能会产生空 RDD?我已经在 CSV 文件的几行中测试了代码,它可以识别指定的模式,但是,没有文档附加到可变数组。 您似乎在某些地方更改了代码(注释的内容)..您正在查找未收集的 RDD 字典,这是错误的,因为此时代码中,您需要拥有“大局”(即:您不想在部分工作人员字典中进行查找,而是在全局字典中进行查找。因此,带有 .collect() 的前代码对我来说似乎是正确的,但不是较新的代码 我明白......但我想找到一种不使用收集的正确方法来实现这一点,因为我在某处读到这不是一个好的选择,因为它强制在驱动程序中收集数据程序,在处理大量数据时可能会导致错误......有什么建议吗?顺便说一句,我提到的行为发生在执行这行代码时:val docs = TweetParser.parseAll(csvFiles, sc)。我用一个文件进行了测试,docs.size 为 0。我不知道为什么会发生这种情况,即使在单独的行上测试模式有效。以上是关于Spark + Scala: NaiveBayes.train - 异常是 java.util.NoSuchElementException: next on empty iterator的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Spark 中获取 spark.ml NaiveBayes 概率向量而不是 [0-1] 类?
无法在 NaiveBayes Spark 示例上将字符串转换为浮点数
spark.mllib源码阅读-分类算法2-NaiveBayes