Flink WordCount
Posted noyouth
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink WordCount相关的知识,希望对你有一定的参考价值。
一 批处理
文件内容:
hello world hello scala hello flink
代码:
import org.apache.flink.api.scala._ object WordCount { def main(args: Array[String]): Unit = { //创建一个批处理的执行环境 val env = ExecutionEnvironment.getExecutionEnvironment val inputDataSet = env.readTextFile("D:\project\idea\FlinkTutorial\src\main\resources\hello.txt") val wordCountDataSet = inputDataSet .flatMap(_.split(" ")) .map((_,1)) .groupBy(0)//按下标为0的元素分组 .sum(1)//对下标为1的元素求和 wordCountDataSet.print() } }
二 流处理
import org.apache.flink.streaming.api.scala._ object StreamWordCount { def main(args: Array[String]): Unit = { //创建一个流处理的执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //接收socket数据流 val textDataStream = env.socketTextStream("hadoop102", 7777) val wordCountDataStream = textDataStream .flatMap(_.split("\s")) .map((_, 1)) .keyBy(0) .sum(1) wordCountDataStream.print() //执行任务 env.execute("任务名") } }
以上是关于Flink WordCount的主要内容,如果未能解决你的问题,请参考以下文章
04-flink-1.10.1-流处理WordCount代码里控制并行度