2020/2/5寒假自学——学习进度报告12

Posted limitcm

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了2020/2/5寒假自学——学习进度报告12相关的知识,希望对你有一定的参考价值。

  今天主要完成了北京市政百姓信件分析实战。


  Spark方面只是安装了Flume,以及尝试使用套接字流作为DSteam的数据源。

  启动NetCat作为套接字的监听模式,这样在端口9999就能和spark互联。

技术图片

 

   值得一提,nc -l 9999 虽然也是适用的,-k是为了可以保持多个连接,所以应该还是必要的。

  编写DSteam代码并作为接受数据的一方。

import findspark

findspark.init()

import sys

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        exit(-1)
    conf = SparkConf().set("spark.task.cpus", "2")
    sc = SparkContext(appName="PythonStreamingNetworkWordCount", master="spark://hadoop-master:7077", conf=conf)
    ssc = StreamingContext(sc, 1)

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" "))         .map(lambda word: (word, 1))         .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

  其中 lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) 为使用了启动变量,所以在设置Parameters:

技术图片

 

  值得一提,我在代码里面加了 conf = SparkConf().set("spark.task.cpus", "2") 这么一句话,是因为在我第一次运行的时候发现程序卡在了中途的阶段,因为任务需求多个核心来处理但我默认只设置了1颗核心,所以加上这句话保险。但这样做的同时需要设置worker端的核心数量(因为我只有一个worker),只需要在spark-env.sh中加入export SPARK_WORKER_CORES=4 就可以了,核心数量视情况可以提升或减少,但为了任务正常运行需要至少2。

  运行结果:

  技术图片

 

   可以看到程序每秒都会监听消息。

  另外明天尝试Flume作为数据源并且把结果保存到文件系统中。

 

以上是关于2020/2/5寒假自学——学习进度报告12的主要内容,如果未能解决你的问题,请参考以下文章

2020/2/1寒假自学——学习进度报告8

2020/2/13寒假自学——学习进度报告16

2020/1/25寒假自学——学习进度报告6

2020/1/24寒假自学——学习进度报告5

2020/1/18寒假自学——学习进度报告3

2020/1/18寒假自学——学习进度报告4