[Flink] Custom Source Format for Flink SQL

Posted by 九师兄

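1. Overview

Reposted from: Flink SQL custom Source format

1. Background

The JSON in our Kafka topic is nested, and we did not want a second deserialization pass just to flatten it, so we wrote a custom format.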

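2. Steps

1. Write a custom Factory that implements DeserializationFormatFactory.

2. Write a custom deserializer that implements DeserializationSchema.

The Factory's createDecodingFormat method returns a DecodingFormat, whose createRuntimeDecoder method creates the custom DeserializationSchema.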

3. Custom Format

To keep things simple, we define a NullFormat: whatever the Kafka message contains, it returns null, which is equivalent to the topic containing no messages.
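Why returning null behaves like "no messages" can be sketched without any Flink dependency. The interface below is a simplified, hypothetical stand-in (SimpleDecoder and NullDecoderDemo are illustrative names, not Flink classes). It assumes the contract that a record deserialized to null is skipped rather than emitted, which mirrors how Flink's DeserializationSchema forwards only non-null records to its collector.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Simplified stand-in for a deserialization schema; NOT the Flink interface. */
interface SimpleDecoder<T> {
    /** Turns raw bytes into a record, or returns null to skip the message. */
    T decode(byte[] message);

    /** Forwards the decoded record only when it is non-null. */
    default void decode(byte[] message, Consumer<T> out) {
        T record = decode(message);
        if (record != null) {
            out.accept(record);
        }
    }
}

class NullDecoderDemo {
    /** Runs every message through the decoder and returns what was emitted. */
    static <T> List<T> run(SimpleDecoder<T> decoder, List<byte[]> messages) {
        List<T> emitted = new ArrayList<>();
        for (byte[] message : messages) {
            decoder.decode(message, emitted::add);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<byte[]> messages = Arrays.asList(
                "{\"id\":1}".getBytes(StandardCharsets.UTF_8),
                "{\"id\":2}".getBytes(StandardCharsets.UTF_8));

        // A "null format": every message is dropped.
        SimpleDecoder<String> nullDecoder = message -> null;
        // A pass-through decoder for comparison.
        SimpleDecoder<String> utf8Decoder =
                message -> new String(message, StandardCharsets.UTF_8);

        System.out.println("null decoder emitted: " + run(nullDecoder, messages).size());
        System.out.println("utf8 decoder emitted: " + run(utf8Decoder, messages).size());
    }
}
```

With the null decoder nothing reaches the downstream list, so a query over such a table simply sees an empty stream.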

Custom Factory

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.formats.json.JsonOptions;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import static org.apache.flink.formats.json.JsonOptions.*;

/**
 * Table format factory for providing configured instances of JSON to RowData
 * {@link SerializationSchema} and {@link DeserializationSchema}.
 */
public class NullFormatFactory implements
		DeserializationFormatFactory {
	// Unique identifier of this factory; it is the value used for 'format' in the DDL
	public static final String IDENTIFIER = "null";
	
	// Entry point for decoding; this is mostly boilerplate
	@SuppressWarnings("unchecked")
	@Override
	public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
			DynamicTableFactory.Context context,
			ReadableConfig formatOptions) {
		FactoryUtil.validateFactoryOptions(this, formatOptions);
		validateFormatOptions(formatOptions);
		
		final boolean failOnMissingField = formatOptions.get(FAIL_ON_MISSING_FIELD);
		final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
		TimestampFormat timestampOption = JsonOptions.getTimestampFormat(formatOptions);
		
		return new DecodingFormat<DeserializationSchema<RowData>>() {
			@Override
			public DeserializationSchema<RowData> createRuntimeDecoder(
					DynamicTableSource.Context context, // ScanRuntimeProviderContext
					DataType producedDataType) { // field names and data types of the table
				final RowType rowType = (RowType) producedDataType.getLogicalType();
				final TypeInformation<RowData> rowDataTypeInfo =
						(TypeInformation<RowData>) context.createTypeInformation(producedDataType);
				return new NullRowDataDeserializationSchema(
						rowType,
						rowDataTypeInfo,
						failOnMissingField,
						ignoreParseErrors,
						timestampOption
				);
			}
			
			@Override
			public ChangelogMode getChangelogMode() {
				return ChangelogMode.insertOnly();
			}
		};
	}
	
	@Override
	public String factoryIdentifier() {
		return IDENTIFIER;
	}
	
	@Override
	public Set<ConfigOption<?>> requiredOptions() {
		return Collections.emptySet();
	}
	
	@Override
	public Set<ConfigOption<?>> optionalOptions() {
		Set<ConfigOption<?>> options = new HashSet<>();
		options.add(FAIL_ON_MISSING_FIELD);
		options.add(IGNORE_PARSE_ERRORS);
		options.add(TIMESTAMP_FORMAT);
		return options;
	}
	
	// ------------------------------------------------------------------------
	//  Validation
	// ------------------------------------------------------------------------
	
	static void validateFormatOptions(ReadableConfig tableOptions) {
		boolean failOnMissingField = tableOptions.get(FAIL_ON_MISSING_FIELD);
		boolean ignoreParseErrors = tableOptions.get(IGNORE_PARSE_ERRORS);
		String timestampFormat = tableOptions.get(TIMESTAMP_FORMAT);
		// The two flags contradict each other, so reject the combination early
		if (ignoreParseErrors && failOnMissingField) {
			throw new ValidationException(FAIL_ON_MISSING_FIELD.key()
					+ " and "
					+ IGNORE_PARSE_ERRORS.key()
					+ " shouldn't both be true.");
		}
		if (!TIMESTAMP_FORMAT_ENUM.contains(timestampFormat)) {
			throw new ValidationException(String.format("Unsupported value '%s' for %s. Supported values are [SQL, ISO-8601].",
					timestampFormat, TIMESTAMP_FORMAT.key()));
		}
	}
}

Custom DeserializationSchema

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

import java.io.IOException;
import java.util.Objects;

import static org.apache.flink.util.Preconditions.checkNotNull;

@Internal
public class NullRowDataDeserializationSchema implements DeserializationSchema<RowData> {
	private static final long serialVersionUID = 1L;
	
	/**
	 * Flag indicating whether to fail if a field is missing.
	 */
	private final boolean failOnMissingField;
	
	/**
	 * Flag indicating whether to ignore invalid fields/rows (default: throw an exception).
	 */
	private final boolean ignoreParseErrors;
	
	/**
	 * TypeInformation of the produced {@link RowData}.
	 **/
	private final TypeInformation<RowData> resultTypeInfo;
	
	/**
	 * Object mapper for parsing the JSON.
	 */
	private final ObjectMapper objectMapper = new ObjectMapper();
	
	/**
	 * Timestamp format specification which is used to parse timestamp.
	 */
	private final TimestampFormat timestampFormat;
	
	public NullRowDataDeserializationSchema(
			RowType rowType,
			TypeInformation<RowData> resultTypeInfo,
			boolean failOnMissingField,
			boolean ignoreParseErrors,
			TimestampFormat timestampFormat) {
		if (ignoreParseErrors && failOnMissingField) {
			throw new IllegalArgumentException(
					"JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled.");
		}
		this.resultTypeInfo = checkNotNull(resultTypeInfo);
		this.failOnMissingField = failOnMissingField;
		this.ignoreParseErrors = ignoreParseErrors;
		this.timestampFormat = timestampFormat;
	}
	
	// This is where the real deserialization logic belongs, for example flattening
	// the JSON (turning multiple levels of nesting into a single level).
	// This method is the key part of a custom format.
	@Override
	public RowData deserialize(byte[] message) throws IOException {
		// NullFormat: ignore the message content and always return null
		return null;
	}
	
	@Override
	public boolean isEndOfStream(RowData nextElement) {
		return false;
	}
	
	@Override
	public TypeInformation<RowData> getProducedType() {
		return resultTypeInfo;
	}
	
	@Override
	public boolean equals(Object o) {
		if (this == o) {
			return true;
		}
		if (o == null || getClass() != o.getClass()) {
			return false;
		}
		NullRowDataDeserializationSchema that = (NullRowDataDeserializationSchema) o;
		return failOnMissingField == that.failOnMissingField &&
				ignoreParseErrors == that.ignoreParseErrors &&
				resultTypeInfo.equals(that.resultTypeInfo) &&
				timestampFormat.equals(that.timestampFormat);
	}
	
	@Override
	public int hashCode() {
		return Objects.hash(failOnMissingField, ignoreParseErrors, resultTypeInfo, timestampFormat);
	}
}
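The comment on deserialize mentions flattening nested JSON into a single level. The core of that step might look roughly like the sketch below. It is not the original author's implementation: it assumes the message has already been parsed into nested Map objects (for example by Jackson), and the class name NestedMapFlattener, the "." key separator, and the sample field names are all hypothetical. In a real format the flattened values would still have to be converted into a RowData matching the table schema.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class NestedMapFlattener {
    /** Flattens nested maps into one level, joining parent and child keys with '.'. */
    static Map<String, Object> flatten(Map<String, Object> nested) {
        Map<String, Object> flat = new LinkedHashMap<>();
        flattenInto("", nested, flat);
        return flat;
    }

    @SuppressWarnings("unchecked")
    private static void flattenInto(
            String prefix, Map<String, Object> node, Map<String, Object> out) {
        for (Map.Entry<String, Object> entry : node.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                // Descend into the nested object, carrying the key path along.
                flattenInto(key, (Map<String, Object>) value, out);
            } else {
                // Leaf value: store it under its full key path.
                out.put(key, value);
            }
        }
    }

    public static void main(String[] args) {
        // Hypothetical nested message: a table name plus a nested "data" object.
        Map<String, Object> data = new LinkedHashMap<>();
        data.put("id", 1L);
        data.put("status", 2);

        Map<String, Object> message = new LinkedHashMap<>();
        message.put("tableName", "order");
        message.put("data", data);

        System.out.println(flatten(message));
    }
}
```

Doing this inside the format keeps the flattening in the same pass that reads the Kafka bytes, which is the motivation given in the background section.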

4. Using the custom Format

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

public class SqlKafka {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		
		EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
		StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, environmentSettings);
		// enable checkpointing
		Configuration configuration = tableEnv.getConfig().getConfiguration();
		configuration.set(
				ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
		configuration.set(
				ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10));
		
		// 'format' = 'null' selects NullFormatFactory through its IDENTIFIER
		String sql = "CREATE TABLE sourcedata (`id` bigint,`status` int,`city_id` bigint,`courier_id` bigint,info_index int,order_id bigint,tableName String" +
				") WITH (" +
				"'connector' = 'kafka','topic' = 'canal_monitor_order'," +
				"'properties.bootstrap.servers' = 'bigdata-dev-mq:9092','properties.group.id' = 'testGroup'," +
				"'format' = 'null','scan.startup.mode' = 'earliest-offset')";
		tableEnv.executeSql(sql);
		// ......
	}
}

'format' = 'null' refers to the Factory's unique identifier, i.e. the value returned by factoryIdentifier().

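Flink discovers format factories through Java SPI, so the factory class must also be listed in META-INF/services/org.apache.flink.table.factories.Factory on the classpath. After that, the job can be run directly.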
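How the 'format' option value ends up selecting a factory can be modeled in a few lines. This is a deliberately simplified sketch, not Flink's FactoryUtil code: SimpleFormatFactory and FormatLookupDemo are hypothetical names, and the candidate list stands in for the factories that would be loaded from the classpath. It only illustrates the idea that the option value is compared with each factory's identifier and exactly one match is expected.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified stand-in for a format factory; NOT the Flink Factory interface. */
interface SimpleFormatFactory {
    String factoryIdentifier();
}

class FormatLookupDemo {
    /** Returns the single factory whose identifier equals the 'format' option value. */
    static SimpleFormatFactory discover(List<SimpleFormatFactory> candidates, String format) {
        List<SimpleFormatFactory> matches = new ArrayList<>();
        for (SimpleFormatFactory factory : candidates) {
            if (factory.factoryIdentifier().equals(format)) {
                matches.add(factory);
            }
        }
        // Zero matches means the format is unknown; several mean it is ambiguous.
        if (matches.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one factory for '" + format
                            + "' but found " + matches.size());
        }
        return matches.get(0);
    }

    public static void main(String[] args) {
        SimpleFormatFactory jsonFactory = () -> "json";
        SimpleFormatFactory nullFactory = () -> "null";
        List<SimpleFormatFactory> candidates = Arrays.asList(jsonFactory, nullFactory);

        // The value of 'format' in the DDL picks the factory with that identifier.
        System.out.println(discover(candidates, "null").factoryIdentifier());
    }
}
```

This is also why the identifier should be unique: a second factory on the classpath that returned "null" as well would make the lookup ambiguous.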
