03-flink-1.10.1-流处理WordCount

Posted 逃跑的沙丁鱼

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了03-flink-1.10.1-流处理WordCount相关的知识,希望对你有一定的参考价值。

目录

1 开启一个本地9999的TCP协议端口

2 编写第一个flink流处理代码

3  验证


 

1 开启一个本地9999的TCP协议端口

可以自己在linux 或者windows里安装nmap-ncat

由客户端主动发起连接,一旦连接必须由服务端发起关闭

[liucf@node1 ~]$ nc -l 9999

2 编写第一个flink流处理代码

package com.study.liucf.unbounded

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

/**
 * @Author liucf
 * @Date 2021/8/16
 */
object WordCount {
  def main(args: Array[String]): Unit = {
    /**创建flink流式处理环境*/
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    /**监听一个流式输入端口*/
    val sockerInput: DataStream[String] = env.socketTextStream("192.168.109.151", 9999)
    /**数据流转换*/
    val res : DataStream[(String, Int)] = sockerInput.flatMap(_.split(" ")) //: DataStream[String]
      .filter(_.nonEmpty)//: DataStream[String]
      //.map(x=>(x,1))
      .map((_,1))//DataStream[(String, Int)]
      .keyBy(0)//KeyedStream[(String, Int), Tuple]
      .sum(1)//DataStream[(String, Int)]

    /**输出结果,打印到控制台*/
    res.print()

    /**启动流式处理*/
    env.execute(" liucf wordcoun")
  }

}

3  验证

输入单词

控制台输出结果

 

 

以上是关于03-flink-1.10.1-流处理WordCount的主要内容,如果未能解决你的问题,请参考以下文章

04-flink-1.10.1-流处理WordCount代码里控制并行度

04-flink-1.10.1-流处理WordCount代码里控制并行度

Hadoop的word co-occurrence实现

spark实现wordcount

IO流06_处理流

流处理器和CUDA