Spark学习笔记——文本处理技术
Posted tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark学习笔记——文本处理技术相关的知识,希望对你有一定的参考价值。
1.建立TF-IDF模型
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.mllib.linalg.{SparseVector => SV} import org.apache.spark.mllib.feature.HashingTF import org.apache.spark.mllib.feature.IDF /** * Created by common on 17-5-6. */ object TFIDF { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("WordCount").setMaster("local") val sc = new SparkContext(conf) // val path = "hdfs://master:9000/user/common/20Newsgroups/20news-bydate-train/*" val path = "file:///media/common/工作/kaggle/test/*" val rdd = sc.wholeTextFiles(path) // 提取文本信息 val text = rdd.map { case (file, text) => text } // print(text.count()) val regex = """[^0-9]*""".r // 排除停用词 val stopwords = Set( "the", "a", "an", "of", "or", "in", "for", "by", "on", "but", "is", "not", "with", "as", "was", "if", "they", "are", "this", "and", "it", "have", "from", "at", "my", "be", "that", "to" ) // 以使用正则表达切分原始文档来移除这些非单词字符 val nonWordSplit = text.flatMap(t => t.split("""\W+""").map(_.toLowerCase)) // 过滤掉数字和包含数字的单词 val filterNumbers = nonWordSplit.filter(token => regex.pattern.matcher(token).matches) // 基于出现的频率,排除很少出现的单词,需要先计算一遍整个测试集 val tokenCounts = filterNumbers.map(t => (t, 1)).reduceByKey(_ + _) val rareTokens = tokenCounts.filter { case (k, v) => v < 2 }.map { case (k, v) => k }.collect.toSet // 每一个文档的预处理函数 def tokenize(line: String): Seq[String] = { line.split("""\W+""") .map(_.toLowerCase) .filter(token => regex.pattern.matcher(token).matches) .filterNot(token => stopwords.contains(token)) .filterNot(token => rareTokens.contains(token)) .filter(token => token.size >= 2) //删除只有一个字母的单词 .toSeq } // 每一篇文档经过预处理之后,每一个文档成为一个Seq[String] val tokens = text.map(doc => tokenize(doc)).cache() println(tokens.distinct.count) // 第一篇文档第一部分分词之后的结果 println(tokens.first()) println(tokens.first().length) // 生成2^18维的特征 val dim = math.pow(2, 18).toInt val hashingTF = new HashingTF(dim) // HashingTF 的 transform 函数把每个输入文档(即词项的序列)映射到一个MLlib的Vector对象 val tf = hashingTF.transform(tokens) // tf的长度是文档的个数,对应的是文档和维度的矩阵 tf.cache // 取得第一个文档的向量 val v = tf.first.asInstanceOf[SV] println(v.size) // v.value和v.indices的长度相等,value是词频,indices是词频非零的下标 println(v.values.size) println(v.indices.size) println(v.values.toSeq) println(v.indices.take(10).toSeq) // 对每个单词计算逆向文本频率 val idf = new IDF().fit(tf) // 转换词频向量为TF-IDF向量 val tfidf = idf.transform(tf) val v2 = tfidf.first.asInstanceOf[SV] println(v2.values.size) println(v2.values.take(10).toSeq) println(v2.indices.take(10).toSeq) // 计算整个文档的TF-IDF最小和最大权值 val minMaxVals = tfidf.map { v => val sv = v.asInstanceOf[SV] (sv.values.min, sv.values.max) } val globalMinMax = minMaxVals.reduce { case ((min1, max1), (min2, max2)) => (math.min(min1, min2), math.max(max1, max2)) } println(globalMinMax) // 比较几个单词的TF-IDF权值 val common = sc.parallelize(Seq(Seq("you", "do", "we"))) val tfCommon = hashingTF.transform(common) val tfidfCommon = idf.transform(tfCommon) val commonVector = tfidfCommon.first.asInstanceOf[SV] println(commonVector.values.toSeq) val uncommon = sc.parallelize(Seq(Seq("telescope", "legislation","investment"))) val tfUncommon = hashingTF.transform(uncommon) val tfidfUncommon = idf.transform(tfUncommon) val uncommonVector = tfidfUncommon.first.asInstanceOf[SV] println(uncommonVector.values.toSeq) } }
以上是关于Spark学习笔记——文本处理技术的主要内容,如果未能解决你的问题,请参考以下文章
Spark StreamingSpark Day10:Spark Streaming 学习笔记