从0到1Flink的成长之路(十三)

Posted 熊老二-

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从0到1Flink的成长之路(十三)相关的知识,希望对你有一定的参考价值。

接上一篇文章继续

3. API和编程模型

和批处理类似,Flink的流处理也支持多个层次的API并包含三个方面:
Source/Transformation/Sink

step1、Obtain an execution environment
step2、Load/create the initial data
step3、Specify transformations on this data
step4、Specify where to put the results of your computations
step5、Trigger the program execution
在这里插入图片描述

3.1 词频统计(Java 语言)
使用Java语言编写从TCP Socket读取数据,进行词频统计WordCount,结果打印至控制台。

package xx.xxxx.flink.start;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* 基于 Flink 流计算引擎:从TCP Socket消费数据,实时词频统计WordCount
*/
public class StreamWordCount {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env:流计算执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1) ;
// 2. 数据源-source:Socket接收数据
DataStreamSource<String> inputDataStream = env.socketTextStream("node1.itcast.cn", 9999);
// 3. 转换处理-transformation:调用DataSet函数,处理数据
SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream
// a. 过滤数据
.filter(new FilterFunction<String>() {
@Override
public boolean filter(String line) throws Exception {
return null != line && line.trim().length() > 0;
}
})
// b. 分割单词
.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> out) throws Exception {
String[] words = line.trim().toLowerCase().split("\\\\W+");
for (String word : words) {
out.collect(word);
}
}
})
// c. 转换二元组,表示每个单词出现一次
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
return Tuple2.of(word, 1);
}
})
// d. 按照单词分组及对组内聚合操作
.keyBy(0).sum(1);
// d. 数据终端-sink:结果数据打印控制台
resultDataStream.print();
// e. 执行应用-execute
env.execute(StreamWordCount.class.getSimpleName()) ;
}
}

3.2 词频统计(Scala 语言)
使用Scala语言编写从TCP Socket读取数据,进行词频统计WordCount,结果打印至控制台。

package xx.xxxxx.flink.start
import org.apache.flink.streaming.api.scala._
/**
* 使用Scala语言编程实现Flink实时词频统计WordCount,从TCP Socket读取数据,分析结果打印控制台。
*/
object FlinkWordCount {
def main(args: Array[String]): Unit = {
// 1. 执行环境-env
// val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
env.setParallelism(2)
// 2. 数据源-source
val inputDataStream: DataStream[String] = env.socketTextStream("node1.itcast.cn", 9999)
// 3. 数据转换-transformation
val resultDataStream: DataStream[(String, Int)] = inputDataStream
// a. 过滤数据,如空字符串
.filter(line => null != line && line.trim.length > 0)
// b. 将每行数据分割为字符
.flatMap(line => line.trim.toLowerCase().split("\\\\W+"))
// c. 转换为二元组,表示每个单词出现一次
.map(word => (word, 1))
// d. 按照单词分组和组内聚合计数
.keyBy(0).sum(1)
// 4. 数据终端-sink
resultDataStream.printToErr()
// 5. 触发执行-execute
env.execute(FlinkWordCount.getClass.getSimpleName.stripSuffix("$"))
}
}

以上是关于从0到1Flink的成长之路(十三)的主要内容,如果未能解决你的问题,请参考以下文章

从0到1Flink的成长之路(十四)

从0到1Flink的成长之路(十六)

从0到1Flink的成长之路(二十)-案例:时间会话窗口

从0到1Flink的成长之路

从0到1Flink的成长之路

从0到1Flink的成长之路