使用 Spark 和 Scala 进行字数统计
Posted
技术标签:
【中文标题】使用 Spark 和 Scala 进行字数统计【英文标题】:Word count using Spark and Scala 【发布时间】:2015-06-09 13:59:09 【问题描述】:我必须在 Scala 中编写一个程序,使用 spark 计算一个单词在文本中出现的次数,但使用 RDD,我的变量计数总是在末尾显示 0。你能帮我吗? 这是我的代码
import scala.io.Source
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object wordcount
def main(args: Array[String])
// set spark context
val conf = new SparkConf().setAppName("wordcount").setMaster("local[*]")
val sc = new SparkContext(conf)
val distFile = sc.textFile("bible.txt")
print("Enter word to loook for in the HOLY BILE: ")
val word = Console.readLine
var count = 0;
println("You entered " + word)
for (bib <- distFile.flatMap(_.split(" ")))
if (word==bib)
count += 1
println(word + " occours " + count + " times in the HOLY BIBLE!")
【问题讨论】:
【参考方案1】:我建议您在 RDD 中使用可用的转换而不是您自己的程序(尽管它没有害处)来获得所需的结果,例如可以使用以下代码来检索字数。
val word = Console.readLine
println("You entered " + word)
val input = sc.textFile("bible.txt")
val splitedLines = input.flatMap(line => line.split(" "))
.filter(x => x.equals(word))
System.out.println(splitedLines.count())
请参阅link 了解有关 Spark 内部的更多信息。
【讨论】:
投反对票只是因为 for 块将导致foreach
因此声明没有调用任何操作是不正确的。
@Justin Pihony,我看你是对的,所以我删除了关于 RDD 操作的 cmets。感谢您指出这一点。
我还有一个问题。我制作了程序的 JAR,我想在谷歌云平台上的 spark over hadoop 中运行它,但是当我在 spark stanalone 中本地播放它时,我得到 ClassNotFound 的错误,即使它工作。你能帮我吗?谢谢【参考方案2】:
问题是您在分布式集合上使用了可变变量。这在正常情况下很难控制,尤其是在 Spark 中,变量是单独复制到每个工作人员的。因此,他们最终得到了自己的 count
变量版本,而原始版本实际上从未更新。您需要使用accumulator
,它只保证用于操作。综上所述,您可以在没有变量或累加器的情况下完成此操作:
val splitData = distFile.flatMap(_.split(" "))
val finalCount = splitData.aggregate(0)(
(accum, word) => if(word == bib) accum + 1 else accum,
_ + _)
这样做是首先将计数播种为 0。然后,第一个操作将在每个分区上运行。 accum
是累计计数,word
是当前要比较的字。第二个操作只是用于将所有分区的count
s 相加的组合器。
【讨论】:
我必须把代码放在哪里?里面为了?顺便说一句谢谢你的回答男人 你会完全摆脱 for 并把它放在它的位置。【参考方案3】:我认为迭代:bib <- distFile.flatMap(_.split(" "))
行不通,因为您的数据在 RDD 中,请尝试像这样进行收集:
for (bib<-distFile.flatMap(_.split(" ")).collect)
.
(仅在您的数据不大的情况下才有效,您可以对其进行收集)
否则,如果您的数据集很大,您可以这样做:
val distFile = sc.textFile("bible.txt")
val word = Console.readLine
val count = distFile.flatMap(_.split(" ")).filter(l=>l==word).count
println(word + " occours " + count + " times in the HOLY BIBLE!")
【讨论】:
如果数据太大而无法收集到驱动程序上怎么办?从大数据的角度来看,这并不能解决这个问题。它只是把它变成一个单线程、一台机器的问题。【参考方案4】:val textFile = sc.textFile("demoscala.txt")
val counts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
counts.saveAsTextFile("WordCountSpark")
如果有人对 (_) 感到困惑。下面的好博客
http://www.codecommit.com/blog/scala/quick-explanation-of-scalas-syntax
【讨论】:
【参考方案5】:val text=sc.textfile("filename.txt")
val counts=text.flatmap(line=>line.split("")).map(word=>(word,1)).reduceByKey(_+_) counts.collect
【讨论】:
嗨...解释你的解决方案会很有帮助以上是关于使用 Spark 和 Scala 进行字数统计的主要内容,如果未能解决你的问题,请参考以下文章