02 使用Flink的本地模式完成词频统计

Posted alichengxuyuan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了02 使用Flink的本地模式完成词频统计相关的知识,希望对你有一定的参考价值。

前面我们已经安装了flink,与storm一样,flink也有两种模式,一是本地模式,主要用于学习和测试,另一个是集群模式,实际生产中使用这种模式。本节将阐述如何使用本地模式的flink进行词频统计。

1 系统、软件以及前提约束

2 操作

  • 1 在idea中创建一个maven项目
  • 2 修改该maven项目的pom.xml中的依赖
   <dependencies>
        <dependency>
            <!--spark依赖-->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <!--scala依赖-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        <!--hadoop依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0-cdh5.7.0</version>
        </dependency>
        <!--hbase依赖-->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.0.0-cdh6.0.1</version>
        </dependency>
        <!--storm依赖-->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>log4j-over-slf4j</artifactId>
                </exclusion>
            </exclusions>
            <version>1.2.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.5.1</version>
        </dependency>
    </dependencies>
  • 3 在src/main/java中添加SocketWindowWordCountWithFlink.java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCountWithFlink {
    public static void main(String[] args) throws Exception {
        // final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // local模式
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        DataStream<String> text = env.socketTextStream("localhost", 9999, "
");
        @SuppressWarnings("serial")
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                        for (String word : value.split("\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word").timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });
        windowCounts.print().setParallelism(1);
        env.execute("Socket Window WordCount(zyl_test)");
    }

    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
  • 4 测试
    (1)打开windows命令行,执行以下命令:
nc -l -p 9999

(2)在idea中执行SocketWindowWordCountWithFlink.java
(3)在nc窗口输入字符串,观察idea中的控制台,会有统计结果打印。
以上就是使用Flink的本地模式进行的词频统计过程,在本实验中,我们通过人输入字符串来模拟源源不断到来的数据流。

以上是关于02 使用Flink的本地模式完成词频统计的主要内容,如果未能解决你的问题,请参考以下文章

Flink学习笔记:Flink的最简安装

Flink学习笔记02:三种运行模式

Flink第一课!使用批处理,流处理,Socket的方式实现经典词频统计

Flink第一课!使用批处理,流处理,Socket的方式实现经典词频统计

使用Scala语言调用Flink框架进行WordCount词频统计测试不同Parallelism并行度对运算速度的影响

使用Scala语言调用Flink框架进行WordCount词频统计测试不同Parallelism并行度对运算速度的影响