Flink WordCount

Posted by Shall潇


Flink's core processing type is the DataStream, analogous to the DStream in Spark Streaming.


1. Import the dependencies

<!-- Java -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>1.10.1</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.10.1</version>
</dependency>
<!-- Scala -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_2.11</artifactId>
  <version>1.10.1</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.11</artifactId>
  <version>1.10.1</version>
</dependency>

<!-- Kafka connector -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
  <version>1.10.1</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
  <version>1.10.1</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner_2.11</artifactId>
  <version>1.10.1</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.11</artifactId>
  <version>1.10.1</version>
</dependency>
<!-- MySQL connector -->
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.38</version>
</dependency>
<!-- Redis connector -->
<dependency>
  <groupId>org.apache.bahir</groupId>
  <artifactId>flink-connector-redis_2.11</artifactId>
  <version>1.0</version>
</dependency>

2. Write the code

Scala version

import org.apache.flink.streaming.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {
    // 1. Set up the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)      // set the parallelism (optional)

    // 2. Source: collection, file, Kafka, socket, ...
    val inputStream: DataStream[String] = env.socketTextStream("192.168.159.100", 7777)

    // 3. Transformations
    val res: DataStream[(String, Int)] = inputStream
      .flatMap(_.split("\\s+"))
      .map(x => (x, 1))
      .keyBy(0)
      .sum(1)

    // 4. Sink
    res.print()

    // 5. Execute the job
    env.execute("WordCount")
  }
}
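Conceptually, `keyBy(0).sum(1)` keeps a running count per word and emits an updated `(word, count)` pair for every incoming element, rather than a single final total. A plain-Java sketch of that per-element behavior (hypothetical `RunningWordCount` class, no Flink dependency, everything in one key group):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the running count that keyBy + sum emits: every incoming
// word produces an updated (word, count) pair on the output stream.
public class RunningWordCount {
    public static List<String> process(String line) {
        Map<String, Integer> counts = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String word : line.split("\\s+")) {
            int c = counts.merge(word, 1, Integer::sum); // increment this word's count
            emitted.add("(" + word + "," + c + ")");     // emit the updated pair
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Typing "hello world hello" at the socket yields three updates:
        System.out.println(process("hello world hello"));
        // [(hello,1), (world,1), (hello,2)]
    }
}
```

This is why, in the real job, a repeated word shows up once per occurrence with a growing count.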

Java version

package Flink.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


/**
 * @Author shall潇
 * @Date 2021/6/28
 * @Description
 */
public class WordsCount {
    public static void main(String[] args) {
        // 1. Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        // 2. Source
        DataStreamSource<String> inputDataStream = env.socketTextStream("192.168.159.100", 7777);
        // 3. Transformations
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = inputDataStream
                .flatMap(new MyFlatMap())
                .keyBy(0)
                .sum(1);
        // 4. Sink
        result.print("word count:");
        // 5. Execute the job
        try {
            env.execute("java_wordcount");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
            String[] words = s.split("\\s+");
            for (String word : words) {
                collector.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
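One detail of the `split("\\s+")` call used in both versions: interior runs of spaces and tabs collapse into a single delimiter, but a line that *starts* with whitespace yields an empty first token, which would then be counted as a "word". A quick standalone check (hypothetical class name, plain Java):

```java
public class SplitCheck {
    public static void main(String[] args) {
        // Interior runs of whitespace collapse into one delimiter: 3 tokens
        String[] clean = "hello   world\tflink".split("\\s+");
        System.out.println(clean.length); // 3

        // Leading whitespace leaves an empty first token: 2 tokens, first is ""
        String[] leading = "  hello".split("\\s+");
        System.out.println(leading.length + " " + leading[0].isEmpty()); // 2 true
    }
}
```

Trimming each line first (`s.trim().split("\\s+")`) avoids counting that empty token.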
