03-flink-1.10.1-流处理WordCount
Posted 逃跑的沙丁鱼
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了03-flink-1.10.1-流处理WordCount相关的知识,希望对你有一定的参考价值。
目录
1 开启一个本地9999的TCP协议端口
可以自己在linux 或者windows里安装nmap-ncat
由客户端主动发起连接,一旦连接必须由服务端发起关闭
[liucf@node1 ~]$ nc -l 9999
2 编写第一个flink流处理代码
package com.study.liucf.unbounded
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
/**
* @Author liucf
* @Date 2021/8/16
*/
object WordCount {
def main(args: Array[String]): Unit = {
/**创建flink流式处理环境*/
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
/**监听一个流式输入端口*/
val sockerInput: DataStream[String] = env.socketTextStream("192.168.109.151", 9999)
/**数据流转换*/
val res : DataStream[(String, Int)] = sockerInput.flatMap(_.split(" ")) //: DataStream[String]
.filter(_.nonEmpty)//: DataStream[String]
//.map(x=>(x,1))
.map((_,1))//DataStream[(String, Int)]
.keyBy(0)//KeyedStream[(String, Int), Tuple]
.sum(1)//DataStream[(String, Int)]
/**输出结果,打印到控制台*/
res.print()
/**启动流式处理*/
env.execute(" liucf wordcoun")
}
}
3 验证
输入单词
控制台输出结果
以上是关于03-flink-1.10.1-流处理WordCount的主要内容,如果未能解决你的问题,请参考以下文章
04-flink-1.10.1-流处理WordCount代码里控制并行度