Spark WordCount Example
Posted by mengbin0546
Example
POM file
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu</groupId>
    <artifactId>Streaming</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.1.1</spark.version>
    </properties>

    <dependencies>
        <!-- Scala dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <!-- With provided scope, this dependency is not packed into the jar. -->
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Spark Streaming dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java compilation -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- Scala compilation -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Builds the jar-with-dependencies used by spark-submit -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.atguigu.streaming.StreamingWordCount</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
```
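All three dependencies are marked `provided`, so the assembled jar contains only the application classes; at runtime, spark-submit supplies the Spark and Scala classes from the cluster's own installation, which keeps the fat jar small and avoids version clashes.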
Source file
```scala
package com.atguigu.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[4]")
    // Create a StreamingContext with a 1-second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    // Receive text data from port 9999 on hadoop102
    val lines = ssc.socketTextStream("hadoop102", 9999)
    // Split each line into words
    val words = lines.flatMap(_.split(" "))
    // Turn each word into a (word, 1) tuple
    val pairs = words.map((_, 1))
    // Sum the counts for identical words
    val result = pairs.reduceByKey(_ + _)
    result.print()
    // Start the streaming computation
    ssc.start()
    // Block until the job is stopped
    ssc.awaitTermination()
  }
}
```
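Note that the master is set to `local[4]` rather than `local[1]`: when a Spark Streaming job runs locally, the socket receiver permanently occupies one thread, so the master must be `local[n]` with n ≥ 2 or no threads are left over to process the batches.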
Package the jar and upload it to Linux
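The original post doesn't show the packaging commands; a minimal sketch, assuming Maven is on the PATH and hadoop102 is the upload target (the user and destination directory are placeholders to adapt to your environment):

```bash
# Build the fat jar; the assembly plugin writes it to target/
mvn clean package

# Upload it to the machine that will run spark-submit
# (user root and path /opt/module/spark/bin are assumptions)
scp target/Streaming-1.0-SNAPSHOT-jar-with-dependencies.jar root@hadoop102:/opt/module/spark/bin/
```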
Before running the test, netcat must be installed in the Linux environment.
Install it with `yum install nc.x86_64`. This normally succeeds, but some people hit a "protocol not available" error. To fix it, run the following three commands as root:

1. `yum erase nc`
2. `wget http://vault.centos.org/6.6/os/x86_64/Packages/nc-1.84-22.el6.x86_64.rpm`
3. `rpm -iUv nc-1.84-22.el6.x86_64.rpm`

(For 32-bit systems, the corresponding package is http://vault.centos.org/6.6/os/i386/Packages/nc-1.84-22.el6.i686.rpm.)
Start nc

```bash
nc -lk 9999
```
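Here `-l` puts nc in listen mode and `-k` keeps the listener alive across connections, so the terminal acts as a simple text source: every line you type is delivered to the streaming job over port 9999.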
Run the program
```bash
bin/spark-submit --class com.atguigu.streaming.StreamingWordCount bin/Streaming-1.0-SNAPSHOT-jar-with-dependencies.jar
```
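With nc and the job both running, type some words into the nc terminal. Each 1-second batch prints its counts via `DStream.print()`; the session looks roughly like this (the input line and timestamp are illustrative):

```
# typed into the nc terminal:
hello spark hello

# printed by the streaming job:
-------------------------------------------
Time: 1496981955000 ms
-------------------------------------------
(hello,2)
(spark,1)
```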