从0到1Flink的成长之路(二十一)-Flink Table API 与 SQL

Posted 熊老二-

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从0到1Flink的成长之路(二十一)-Flink Table API 与 SQL相关的知识,希望对你有一定的参考价值。

Flink Table API 与 SQL

ProcessFunction 函数
如下图,在常规的业务开发中,SQL、Table API、DataStream API比较常用,处于Low-level的Porcession相对用得较少,通过实战来熟悉处理函数(Process Function)。

在这里插入图片描述
处理函数有很多种,最基础的应该ProcessFunction类,从它的类图可见有RichFunction的特性open、close,然后自己有两个重要的方法processElement和onTimer。实际项目中往往先对数据尽心keyBy分组,再进行聚合处理,查看KeyedProcessFunction类图:

在这里插入图片描述
案例演示:

package xx.xxxxxx.flink.process;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
/**
* 基于Flink提供ProcessFunction底层API实现:词频统计WordCount
* https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html
*/
public class StreamProcessDemo {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() ;
env.setParallelism(1);
// 2. 数据源-source
DataStreamSource<String> lineStream = env.socketTextStream("node1.itcast.cn", 9999);
// 3. 数据转换
SingleOutputStreamOperator<Tuple2<String, Integer>> wordStream = lineStream
.filter(line -> null != line && line.trim().length() > 0)
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
for(String word: line.trim().toLowerCase().split("\\\\s+")){
out.collect(Tuple2.of(word, 1));
}
}
});
// TODO:调用process方法,对keyBy分组手KeyedStream进行聚合计算
SingleOutputStreamOperator<String> countStream = wordStream
.keyBy(0)
.process(new KeyedProcessFunction<Tuple, Tuple2<String, Integer>, String>() {
// 存储状态
private transient ValueState<Integer> countState ;
@Override
public void open(Configuration parameters) throws Exception {
// 初始化状态
countState = getRuntimeContext().getState(
new ValueStateDescriptor<Integer>("countState", Integer.class)
);
}
@Override
public void processElement(Tuple2<String, Integer> value,
Context ctx,
Collector<String> out) throws Exception {
// 获取当前State值
Integer currentValue = value.f1 ;
// 获取历史State值
Integer historyValue = countState.value();
// 判断是否第一次,如果是直接更新
if(null == historyValue){
countState.update(currentValue);
}else{
countState.update(currentValue + historyValue);
}
// 返回值
out.collect(value.f0 + " = " + countState.value());
}
});
// 4. 数据终端-sink
countStream.printToErr();
// 5. 触发执行-execute
env.execute(StreamProcessDemo.class.getSimpleName());
}
}

其中需要自己管理状态,使用ValueState存储,运行结果截图如下:
在这里插入图片描述
Table API & SQL 概述
在这里插入图片描述
table
tableApi
sql

为什么需要Table API & SQL?
当在使用Flink做流式和批式任务计算的时候,往往会想到几个问题:

  1. 需要熟悉两套API : DataStream/DataSet API,API有一定难度,开发人员无法集中精力到
    具体业务的开发
  2. 需要有Java或Scala的开发经验
  3. Flink 同时支持批任务与流任务,如何做到API层的统一

Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。

Flink 已经拥有了强大的DataStream/DataSet API,满足流计算和批计算中的各种场景需求,但是关于以上几个问题,还需要提供一种关系型的API来实现Flink API层的流与批的统一,那么这就是Flink Table API & SQL。

Table API & SQL的特点
Flink之所以选择将 Table API & SQL 作为未来的核心 API,是因为其具有一些非常重要的特点:
在这里插入图片描述

  1. 声明式:属于设定式语言,用户只要表达清楚需求即可,不需要了解底层执行;
  2. 高性能:可优化,内置多种查询优化器,这些查询优化器可为SQL翻译出最优执行计划;
  3. 简单易学:易于理解,不同行业和领域的人都懂,学习成本较低;
  4. 标准稳定:语义遵循SQL标准,非常稳定,在数据库30多年的历史中SQL本身变化较少;
  5. 流批统一:可以做到API层面上流与批的统一,相同的SQL逻辑,既可流模式运行,也可批
    模式运行,Flink底层Runtime本身就是一个流与批统一的引擎;

Table API& SQL 是一种关系型API,用户可以像操作mysql数据库表一样的操作数据,而不需要写Java代码完成Flink function,更不需要手工的优化Java代码调优。另外,SQL作为一个非程序员可操作的语言,学习成本很低,如果一个系统提供SQL支持,将很容易被用户接受。

当然除了SQL的特性,因为Table API是在Flink中专门设计的,Table API还具有自身的特点:

  1. 表达方式的扩展性:可以为Table API开发很多便捷性功能,如:Row.flatten(), map等
  2. 功能的扩展性:可以为Table API扩展更多的功能,如:Iteration,flatAggregate 等新功能
  3. 编译检查:Table API支持Java和Scala语言开发,支持IDE中进行编译检查。

以上是关于从0到1Flink的成长之路(二十一)-Flink Table API 与 SQL的主要内容,如果未能解决你的问题,请参考以下文章

从0到1Flink的成长之路(二十一)-异步IO

从0到1Flink的成长之路(二十一)-Flink+Kafka:End-to-End Exactly-Once代码示例

从0到1Flink的成长之路(二十一)-Flink+Kafka:End-to-End Exactly-Once

从0到1Flink的成长之路(二十一)-Flink+Kafka实现End-to-End Exactly-Once代码示例

从0到1Flink的成长之路(二十)-Flink 高级特性之Checkpoint 配置方式

从0到1Flink的成长之路(二十)-案例:时间会话窗口