Spark: Word Frequency Count
Posted by luren-hometown
1. Create the project (here it was created inside an existing project; a standalone wordcount project works just as well):
① Create a text file, wordcount.txt (its contents are the same as the file used in Word Frequency Count (Part 1)).
② Create a Python file, word.py:
from pyspark import SparkContext
from pyspark import SparkConf

conf = SparkConf().setAppName('word').setMaster('local')
sc = SparkContext(conf=conf)

# Read the input file into an RDD of lines (adjust the path to where wordcount.txt lives).
wordcount = sc.textFile(r'E:Hbaseapiwordcount')

# Split each line into words, map each word to (word, 1), then sum the counts per word.
counts = (wordcount.flatMap(lambda x: x.split(" "))
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b)
                   .collect())
print(counts)
Printed output:
[('development', 1), ('producing', 1), ('among', 1), ('Source,', 1), ('for', 1), ('quality', 1), ('to', 1), ('influencers', 1), ('advances', 1), ('collaborative', 1), ('model', 1), ('in', 1), ('the', 2), ('of', 1), ('has', 1), ('successful', 1), ('Software', 1), ("Foundation's", 1), ('most', 1), ('long', 1), ('that', 1), ('uded', 1), ('as', 1), ('Open', 1), ('The', 1), ('commitment', 1), ('software', 1), ('consistently', 1), ('a', 1), ('development.', 1), ('high', 1), ('future', 1), ('Apache', 1), ('served', 1), ('open', 1), ('https://s.apache.org/PIRA', 1)]
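If only the most frequent words are needed, the full list does not have to be collected at all. The following is a small sketch that is not part of the original post; it continues from the wordcount RDD built in word.py above and uses takeOrdered() to bring back just the top 10 pairs.

# Sketch (not in the original post): keep only the 10 most frequent words.
# Builds on the `wordcount` RDD created in word.py above.
pairs = (wordcount.flatMap(lambda x: x.split(" "))
                  .map(lambda word: (word, 1))
                  .reduceByKey(lambda a, b: a + b))

# takeOrdered() returns at most 10 (word, count) pairs to the driver,
# ordered here by descending count.
top10 = pairs.takeOrdered(10, key=lambda kv: -kv[1])
print(top10)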
2. If the amount of data to be counted is small, you can also do it like this:
from pyspark import SparkContext
from pyspark import SparkConf

conf = SparkConf().setAppName('word').setMaster('local')
sc = SparkContext(conf=conf)

# Put the text directly into a Python list and turn it into an RDD with parallelize().
data = [r"uded among the most successful influencers in Open Source, The Apache Software Foundation's commitment to collaborative development has long served as a model for producing consistently high quality software that advances the future of open development. https://s.apache.org/PIRA "]
datardd = sc.parallelize(data)

# Split on spaces, map each word to (word, 1), then sum the counts per word.
result = (datardd.flatMap(lambda x: x.split(' '))
                 .map(lambda x: (x, 1))
                 .reduceByKey(lambda a, b: a + b)
                 .collect())
print(result)
Printed output:
[('', 18), ('development', 1), ('producing', 1), ('among', 1), ('Source,', 1), ('for', 1), ('quality', 1), ('to', 1), ('influencers', 1), ('served', 1), ('collaborative', 1), ('in', 1), ('the', 2), ('Open', 1), ('of', 1), ('has', 1), ('long', 1), ('https://s.apache.org/PIRA\ ', 1), ('successful', 1), ('Software', 1), ('most', 1), ('consistently\ ', 1), ('a', 1), ("Foundation's\ ", 1), ('uded', 1), ('as', 1), ('advances', 1), ('The', 1), ('commitment', 1), ('software', 1), ('that', 1), ('development.', 1), ('high', 1), ('future', 1), ('Apache', 1), ('model', 1), ('open', 1)]
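Note the ('', 18) entry above: splitting on a single space turns every run of consecutive spaces into empty tokens. A small sketch, not in the original post, that filters them out before counting (reusing the datardd RDD from the script above); using x.split() with no argument would also work, since it splits on any run of whitespace and drops empty tokens.

# Sketch (not in the original post): drop empty tokens produced by consecutive spaces.
# `datardd` is the same RDD built with sc.parallelize(data) above.
result = (datardd.flatMap(lambda x: x.split(' '))
                 .filter(lambda w: w != '')          # remove empty strings
                 .map(lambda w: (w, 1))
                 .reduceByKey(lambda a, b: a + b)
                 .collect())
print(result)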
① Prerequisite for running a Spark word count from Python on Windows: the machine must have Spark and Java configured in the system environment (the configuration steps are similar to those for Python), and the installation must be correct before any results can be produced.
② Pay attention to the compatibility between the local Python and Spark versions. This machine originally had Python 3.6 with Spark 1.6, which are clearly incompatible, so Python 3.5 had to be installed instead; the same applies to Python and Spark on Linux.
③ In real work, be careful with collect(): big-data jobs often handle PB-scale volumes, and calling collect() without thinking pulls everything back to the driver and crashes it by exhausting memory.
For case ③, the recommended approach is:
from pyspark import SparkContext
from pyspark import SparkConf

conf = SparkConf().setAppName('word').setMaster('local')
sc = SparkContext(conf=conf)

data = [r"uded among the most successful influencers in Open Source, The Apache Software Foundation's commitment to collaborative development has long served as a model for producing consistently high quality software that advances the future of open development. https://s.apache.org/PIRA "]
datardd = sc.parallelize(data)

# result = datardd.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b).collect()
# print(result)

# Keep the result as an RDD instead of collecting it to the driver.
result = (datardd.flatMap(lambda x: x.split(' '))
                 .map(lambda x: (x, 1))
                 .reduceByKey(lambda a, b: a + b))

def f(x):
    print(x)

# foreach() applies f to each record where the data lives; it returns None.
result2 = result.foreach(f)
print(result2)  # prints None, since foreach() has no return value
Explanation: foreach() walks over the RDD and prints the records one by one on the worker side (in local mode they appear in the console; on a cluster they end up in the executor logs) instead of gathering everything on the driver, which avoids the risk of blowing up driver memory.
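Besides foreach(), other ways to avoid pulling the whole result set to the driver are take(n) for a small sample and saveAsTextFile() to write the result to disk. This is a sketch, not from the original post, reusing the result RDD from the script above; the output directory is a hypothetical placeholder.

# Sketch (not in the original post): alternatives to collect(), reusing the `result` RDD above.

# take(10) brings at most 10 records back to the driver, no matter how large the RDD is.
sample = result.take(10)
print(sample)

# saveAsTextFile() writes the whole result to disk instead of into driver memory.
# The directory below is a hypothetical placeholder and must not already exist.
result.saveAsTextFile(r'E:wordcount_output')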