flink table factory基础知识

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink table factory基础知识相关的知识,希望对你有一定的参考价值。

参考技术A 在flink中很多组件都是TableFactory的子类。比如序列化,反序列化,tableSinkFactory,tableSourceFactory. TableFactory是用来创建序列化,反序列器,tableSource和 tableSink的工厂。

在flink框架中,TableFactory 的子类并不是程序员自己随心 new 出来的。flink的提供给程序员初始TableFactory的途径是通过属性去初始化。可以理解为首先flink会加载出所有的TableFactory子类实现,用户需要传递属性给flink,flink自动去查找出具有你对应属性的TableFactory,然后进行初始化,返回给程序员。所以在TableFactory 的源码中只有这2个方法,如下。

public interface TableFactory

    Map requiredContext();  //定义这个TableFactory  必须要有哪些属性。

    List supportedProperties(); //定义这个TableFactory 支持哪些属性



有了这2个方法。用户想要找到某个特定的或者自定义的TableFactory 实现,那就在传给flink的属性中必须要含有,必须属性,必须属性一个都不能少,不然找不到。除此之外你还可能会传很多其他属性。这些非必须属性,也不能瞎传,你不需要把supportedProperties的属性都传入,但是你传入的非必须属性必须是supportedProperties内。凡是报错说找不到TableFactory 的一定是某个环节被不满足,或者压根就没加载。如何加载请往下看。

trait BatchTableSourceFactory[T]    extends TableFactory  //用于创建批的TableSource

trait BatchTableSinkFactory[T]        extends TableFactory  //用于创建批的TableSink

trait StreamTableSourceFactory[T] extends TableFactory  //用于创建流的TableSource

trait StreamTableSinkFactory[T]     extends TableFactory  //用于创建流的TableSink

TableSource、TableSink 是什么请查看 

flink tableSource tableSink 基础知识  https://www.jianshu.com/p/036ca8d40cb9

public interface TableFormatFactory  extends TableFactory //用于创建序列化和反序列化器

public interface SerializationSchemaFactory      extends TableFormatFactory //用于创建序列化器

public interface DeserializationSchemaFactory   extends TableFormatFactory  //用于创建反序列化器

什么时候需要序列化,什么时候需要反序列化呢?从kafka中收到一条消息需要转换成table中的一行这需要将字节数组反序列成一个row。反过来,table中的一行需要sink到kafka需要将row序列化成一个字节数组。

这里以kafka为例看下StreamTableSourceFactory 和 StreamTableSinkFactory

public abstract class KafkaTableSourceSinkFactoryBase     implements  StreamTableSourceFactory , StreamTableSinkFactory

    public MaprequiredContext() //Override TableFactory 的方法

    public ListsupportedProperties()  //Override TableFactory 的方法

    //Override StreamTableSinkFactory中方法 创建TableSink

    public StreamTableSink createStreamTableSink(Map properties) 

    //Override StreamTableSourceFactory中方法 创建TableSource

    public StreamTableSource createStreamTableSource(Map properties)



在kafka中KafkaTableSourceSinkFactoryBase 把  StreamTableSourceFactory,StreamTableSinkFactory 一起实现了。我个人觉得还是分开了好。

我第一次跟踪源码是怎么创建TableFactory的时候非常困惑,在他选择TableFactory的时候感觉有一些莫名其妙。特别是自定的一个TableFactory压根就找不到。

1、首先对于自定义的TableFactory一定要在META-INF 下建 services 目录,在services 下创建的包路径org.apache.flink.table.factories.TableFactory的文件,文件写入自定义TableFactory的实现类的吧。比如在Kafka010TableSourceSinkFactory 对应的jar flink-connector-kafka-0.10_2.11-1.7.0.jar中可以看到如下图

2、寻找table创建ableSoure注册table

val tableSource:TableSoure = TableFactoryUtil.findAndCreateTableSource(tableEnv, this)

tableEnv.registerTableSource(name, tableSource)

3、寻找table创建tableSoure

4、查找tableFactory

def find[T](factoryClass:Class[T], propertyMap: JMap[String, String]):T =

findInternal(factoryClass, propertyMap, None)

从0到1Flink的成长之路(二十一)-Flink Table API 与 SQL

Flink Table API 与 SQL

ProcessFunction 函数
如下图,在常规的业务开发中,SQL、Table API、DataStream API比较常用,处于Low-level的Porcession相对用得较少,通过实战来熟悉处理函数(Process Function)。

在这里插入图片描述
处理函数有很多种,最基础的应该ProcessFunction类,从它的类图可见有RichFunction的特性open、close,然后自己有两个重要的方法processElement和onTimer。实际项目中往往先对数据尽心keyBy分组,再进行聚合处理,查看KeyedProcessFunction类图:

