Flink---wordcount
Posted by Shall潇
Flink's core streaming abstraction is the DataStream, analogous to the DStream in Spark Streaming.
1. Importing the Dependencies
<!-- Java API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.10.1</version>
</dependency>
<!-- Scala API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.10.1</version>
</dependency>
<!-- Kafka connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.10.1</version>
</dependency>
<!-- MySQL connector -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
</dependency>
<!-- Redis connector -->
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
2. Writing the Code
Scala version
import org.apache.flink.streaming.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {
    // 1. Set up the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // optional: set the parallelism
    // 2. Source: in-memory collection, file, Kafka, or socket
    val inputStream: DataStream[String] = env.socketTextStream("192.168.159.100", 7777)
    // 3. Transformations: split each line into words, pair each word with 1,
    //    key by the word, and keep a running sum
    val res: DataStream[(String, Int)] = inputStream
      .flatMap(_.split("\\s+"))
      .map(x => (x, 1))
      .keyBy(0)
      .sum(1)
    // 4. Sink
    res.print()
    // 5. Execute
    env.execute("WordCount")
  }
}
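The pipeline above splits each line on whitespace with the regex `"\\s+"` (one or more whitespace characters). To check what that split actually produces, here is a quick standalone sketch with no Flink dependency; the input line is made up for illustration:

```java
import java.util.Arrays;

public class SplitDemo {
    public static void main(String[] args) {
        // Hypothetical input line; tabs and runs of spaces both count as one separator
        String line = "hello   world\thello";
        String[] words = line.split("\\s+");
        System.out.println(Arrays.toString(words)); // → [hello, world, hello]
    }
}
```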
Java version
package Flink.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @Author shall潇
 * @Date 2021/6/28
 * @Description streaming word count
 */
public class WordsCount {
    public static void main(String[] args) {
        // 1. Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        // 2. Source
        DataStreamSource<String> inputDataStream = env.socketTextStream("192.168.159.100", 7777);
        // 3. Transformations: split into (word, 1) pairs, key by the word, running sum
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = inputDataStream
                .flatMap(new MyFlatMap())
                .keyBy(0)
                .sum(1);
        result.print("word count:");
        // 5. Execute
        try {
            env.execute("java_wordcount");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
            String[] words = s.split("\\s+");
            for (String word : words) {
                collector.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
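What `keyBy(0).sum(1)` maintains is a running count per distinct word. For a finite batch of lines, the equivalent result can be sketched with a plain `HashMap` and no Flink at all; the class and input below are illustrative stand-ins, not part of the Flink API:

```java
import java.util.HashMap;
import java.util.Map;

public class WordCountSketch {
    // Illustrative stand-in for flatMap + keyBy(0) + sum(1) over a finite input
    public static Map<String, Integer> count(String[] lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.split("\\s+")) {  // flatMap: line -> words
                counts.merge(word, 1, Integer::sum);  // keyBy + sum: running total per word
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] lines = {"hello world", "hello flink"};
        // Prints the three totals, e.g. {world=1, flink=1, hello=2} (map order may vary)
        System.out.println(count(lines));
    }
}
```

The difference in the streaming version is that Flink emits an updated total after every element rather than one final map, which is why the socket job prints a new `(word, count)` pair each time a word arrives.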