Flink 自定义 SQL Connector
Posted 非专业技术研究
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink 自定义 SQL Connector相关的知识,希望对你有一定的参考价值。
Flink 自定义 SQL Connector
1. 启程
flink 本身提供了丰富的 sql connector, 一般不需要用户自己定义。但是在某些特殊的情况下需要手动实现针对实际场景的 sql connector。
最近在实践中遇到了两个比较极端的场景, 无法通过简单的 sql connector 实现:
- 业务1: 逻辑上将数据分写入到一个 flink sql table, 物理上分发到多个表。
- 业务2:数据采集系统中,数据上报到某一张表,但是每次上报的字段不同。
flink sql 是建立在 streaming api 之上的。实现 sql connector 本质上是在钩子中编写一些 streaming 逻辑。
有哪些钩子呢?可以先看下图
- DynamicTableSinkFactory
- DynamicTableSink
- RichSinkFunction
sql connector 定义在 WITH ( connector = \'xxx\')
中,这需要 Java SPI 机制在进行注册发现。
在 resources 目录下建立 META-INF/service/org.apache.flink.table.factories.Factory
实现 DynamicTableSinkFactory 后将 class 路径填写到上面的 Factory 文件。
2. 实现
这里以一个\'json-print\'sql connector 为例来说明 sql connector 的编写方式。
json-print` : 逻辑 sink 表中的数据以 json 结构打印输出。重点突出钩子逻辑,简化输出逻辑。生产中的输出可能是各种存储介质,这里简单使用标准输出代替。
2.1 DynamicTableFactory
- 首先定义一个
WithOption
类来定义 with 中的参数
public class WithOptions
public static final String IDENTIFIER = "json-print";
public static final ConfigOption<Integer> BATCH_SIZE = ConfigOptions.key("batch-size").intType().noDefaultValue();
- 定义
JsonPrintDynamicTableFactory
实现DynamicTableSinkFactory
- 定义可选和必选参数
- 定义 identifier (connector = \'xxx\' 名字)
- 传递 options, schema 给到
JsonPrintDynamicTableSink
类
options 获取传入 sql connector 的参数
schema 可以获取表结构信息,如字段名、字段类型、主键等
public class JsonPrintDynamicTableFactory implements DynamicTableSinkFactory
@Override
public DynamicTableSink createDynamicTableSink(Context context)
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
helper.validate();
ReadableConfig options = helper.getOptions();
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
// options and schema
return new JsonPrintDynamicTableSink(options, schema);
// connector name
@Override
public String factoryIdentifier()
return WithOptions.IDENTIFIER;
// connector required args
@Override
public Set<ConfigOption<?>> requiredOptions()
HashSet<ConfigOption<?>> options = new HashSet<>();
options.add(WithOptions.BATCH_SIZE);
return options;
// connector optional args
@Override
public Set<ConfigOption<?>> optionalOptions()
return new HashSet<ConfigOption<?>>();
2.2 DynamicTableSink
- 设置支持的 change log mode
- 提供 RuntimeProvider (这里创建 SinkFunction)
public class JsonPrintDynamicTableSink implements DynamicTableSink, Serializable
ReadableConfig options;
ResolvedSchema schema;
public JsonPrintDynamicTableSink(ReadableConfig options, ResolvedSchema schema)
this.options = options;
this.schema = schema;
@Override
public ChangelogMode getChangelogMode(ChangelogMode changelogMode)
// support change log mode
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.DELETE)
.addContainedKind(RowKind.UPDATE_BEFORE)
.addContainedKind(RowKind.UPDATE_AFTER)
.build();
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context)
// args
var batchSize = options.get(WithOptions.BATCH_SIZE);
JsonPrintConfig cfg = new JsonPrintConfig(batchSize);
// schema is very important
// we can get columns info and primary keys info from schema
List<Column> columns = schema.getColumns();
ArrayList<Tuple2<String, String>> cols = new ArrayList<>();
for (int i = 0; i < columns.size(); i++)
var col = columns.get(i);
cols.add(new Tuple2<>(col.getName(), col.getDataType().getConversionClass().getSimpleName()));
return SinkFunctionProvider.of(new JsonPrintSinkFunction(cfg, cols));
@Override
public DynamicTableSink copy()
return new JsonPrintDynamicTableSink(options, schema);
@Override
public String asSummaryString()
return "JsonPrint Table Sink";
2.3 RichSinkFunction
简单实现一个 SinkFunction。应注意的是传出的泛型类应该是 flink 的 RowData
@Slf4j
public class JsonPrintSinkFunction extends RichSinkFunction<RowData> implements CheckpointedFunction
private final JsonPrintConfig config;
private final ArrayList<Tuple2<String, String>> columns;
private final List<RowData> buffer = new LinkedList<>();
public JsonPrintSinkFunction(JsonPrintConfig config, ArrayList<Tuple2<String, String>> columns)
this.config = config;
this.columns = columns;
@Override
public void open(Configuration parameters) throws Exception
super.open(parameters);
// 在这里建立数据库连接...
@Override
public void close() throws Exception
super.close();
@Override
public void invoke(RowData row, Context context) throws Exception
super.invoke(row, context);
if (row == null)
return;
buffer.add(row);
if (buffer.size() >= config.getBatchSize())
doSink();
private void doSink()
if (buffer.size() <= 0)
return;
for (var row : buffer)
HashMap<String, Object> map = new HashMap<>();
for (int i = 0; i < columns.size(); i++)
var col = columns.get(i);
var feName = col.f0;
var feType = col.f1;
switch (feType)
case "String":
map.put(feName, row.getString(i).toString());
break;
case "Integer":
map.put(feName, row.getInt(i));
break;
System.out.printf("%s => %s\\n", row.getRowKind().toString() ,JSON.toJSONString(map));
buffer.clear();
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception
doSink();
@Override
public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception
3. 实现源码
以上是关于Flink 自定义 SQL Connector的主要内容,如果未能解决你的问题,请参考以下文章
Flink SQL实战演练之自定义Clickhouse Connector
Flink Sql 自定义实现 kudu connector