在这里插入图片描述
案例演示:

package xx.xxxxxx.flink.process;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
/**
* 基于Flink提供ProcessFunction底层API实现:词频统计WordCount
* https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html
*/
public class StreamProcessDemo {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() ;
env.setParallelism(1);
// 2. 数据源-source
DataStreamSource<String> lineStream = env.socketTextStream("node1.itcast.cn", 9999);
// 3. 数据转换
SingleOutputStreamOperator<Tuple2<String, Integer>> wordStream = lineStream
.filter(line -> null != line && line.trim().length() > 0)
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
for(String word: line.trim().toLowerCase().split("\\\\s+")){
out.collect(Tuple2.of(word, 1));
}
}
});
// TODO:调用process方法,对keyBy分组手KeyedStream进行聚合计算
SingleOutputStreamOperator<String> countStream = wordStream
.keyBy(0)
.process(new KeyedProcessFunction<Tuple, Tuple2<String, Integer>, String>() {
// 存储状态
private transient ValueState<Integer> countState ;
@Override
public void open(Configuration parameters) throws Exception {
// 初始化状态
countState = getRuntimeContext().getState(
new ValueStateDescriptor<Integer>("countState", Integer.class)
);
}
@Override
public void processElement(Tuple2<String, Integer> value,
Context ctx,
Collector<String> out) throws Exception {
// 获取当前State值
Integer currentValue = value.f1 ;
// 获取历史State值
Integer historyValue = countState.value();
// 判断是否第一次,如果是直接更新
if(null == historyValue){
countState.update(currentValue);
}else{
countState.update(currentValue + historyValue);
}
// 返回值
out.collect(value.f0 + " = " + countState.value());
}
});
// 4. 数据终端-sink
countStream.printToErr();
// 5. 触发执行-execute
env.execute(StreamProcessDemo.class.getSimpleName());
}
}

其中需要自己管理状态,使用ValueState存储,运行结果截图如下:
在这里插入图片描述
Table API & SQL 概述
在这里插入图片描述
table
tableApi
sql

为什么需要Table API & SQL?
当在使用Flink做流式和批式任务计算的时候,往往会想到几个问题:

  1. 需要熟悉两套API : DataStream/DataSet API,API有一定难度,开发人员无法集中精力到
    具体业务的开发
  2. 需要有Java或Scala的开发经验
  3. Flink 同时支持批任务与流任务,如何做到API层的统一

Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。

Flink 已经拥有了强大的DataStream/DataSet API,满足流计算和批计算中的各种场景需求,但是关于以上几个问题,还需要提供一种关系型的API来实现Flink API层的流与批的统一,那么这就是Flink Table API & SQL。

Table API & SQL的特点
Flink之所以选择将 Table API & SQL 作为未来的核心 API,是因为其具有一些非常重要的特点:
在这里插入图片描述

  1. 声明式:属于设定式语言,用户只要表达清楚需求即可,不需要了解底层执行;
  2. 高性能:可优化,内置多种查询优化器,这些查询优化器可为SQL翻译出最优执行计划;
  3. 简单易学:易于理解,不同行业和领域的人都懂,学习成本较低;
  4. 标准稳定:语义遵循SQL标准,非常稳定,在数据库30多年的历史中SQL本身变化较少;
  5. 流批统一:可以做到API层面上流与批的统一,相同的SQL逻辑,既可流模式运行,也可批
    模式运行,Flink底层Runtime本身就是一个流与批统一的引擎;

Table API& SQL 是一种关系型API,用户可以像操作MySQL数据库表一样的操作数据,而不需要写Java代码完成Flink function,更不需要手工的优化Java代码调优。另外,SQL作为一个非程序员可操作的语言,学习成本很低,如果一个系统提供SQL支持,将很容易被用户接受。

当然除了SQL的特性,因为Table API是在Flink中专门设计的,Table API还具有自身的特点:

  1. 表达方式的扩展性:可以为Table API开发很多便捷性功能,如:Row.flatten(), map等
  2. 功能的扩展性:可以为Table API扩展更多的功能,如:Iteration,flatAggregate 等新功能
  3. 编译检查:Table API支持Java和Scala语言开发,支持IDE中进行编译检查。

以上是关于flink table factory基础知识的主要内容,如果未能解决你的问题,请参考以下文章

Flink 报错 "Could not find a suitable table factory for 'org.apache.flink.table.factories.Str

Flink基础(十三):Table API 和 Flink SQLAPI 调用

Flink Could not find any factory for identifier ‘json‘ that implements DeserializationFormatFactory

Flink Could not find any factory for identifier ‘json‘ that implements DeserializationFormatFactory

Flink学习之Table API(python版本)

Flink学习之Table API(python版本)