Spark --- Word Frequency Counting

Posted by luren-hometown

This post walks through doing word frequency counting in Spark from Python; the steps are shared below:

1. Create the project (here the files are created inside an existing project; you can also create a standalone wordcount project):

① Create a txt file, wordcount.txt (its content is the same as the file used in Word Frequency Counting (Part 1)).

② Create a py file, word.py:

from pyspark import SparkContext
from pyspark import SparkConf

conf = SparkConf().setAppName("word").setMaster("local")
sc = SparkContext(conf=conf)
# Read the local text file; the backslashes in the path were lost in the
# original post, so the exact path is a reconstruction -- adjust it to
# wherever wordcount.txt actually lives.
wordcount = sc.textFile(r"E:\Hbaseapi\wordcount.txt")
counts = wordcount.flatMap(lambda x: x.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b) \
                  .collect()
print(counts)

Printed result:

[('development', 1), ('producing', 1), ('among', 1), ('Source,', 1), ('for', 1), ('quality', 1), ('to', 1), ('influencers', 1), ('advances', 1), ('collaborative', 1), ('model', 1), ('in', 1), ('the', 2), ('of', 1), ('has', 1), ('successful', 1), ('Software', 1), ("Foundation's", 1), ('most', 1), ('long', 1), ('that', 1), ('uded', 1), ('as', 1), ('Open', 1), ('The', 1), ('commitment', 1), ('software', 1), ('consistently', 1), ('a', 1), ('development.', 1), ('high', 1), ('future', 1), ('Apache', 1), ('served', 1), ('open', 1), ('https://s.apache.org/PIRA', 1)]
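Incidentally, if you want the most frequent words first, the same pipeline can be sorted before collecting. This is a minimal sketch reusing the wordcount RDD built above; sortBy() is a standard RDD method, and sorting descending by count is the only change:

# Build the same (word, count) pairs, then sort by count, descending.
pairs = wordcount.flatMap(lambda x: x.split(" ")) \
                 .map(lambda word: (word, 1)) \
                 .reduceByKey(lambda a, b: a + b)
print(pairs.sortBy(lambda kv: kv[1], ascending=False).collect())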

2. If the amount of data to count is small, you can do it like this:

from pyspark import SparkContext
from pyspark import SparkConf

conf = SparkConf().setAppName("word").setMaster("local")
sc = SparkContext(conf=conf)
# Sample text held directly in the driver, then distributed with parallelize().
data = [r"uded among the most successful influencers in Open Source, The Apache Software Foundation's       commitment to collaborative development has long served as a model for producing consistently       high quality software that advances the future of open development. https://s.apache.org/PIRA      "]
datardd = sc.parallelize(data)

# Note: splitting on a single space keeps empty strings for consecutive spaces.
result = datardd.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b).collect()
print(result)

Printed result:

[('', 18), ('development', 1), ('producing', 1), ('among', 1), ('Source,', 1), ('for', 1), ('quality', 1), ('to', 1), ('influencers', 1), ('served', 1), ('collaborative', 1), ('in', 1), ('the', 2), ('Open', 1), ('of', 1), ('has', 1), ('long', 1), ('https://s.apache.org/PIRA', 1), ('successful', 1), ('Software', 1), ('most', 1), ('consistently', 1), ('a', 1), ("Foundation's", 1), ('uded', 1), ('as', 1), ('advances', 1), ('The', 1), ('commitment', 1), ('software', 1), ('that', 1), ('development.', 1), ('high', 1), ('future', 1), ('Apache', 1), ('model', 1), ('open', 1)]
18/07/27 17:14:34 INFO SparkContext: Invoking stop() from shutdown hook
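Note the ('', 18) entry at the head of the list: splitting on a single space, x.split(' '), yields an empty string for every extra space in a run of consecutive spaces, and the sample string contains several such runs plus trailing spaces. A quick plain-Python illustration (no Spark needed):

# split(' ') keeps empty strings between consecutive spaces;
# split() with no argument collapses whitespace runs instead.
s = "open  source   software"
print(s.split(' '))  # ['open', '', 'source', '', '', 'software']
print(s.split())     # ['open', 'source', 'software']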

Summary:

① Prerequisite for driving Spark word counts from Python on Windows: the machine needs working Spark and Java environments, configured much like Python's own environment variables; both must be installed correctly before anything will run. A sketch of the usual variables follows.
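A minimal sketch of the environment variables typically involved on Windows; every path here is an illustrative assumption, not taken from the original post -- point them at your own installs:

import os

# Illustrative paths only -- adjust to your local installs.
os.environ["JAVA_HOME"] = r"C:\Java\jdk1.8.0"
os.environ["SPARK_HOME"] = r"C:\spark-1.6.0-bin-hadoop2.6"
os.environ["HADOOP_HOME"] = r"C:\hadoop"  # Spark on Windows commonly also needs winutils.exe here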

② Mind the compatibility between your local Python and Spark versions. This machine had Python 3.6 with Spark 1.6, which are clearly incompatible, so Python 3.5 had to be installed instead; the same applies to Python and Spark on Linux.
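To confirm which versions are actually in play, a quick check from inside a PySpark session works (assuming an active SparkContext named sc, as in the examples above; sc.version is a standard SparkContext property):

import sys

print(sys.version)  # the Python interpreter PySpark is using
print(sc.version)   # the Spark version the context is bound to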

③ Be careful with collect() in real work: big-data pipelines routinely handle PB-scale data, and calling collect() without thinking will crash the driver by exhausting its memory.

For situation ③, the recommended approach is:

from pyspark import SparkContext
from pyspark import SparkConf

conf = SparkConf().setAppName("word").setMaster("local")
sc = SparkContext(conf=conf)
data = [r"uded among the most successful influencers in Open Source, The Apache Software Foundation's       commitment to collaborative development has long served as a model for producing consistently       high quality software that advances the future of open development. https://s.apache.org/PIRA      "]
datardd = sc.parallelize(data)

# result = datardd.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b).collect()
# print(result)
# Build the same pipeline but do NOT collect(); keep the result as an RDD.
result = datardd.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)

def f(x):
    print(x)

# foreach() runs f once per element on the executors; it returns None,
# so result2 prints as None while the pairs themselves are printed by f.
result2 = result.foreach(f)
print(result2)
Explanation: foreach() iterates over the result and prints each element on the worker side one by one, avoiding the risk of blowing up the driver's memory.
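Two other common ways to avoid pulling everything to the driver, sketched against the result RDD above (the output path is an illustrative assumption): take(n) retrieves only a small sample, and saveAsTextFile() writes the full result to storage instead of driver memory.

# Pull only the first 10 pairs to the driver.
print(result.take(10))

# Or write the whole result out; nothing large ever reaches the driver.
# (Illustrative path -- point it at your own disk or HDFS location.)
result.saveAsTextFile(r"E:\Hbaseapi\wordcount_output")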

 
