使用 Spark 和 Scala 进行字数统计

Posted

技术标签:

【中文标题】使用 Spark 和 Scala 进行字数统计【英文标题】:Word count using Spark and Scala 【发布时间】:2015-06-09 13:59:09 【问题描述】:

我必须在 Scala 中编写一个程序,使用 spark 计算一个单词在文本中出现的次数,但使用 RDD,我的变量计数总是在末尾显示 0。你能帮我吗? 这是我的代码

import scala.io.Source
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object wordcount 
    def main(args: Array[String]) 
      // set spark context
      val conf = new SparkConf().setAppName("wordcount").setMaster("local[*]")
      val sc = new SparkContext(conf)

      val distFile = sc.textFile("bible.txt")

      print("Enter word to loook for in the HOLY BILE: ")
      val word = Console.readLine
      var count = 0;
      println("You entered " + word)

      for (bib <- distFile.flatMap(_.split(" "))) 

        if (word==bib) 
            count += 1

        

        
      println(word + " occours " + count + " times in the HOLY BIBLE!")
    

【问题讨论】:

【参考方案1】:

我建议您在 RDD 中使用可用的转换而不是您自己的程序(尽管它没有害处)来获得所需的结果,例如可以使用以下代码来检索字数。

val word = Console.readLine
println("You entered " + word)
val input = sc.textFile("bible.txt")
val splitedLines = input.flatMap(line => line.split(" "))
                    .filter(x => x.equals(word))

System.out.println(splitedLines.count())

请参阅link 了解有关 Spark 内部的更多信息。

【讨论】:

投反对票只是因为 for 块将导致 foreach 因此声明没有调用任何操作是不正确的。 @Justin Pihony,我看你是对的,所以我删除了关于 RDD 操作的 cmets。感谢您指出这一点。 我还有一个问题。我制作了程序的 JAR,我想在谷歌云平台上的 spark over hadoop 中运行它,但是当我在 spark stanalone 中本地播放它时,我得到 ClassNotFound 的错误,即使它工作。你能帮我吗?谢谢【参考方案2】:

问题是您在分布式集合上使用了可变变量。这在正常情况下很难控制,尤其是在 Spark 中,变量是单独复制到每个工作人员的。因此,他们最终得到了自己的 count 变量版本,而原始版本实际上从未更新。您需要使用accumulator,它只保证用于操作。综上所述,您可以在没有变量或累加器的情况下完成此操作:

val splitData = distFile.flatMap(_.split(" "))
val finalCount = splitData.aggregate(0)(
  (accum, word) => if(word == bib) accum + 1 else accum,
  _ + _)

这样做是首先将计数播种为 0。然后,第一个操作将在每个分区上运行。 accum 是累计计数,word 是当前要比较的字。第二个操作只是用于将所有分区的counts 相加的组合器。

【讨论】:

我必须把代码放在哪里?里面为了?顺便说一句谢谢你的回答男人 你会完全摆脱 for 并把它放在它的位置。【参考方案3】:

我认为迭代:bib &lt;- distFile.flatMap(_.split(" ")) 行不通,因为您的数据在 RDD 中,请尝试像这样进行收集:

for (bib&lt;-distFile.flatMap(_.split(" ")).collect).

(仅在您的数据不大的情况下才有效,您可以对其进行收集)

否则,如果您的数据集很大,您可以这样做:

val distFile = sc.textFile("bible.txt")
val word = Console.readLine
val count = distFile.flatMap(_.split(" ")).filter(l=>l==word).count
println(word + " occours " + count + " times in the HOLY BIBLE!")

【讨论】:

如果数据太大而无法收集到驱动程序上怎么办?从大数据的角度来看,这并不能解决这个问题。它只是把它变成一个单线程、一台机器的问题。【参考方案4】:
val textFile = sc.textFile("demoscala.txt")
val counts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
counts.saveAsTextFile("WordCountSpark")  

如果有人对 (_) 感到困惑。下面的好博客

http://www.codecommit.com/blog/scala/quick-explanation-of-scalas-syntax

【讨论】:

【参考方案5】:
val text=sc.textfile("filename.txt")

val counts=text.flatmap(line=>line.split("")).map(word=>(word,1)).reduceByKey(_+_) counts.collect

【讨论】:

嗨...解释你的解决方案会很有帮助

以上是关于使用 Spark 和 Scala 进行字数统计的主要内容,如果未能解决你的问题,请参考以下文章

Spark中的treeReduce与reduceByKey

使用 SQL 确定文本字段的字数统计

Countvectorize 统计大文件中的字数

火花模式rdd到RDD

如何让Pages文稿显示字数统计?

使用Python读取markdown文件并统计字数