Flink SQL: implementing a custom Kudu connector
As everyone knows, when Flink SQL exchanges data with external storage (MySQL, Redis, Elasticsearch, HBase, Kudu, and so on), a dedicated connector is needed, because each store has its own protocol and operations. This post walks through how to implement a custom Flink SQL connector. The approach is not limited to Kudu: every connector is built on the same principle.
Principle
The mechanism is the same as for the off-the-shelf MySQL connector: compile and package the connector, add it to your pom, and at SQL parsing time Flink takes the connector option configured in the program and matches it against the connector implementations pulled in through the pom.
So how do you develop and plug in a connector? In short, you need three things:
A sink class: KuduDynamicTableSink
A factory class: KuduDynamicTableFactory
An SPI service file: org.apache.flink.table.factories.Factory
This relies on Java's SPI mechanism. In the project's resources directory, create a file named org.apache.flink.table.factories.Factory under META-INF/services, and put the fully qualified class name of your factory in it:
com.datacenter.connectors.kudu.table.KuduDynamicTableFactory
The reason is that SPI discovers the factory class through this file, and the factory then constructs the sink instance for the table objects produced by SQL parsing. One caveat: if you package the connector into an uber jar, the SPI service files of all modules must be merged (for example with the Maven shade plugin's ServicesResourceTransformer), otherwise the factory will not be found.
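Once the jar is on the classpath, the connector can be used like any built-in one. Here is an illustrative end-to-end sketch (the table names, master addresses, and the source table are placeholders; the option keys match the ConfigOptions defined in the factory below):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KuduConnectorDemo {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // 'connector' = 'kudu' must match factoryIdentifier();
        // 'master' and 'table' are the two required options, the rest are optional.
        tEnv.executeSql(
                "CREATE TABLE kudu_sink (\n" +
                "  id BIGINT,\n" +
                "  name STRING,\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" + // required once the query produces updates
                ") WITH (\n" +
                "  'connector' = 'kudu',\n" +
                "  'master' = 'kudu-master-1:7051,kudu-master-2:7051',\n" +
                "  'table' = 'demo_kudu_table',\n" +
                "  'sink.buffer-flush.interval' = '1s',\n" +
                "  'sink.max-retries' = '3'\n" +
                ")");

        // 'some_source' stands in for any table registered elsewhere.
        tEnv.executeSql("INSERT INTO kudu_sink SELECT id, name FROM some_source");
    }
}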
Implementation
KuduDynamicTableSink:
package com.datacenter.streaming.sql.connectors.kudu.table;
import com.datacenter.streaming.sql.connectors.kudu.KuduOptions;
import com.datacenter.streaming.sql.connectors.kudu.KuduOutputFormat;
import com.datacenter.streaming.sql.connectors.kudu.KuduSinkOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.OutputFormatProvider;
import org.apache.flink.types.RowKind;
import java.util.List;
import static org.apache.flink.util.Preconditions.checkState;
/**
* KuduDynamicTableSink
* @author loveyou
*/
public class KuduDynamicTableSink implements DynamicTableSink {

    private final KuduOptions kuduOptions;
    private final KuduSinkOptions kuduSinkOptions;
    private TableSchema physicalSchema;
    private int bufferFlushInterval;
    private int maxRetries;
    private List<String> keyFields;

    public KuduDynamicTableSink(KuduOptions kuduOptions, KuduSinkOptions kuduSinkOptions, TableSchema physicalSchema) {
        this.kuduOptions = kuduOptions;
        this.kuduSinkOptions = kuduSinkOptions;
        this.physicalSchema = physicalSchema;
        UniqueConstraint uniqueConstraint = physicalSchema.getPrimaryKey().orElse(null);
        if (uniqueConstraint != null) {
            this.keyFields = uniqueConstraint.getColumns();
        }
        this.bufferFlushInterval = (int) kuduSinkOptions.getBatchIntervalMs();
        this.maxRetries = kuduSinkOptions.getMaxRetries();
    }
    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        validatePrimaryKey(requestedMode);
        // Kudu upserts by primary key, so UPDATE_BEFORE and DELETE are not needed:
        // an UPDATE_AFTER record simply overwrites the row with the same key.
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .build();
    }
    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        // The actual writing is delegated to an OutputFormat; the runtime wraps
        // it into a sink operator via OutputFormatProvider.
        KuduOutputFormat kuduOutputFormat = new KuduOutputFormat(
                kuduOptions.getMaster(),
                kuduOptions.getTable(),
                physicalSchema.getFieldNames(),
                physicalSchema.getFieldDataTypes(),
                bufferFlushInterval,
                maxRetries
        );
        return OutputFormatProvider.of(kuduOutputFormat);
    }
    @Override
    public DynamicTableSink copy() {
        return new KuduDynamicTableSink(
                kuduOptions,
                kuduSinkOptions,
                physicalSchema);
    }

    @Override
    public String asSummaryString() {
        // Used in logs and plan summaries; returning null here is legal but unhelpful.
        return "KuduDynamicTableSink";
    }
    private void validatePrimaryKey(ChangelogMode requestedMode) {
        // A primary key must be declared on the sink table whenever the query
        // produces update/delete records. Note the condition is keyFields != null
        // (a key is present); the original listing had == null, which would fail
        // exactly when a key was declared.
        checkState(ChangelogMode.insertOnly().equals(requestedMode) || keyFields != null,
                "please declare primary key for sink table when query contains update/delete record.");
    }
}
KuduDynamicTableFactory:

package com.datacenter.streaming.sql.connectors.kudu.table;
import com.datacenter.streaming.sql.connectors.kudu.KuduLookupOptions;
import com.datacenter.streaming.sql.connectors.kudu.KuduOptions;
import com.datacenter.streaming.sql.connectors.kudu.KuduSinkOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.util.Preconditions;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
/**
* KuduDynamicTableFactory
* @author loveyou
*/
public class KuduDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    // common options
    public static final String IDENTIFIER = "kudu";
    public static final ConfigOption<String> MASTER = ConfigOptions
            .key("master")
            .stringType()
            .noDefaultValue()
            .withDescription("the kudu master addresses.");

    public static final ConfigOption<String> TABLE = ConfigOptions
            .key("table")
            .stringType()
            .noDefaultValue()
            .withDescription("the kudu table name.");

    public static final ConfigOption<String> USERNAME = ConfigOptions
            .key("username")
            .stringType()
            .noDefaultValue()
            .withDescription("the kudu user name.");

    public static final ConfigOption<String> PASSWORD = ConfigOptions
            .key("password")
            .stringType()
            .noDefaultValue()
            .withDescription("the kudu password.");

    // lookup options
    private static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS = ConfigOptions
            .key("lookup.cache.max-rows")
            .longType()
            .defaultValue(-1L)
            .withDescription("the max number of rows of lookup cache, over this value, the oldest rows will " +
                    "be eliminated. \"cache.max-rows\" and \"cache.ttl\" options must all be specified if any of them is " +
                    "specified. Cache is not enabled as default.");

    private static final ConfigOption<Duration> LOOKUP_CACHE_TTL = ConfigOptions
            .key("lookup.cache.ttl")
            .durationType()
            .defaultValue(Duration.ofSeconds(-1))
            .withDescription("the cache time to live.");

    private static final ConfigOption<Integer> LOOKUP_MAX_RETRIES = ConfigOptions
            .key("lookup.max-retries")
            .intType()
            .defaultValue(3)
            .withDescription("the max retry times if lookup database failed.");

    // write options
    //private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = ConfigOptions
    //        .key("sink.buffer-flush.max-rows")
    //        .intType()
    //        .defaultValue(100)
    //        .withDescription("the flush max size (includes all append, upsert and delete records), over this number" +
    //                " of records, will flush data. The default value is 100.");

    private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions
            .key("sink.buffer-flush.interval")
            .durationType()
            .defaultValue(Duration.ofSeconds(1))
            .withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The " +
                    "default value is 1s.");

    private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions
            .key("sink.max-retries")
            .intType()
            .defaultValue(3)
            .withDescription("the max retry times if writing records to database failed.");
    /**
     * Creates the DynamicTableSource instance (used when the table appears in a
     * FROM clause or a lookup join).
     */
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        final ReadableConfig config = helper.getOptions();
        // validate() checks every option against requiredOptions()/optionalOptions().
        helper.validate();
        validateConfigOptions(config);
        TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
        return new KuduDynamicTableSource(
                getKuduOptions(config),
                getKuduLookupOptions(config),
                physicalSchema);
    }
    /**
     * Creates the DynamicTableSink instance (used when the table is the target
     * of an INSERT INTO).
     */
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        final ReadableConfig config = helper.getOptions();
        helper.validate();
        validateConfigOptions(config);
        TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
        return new KuduDynamicTableSink(
                getKuduOptions(config),
                getKuduSinkOptions(config),
                physicalSchema);
    }
    private KuduOptions getKuduOptions(ReadableConfig readableConfig) {
        KuduOptions.KuduOptionsBuilder builder = KuduOptions.builder()
                .master(readableConfig.get(MASTER))
                .table(readableConfig.get(TABLE));
        readableConfig.getOptional(USERNAME).ifPresent(builder::username);
        readableConfig.getOptional(PASSWORD).ifPresent(builder::password);
        return builder.build();
    }

    private KuduLookupOptions getKuduLookupOptions(ReadableConfig readableConfig) {
        KuduLookupOptions.KuduLookupOptionsBuilder builder = KuduLookupOptions.builder();
        builder.cacheMaxSize(readableConfig.get(LOOKUP_CACHE_MAX_ROWS));
        builder.cacheExpireMs(readableConfig.get(LOOKUP_CACHE_TTL).toMillis());
        builder.maxRetryTimes(readableConfig.get(LOOKUP_MAX_RETRIES));
        return builder.build();
    }

    private KuduSinkOptions getKuduSinkOptions(ReadableConfig config) {
        KuduSinkOptions.KuduSinkOptionsBuilder builder = KuduSinkOptions.builder();
        //builder.batchSize(config.get(SINK_BUFFER_FLUSH_MAX_ROWS));
        builder.batchIntervalMs(config.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
        builder.maxRetries(config.get(SINK_MAX_RETRIES));
        return builder.build();
    }
    /**
     * The unique factory identifier; this is what the 'connector' option in DDL must equal.
     */
    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    /**
     * Required options.
     */
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
        requiredOptions.add(MASTER);
        requiredOptions.add(TABLE);
        return requiredOptions;
    }
    /**
     * Optional options.
     */
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> optionalOptions = new HashSet<>();
        optionalOptions.add(USERNAME);
        optionalOptions.add(PASSWORD);
        optionalOptions.add(LOOKUP_CACHE_MAX_ROWS);
        optionalOptions.add(LOOKUP_CACHE_TTL);
        optionalOptions.add(LOOKUP_MAX_RETRIES);
        //optionalOptions.add(SINK_BUFFER_FLUSH_MAX_ROWS);
        optionalOptions.add(SINK_BUFFER_FLUSH_INTERVAL);
        optionalOptions.add(SINK_MAX_RETRIES);
        return optionalOptions;
    }
    /**
     * Cross-option validation.
     */
    private void validateConfigOptions(ReadableConfig config) {
        checkAllOrNone(config, new ConfigOption[]{
                USERNAME,
                PASSWORD
        });
        checkAllOrNone(config, new ConfigOption[]{
                LOOKUP_CACHE_MAX_ROWS,
                LOOKUP_CACHE_TTL
        });
        Preconditions.checkArgument(
                config.get(SINK_BUFFER_FLUSH_INTERVAL).compareTo(Duration.ofSeconds(1)) >= 0,
                SINK_BUFFER_FLUSH_INTERVAL.key() + " must be at least 1s");
    }
    /**
     * Options that must be specified either all together or not at all.
     */
    private void checkAllOrNone(ReadableConfig config, ConfigOption<?>[] configOptions) {
        int presentCount = 0;
        for (ConfigOption<?> configOption : configOptions) {
            if (config.getOptional(configOption).isPresent()) {
                presentCount++;
            }
        }
        // The original listing is truncated here; the ending below follows the
        // equivalent all-or-none check in Flink's JDBC connector.
        String[] propertyNames = Arrays.stream(configOptions).map(ConfigOption::key).toArray(String[]::new);
        Preconditions.checkArgument(configOptions.length == presentCount || presentCount == 0,
                "Either all or none of the following options should be provided:\n" + String.join("\n", propertyNames));
    }
}
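A final loose end: the KuduOptions, KuduLookupOptions, and KuduSinkOptions classes are not shown either. The XxxOptionsBuilder inner-class names suggest Lombok @Builder POJOs. Purely as an illustration, KuduSinkOptions might look like the following, with field names inferred from the getters used in the sink:

package com.datacenter.streaming.sql.connectors.kudu;

import lombok.Builder;
import lombok.Getter;

import java.io.Serializable;

/** Sketch only, inferred from usage; not the author's original class. */
@Getter
@Builder
public class KuduSinkOptions implements Serializable {
    /** Flush interval in milliseconds, from 'sink.buffer-flush.interval'. */
    private final long batchIntervalMs;
    /** Max write retries, from 'sink.max-retries'. */
    private final int maxRetries;
}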