02 使用Flink的本地模式完成词频统计
Posted alichengxuyuan
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了02 使用Flink的本地模式完成词频统计相关的知识,希望对你有一定的参考价值。
前面我们已经安装了flink,与storm一样,flink也有两种模式,一是本地模式,主要用于学习和测试,另一个是集群模式,实际生产中使用这种模式。本节将阐述如何使用本地模式的flink进行词频统计。
1 系统、软件以及前提约束
- CentOS 7 64 工作站 作者的机子ip是192.168.100.200,请读者根据自己实际情况设置
- idea 2018.1
- 在Win10中安装nc
https://www.jianshu.com/p/4f6fb8834ad9
2 操作
- 1 在idea中创建一个maven项目
- 2 修改该maven项目的pom.xml中的依赖
<dependencies>
<dependency>
<!--spark依赖-->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<!--scala依赖-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<!--hadoop依赖-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0-cdh5.7.0</version>
</dependency>
<!--hbase依赖-->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.0.0-cdh6.0.1</version>
</dependency>
<!--storm依赖-->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
</exclusions>
<version>1.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.5.1</version>
</dependency>
</dependencies>
- 3 在src/main/java中添加SocketWindowWordCountWithFlink.java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class SocketWindowWordCountWithFlink {
public static void main(String[] args) throws Exception {
// final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// local模式
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
DataStream<String> text = env.socketTextStream("localhost", 9999, "
");
@SuppressWarnings("serial")
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
for (String word : value.split("\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word").timeWindow(Time.seconds(5), Time.seconds(1))
.reduce(new ReduceFunction<WordWithCount>() {
public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
return new WordWithCount(a.word, a.count + b.count);
}
});
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount(zyl_test)");
}
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {
}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
- 4 测试
(1)打开windows命令行,执行以下命令:
nc -l -p 9999
(2)在idea中执行SocketWindowWordCountWithFlink.java
(3)在nc窗口输入字符串,观察idea中的控制台,会有统计结果打印。
以上就是使用Flink的本地模式进行的词频统计过程,在本实验中,我们通过人输入字符串来模拟源源不断到来的数据流。
以上是关于02 使用Flink的本地模式完成词频统计的主要内容,如果未能解决你的问题,请参考以下文章
Flink第一课!使用批处理,流处理,Socket的方式实现经典词频统计
Flink第一课!使用批处理,流处理,Socket的方式实现经典词频统计