[Flink] Flink SQL Custom Source Format
Posted by 九师兄
1. Overview
Reposted from: Flink SQL Custom Source Format
1. Background
The JSON messages in Kafka are nested, and we don't want to deserialize them a second time just to flatten them, so we implement a custom format.
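For illustration, a hypothetical message of the kind meant here, where the interesting columns sit one level down (field names are made up):

{"database": "order_db", "table": "orders", "data": {"id": 1, "status": 2, "city_id": 30}}

A stock format would deserialize this structure as-is, after which a second pass would be needed to lift data.id, data.status, and so on into top-level columns.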
2. Steps
1. Create a custom Factory that implements DeserializationFormatFactory.
2. Create a custom DeserializationSchema that implements DeserializationSchema.
The custom Factory's createDecodingFormat method returns a DecodingFormat, whose createRuntimeDecoder in turn creates the custom DeserializationSchema.
3. Custom Format
To keep things simple, we define a NullFormat: whatever the Kafka message contains, it returns null, which behaves as if Kafka contained no messages at all.
Custom Factory
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.formats.json.JsonOptions;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import static org.apache.flink.formats.json.JsonOptions.*;
/**
 * Table format factory for providing configured instances of JSON to RowData
 * {@link SerializationSchema} and {@link DeserializationSchema}.
 */
public class NullFormatFactory implements DeserializationFormatFactory {

    // unique identifier of this factory
    public static final String IDENTIFIER = "null";

    @SuppressWarnings("unchecked")
    @Override
    // entry point for decoding; this part is essentially boilerplate
    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
            DynamicTableFactory.Context context,
            ReadableConfig formatOptions) {
        FactoryUtil.validateFactoryOptions(this, formatOptions);
        validateFormatOptions(formatOptions);

        final boolean failOnMissingField = formatOptions.get(FAIL_ON_MISSING_FIELD);
        final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
        TimestampFormat timestampOption = JsonOptions.getTimestampFormat(formatOptions);

        return new DecodingFormat<DeserializationSchema<RowData>>() {
            @Override
            public DeserializationSchema<RowData> createRuntimeDecoder(
                    DynamicTableSource.Context context, // ScanRuntimeProviderContext
                    DataType producedDataType) { // field names and data types of the table
                final RowType rowType = (RowType) producedDataType.getLogicalType();
                final TypeInformation<RowData> rowDataTypeInfo =
                        (TypeInformation<RowData>) context.createTypeInformation(producedDataType);
                return new NullRowDataDeserializationSchema(
                        rowType,
                        rowDataTypeInfo,
                        failOnMissingField,
                        ignoreParseErrors,
                        timestampOption);
            }

            @Override
            public ChangelogMode getChangelogMode() {
                return ChangelogMode.insertOnly();
            }
        };
    }

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(FAIL_ON_MISSING_FIELD);
        options.add(IGNORE_PARSE_ERRORS);
        options.add(TIMESTAMP_FORMAT);
        return options;
    }

    // ------------------------------------------------------------------------
    // Validation
    // ------------------------------------------------------------------------

    static void validateFormatOptions(ReadableConfig tableOptions) {
        boolean failOnMissingField = tableOptions.get(FAIL_ON_MISSING_FIELD);
        boolean ignoreParseErrors = tableOptions.get(IGNORE_PARSE_ERRORS);
        String timestampFormat = tableOptions.get(TIMESTAMP_FORMAT);
        if (ignoreParseErrors && failOnMissingField) {
            throw new ValidationException(FAIL_ON_MISSING_FIELD.key()
                    + " and "
                    + IGNORE_PARSE_ERRORS.key()
                    + " shouldn't both be true.");
        }
        if (!TIMESTAMP_FORMAT_ENUM.contains(timestampFormat)) {
            throw new ValidationException(String.format(
                    "Unsupported value '%s' for %s. Supported values are [SQL, ISO-8601].",
                    timestampFormat, TIMESTAMP_FORMAT.key()));
        }
    }
}
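One step the code above doesn't show: Flink discovers table factories through Java's Service Provider Interface, so for 'format' = 'null' to resolve to this class, its fully qualified name must be listed in a service file on the classpath. A minimal sketch, assuming the class lives in a package named com.example.formats (the package name here is hypothetical):

# src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
com.example.formats.NullFormatFactory

Without this file, the SQL job fails at planning time because no factory matches the identifier "null".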
Custom DeserializationSchema
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import java.io.IOException;
import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkNotNull;
@Internal
public class NullRowDataDeserializationSchema implements DeserializationSchema<RowData> {

    private static final long serialVersionUID = 1L;

    /**
     * Flag indicating whether to fail if a field is missing.
     */
    private final boolean failOnMissingField;

    /**
     * Flag indicating whether to ignore invalid fields/rows (default: throw an exception).
     */
    private final boolean ignoreParseErrors;

    /**
     * TypeInformation of the produced {@link RowData}.
     */
    private final TypeInformation<RowData> resultTypeInfo;

    /**
     * Object mapper for parsing the JSON.
     */
    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Timestamp format specification which is used to parse timestamps.
     */
    private final TimestampFormat timestampFormat;

    public NullRowDataDeserializationSchema(
            RowType rowType,
            TypeInformation<RowData> resultTypeInfo,
            boolean failOnMissingField,
            boolean ignoreParseErrors,
            TimestampFormat timestampFormat) {
        if (ignoreParseErrors && failOnMissingField) {
            throw new IllegalArgumentException(
                    "JSON format doesn't support failOnMissingField and ignoreParseErrors both being enabled.");
        }
        this.resultTypeInfo = checkNotNull(resultTypeInfo);
        this.failOnMissingField = failOnMissingField;
        this.ignoreParseErrors = ignoreParseErrors;
        this.timestampFormat = timestampFormat;
    }

    @Override
    // This is where the real deserialization logic belongs, e.g. flattening
    // nested JSON into a single level. This method is the key point to focus on.
    // Returning null tells Flink to skip the record, so this format drops everything.
    public RowData deserialize(byte[] message) throws IOException {
        return null;
    }

    @Override
    public boolean isEndOfStream(RowData nextElement) {
        return false;
    }

    @Override
    public TypeInformation<RowData> getProducedType() {
        return resultTypeInfo;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        NullRowDataDeserializationSchema that = (NullRowDataDeserializationSchema) o;
        return failOnMissingField == that.failOnMissingField
                && ignoreParseErrors == that.ignoreParseErrors
                && resultTypeInfo.equals(that.resultTypeInfo)
                && timestampFormat.equals(that.timestampFormat);
    }

    @Override
    public int hashCode() {
        return Objects.hash(failOnMissingField, ignoreParseErrors, resultTypeInfo, timestampFormat);
    }
}
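For reference, here is a minimal sketch of what deserialize could look like if it actually flattened a nested message instead of returning null. The JSON pointer paths and the two-column row are hypothetical, and it assumes additional imports of org.apache.flink.table.data.GenericRowData and org.apache.flink.table.data.StringData:

@Override
public RowData deserialize(byte[] message) throws IOException {
    try {
        JsonNode root = objectMapper.readTree(message);
        // Lift nested fields, e.g. {"order":{"id":1,"status":"PAID"}},
        // into top-level columns of the produced row.
        GenericRowData row = new GenericRowData(2);
        row.setField(0, root.at("/order/id").asLong());
        row.setField(1, StringData.fromString(root.at("/order/status").asText()));
        return row;
    } catch (Throwable t) {
        if (ignoreParseErrors) {
            return null; // a null return makes Flink skip the record
        }
        throw new IOException("Failed to deserialize JSON message.", t);
    }
}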
4. Using the Custom Format
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.time.Duration;

public class SqlKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings environmentSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, environmentSettings);

        // enable checkpointing
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        configuration.set(
                ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
        configuration.set(
                ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10));

        String sql = "CREATE TABLE sourcedata (`id` bigint,`status` int,`city_id` bigint,`courier_id` bigint," +
                "info_index int,order_id bigint,tableName String" +
                ") WITH (" +
                "'connector' = 'kafka','topic' = 'canal_monitor_order'," +
                "'properties.bootstrap.servers' = 'bigdata-dev-mq:9092','properties.group.id' = 'testGroup'," +
                "'format' = 'null','scan.startup.mode' = 'earliest-offset')";
        tableEnv.executeSql(sql);
        // ......
    }
}
'format' = 'null' is the unique identifier of the factory defined above.
After that, the job can be executed directly.
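As a quick, hypothetical sanity check that the format is being picked up: querying the table should produce no rows at all, since every Kafka message is deserialized to null and dropped.

// Hypothetical verification: this prints no rows, because the
// null format discards every record read from the Kafka topic.
tableEnv.executeSql("SELECT * FROM sourcedata").print();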