Flink WordCount

Posted noyouth

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink WordCount相关的知识,希望对你有一定的参考价值。

一 批处理

文件内容:

hello world
hello scala
hello flink

代码:

import org.apache.flink.api.scala._

object WordCount {

  def main(args: Array[String]): Unit = {

    //创建一个批处理的执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    val inputDataSet = env.readTextFile("D:\project\idea\FlinkTutorial\src\main\resources\hello.txt")

    val wordCountDataSet = inputDataSet
      .flatMap(_.split(" "))
      .map((_,1))
      .groupBy(0)//按下标为0的元素分组
      .sum(1)//对下标为1的元素求和

    wordCountDataSet.print()

  }

}

  

二 流处理

import org.apache.flink.streaming.api.scala._

object StreamWordCount {

  def main(args: Array[String]): Unit = {

    //创建一个流处理的执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //接收socket数据流
    val textDataStream = env.socketTextStream("hadoop102", 7777)

    val wordCountDataStream = textDataStream
      .flatMap(_.split("\s"))
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    wordCountDataStream.print()

    //执行任务
    env.execute("任务名")
  }

}

 

以上是关于Flink WordCount的主要内容,如果未能解决你的问题,请参考以下文章

Flink---wordcount

Flink---wordcount

Flink WordCount

04-flink-1.10.1-流处理WordCount代码里控制并行度

04-flink-1.10.1-流处理WordCount代码里控制并行度

05-flink-1.10.1-flink on yarn 流处理WordCount