How to Convert Between Flink, Iceberg, and Hive Data Types
Posted by 咬定青松
Flink, Iceberg, and Hive each provide a rich set of SQL data types, yet in practice there are differences and incompatibilities among them, so when the three engines are used together you frequently have to convert types from one system to another. The question is whether these three independent type systems can actually be mapped onto each other, and how the complexity of the conversions can be kept down.

To answer that, we first need to survey carefully which data types each engine supports. The supported types and their correspondences are listed below:
| Flink | Iceberg | Hive |
|---|---|---|
| CHAR | | CHAR |
| VARCHAR | | VARCHAR |
| STRING | string | STRING |
| BOOLEAN | boolean | BOOLEAN |
| BINARY | fixed(L) | |
| VARBINARY | binary | BINARY |
| BYTES | | |
| DECIMAL | decimal(P,S) | DECIMAL |
| TINYINT | | TINYINT |
| SMALLINT | | SMALLINT |
| INTEGER | int | INT |
| BIGINT | long | BIGINT |
| FLOAT | float | FLOAT |
| DOUBLE | double | DOUBLE |
| DATE | date | DATE |
| TIME | time | |
| TIMESTAMP | timestamp | TIMESTAMP |
| TIMESTAMP_LTZ | timestamptz | |
| INTERVAL | | INTERVAL |
| ARRAY | list\<E\> | array |
| MULTISET | | |
| MAP | map\<K, V\> | map |
| ROW | struct\<...\> | struct |
| RAW | | |
| | | union |
| Structured types | struct\<...\> | |
All of these types can be converted among the three engines, but what ultimately gets stored and persisted lives in the Hive metastore, so let's start with Hive's data types.

Hive divides its data types into two broad categories:

Complex types: map, array, struct, union
Primitive types: everything outside the Complex category, such as int, float, double

The Hive serde library defines these categories as follows:
```java
// ObjectInspector.class
public static enum Category {
    PRIMITIVE, LIST, MAP, STRUCT, UNION;
}
```
The syntax for each of the Complex types is:

```
ARRAY<data_type>
MAP<primitive_type, data_type>
STRUCT<col_name : data_type [COMMENT col_comment], ...>
UNIONTYPE<data_type, data_type, ...>
```
For example, a table built with a union type:

```sql
CREATE TABLE union_test(
  foo UNIONTYPE<int, double, array<string>, struct<a:int,b:string>>);
```
Every data type has to be serialized and deserialized when it travels over the network, so each type has a display name, i.e., a string representation. For example:

INT displays as int
CHAR displays as char
VARCHAR displays as varchar(length)
ARRAY displays as array\<data_type\>
MAP displays as map\<primitive_type, data_type\>
STRUCT displays as struct\<col_name:data_type,...\>
UNION displays as uniontype\<data_type, data_type, ...\>
The Hive serde library provides methods for serializing and deserializing these type names:

```java
// TypeInfoUtils.class
public static ArrayList<TypeInfo> getTypeInfosFromTypeString(String typeString);
public static List<String> getTypeStringsFromTypeInfo(List<TypeInfo> typeInfos);
```

For example, parsing the string representation of a map type into its TypeInfo object representation:

```java
ArrayList<TypeInfo> typeInfo =
        TypeInfoUtils.getTypeInfosFromTypeString("map<boolean,boolean>");
```
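To see the round trip end to end, here is a minimal, self-contained sketch (assuming only hive-serde on the classpath; the class name is made up for illustration) that parses a display name, walks the resulting object, and serializes it back:

```java
import java.util.ArrayList;

import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public class TypeStringRoundTrip {
    public static void main(String[] args) {
        // Deserialize: display name -> TypeInfo object
        ArrayList<TypeInfo> parsed =
                TypeInfoUtils.getTypeInfosFromTypeString("map<boolean,array<string>>");
        MapTypeInfo mapType = (MapTypeInfo) parsed.get(0);

        // The parsed object can be walked structurally
        System.out.println(mapType.getMapKeyTypeInfo().getTypeName());   // boolean
        System.out.println(mapType.getMapValueTypeInfo().getTypeName()); // array<string>

        // Serialize: getTypeName() renders the canonical display name again
        System.out.println(mapType.getTypeName()); // map<boolean,array<string>>
    }
}
```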
Here TypeInfo is Hive's object-level representation of type information. Hive implements the following TypeInfo subclasses:
CharTypeInfo
VarcharTypeInfo
DecimalTypeInfo
ListTypeInfo
MapTypeInfo
StructTypeInfo
PrimitiveTypeInfo
UnionTypeInfo
Of these, DecimalTypeInfo in fact belongs to the primitive type family, which Hive enumerates in PrimitiveCategory:
```java
// PrimitiveObjectInspector.class
public static enum PrimitiveCategory {
    VOID, BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, STRING,
    DATE, TIMESTAMP, TIMESTAMPLOCALTZ, BINARY, DECIMAL, VARCHAR, CHAR,
    INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME, UNKNOWN;
}
```
In the PrimitiveCategory definition, DECIMAL, VARCHAR, and CHAR are classified as primitive types, yet each of them also has its own TypeInfo subclass. Clearly, Hive is not entirely consistent about how it categorizes types.
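The inconsistency is easy to demonstrate. In the sketch below (class name hypothetical), char(10) reports category PRIMITIVE and is a PrimitiveTypeInfo, yet it is simultaneously the parameterized CharTypeInfo subclass:

```java
import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public class PrimitiveCategoryDemo {
    public static void main(String[] args) {
        TypeInfo charType = TypeInfoUtils.getTypeInfoFromTypeString("char(10)");

        System.out.println(charType.getCategory());                // PRIMITIVE
        System.out.println(charType instanceof PrimitiveTypeInfo); // true
        // ...and yet it also has a dedicated, parameterized subclass
        System.out.println(charType instanceof CharTypeInfo);      // true
        System.out.println(((CharTypeInfo) charType).getLength()); // 10
    }
}
```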
With Hive's data types covered, let's turn to Flink's type definitions:
CharType
VarCharType
BinaryType
VarBinaryType
DecimalType
ArrayType
MapType
StructuredType
TinyIntType
SmallIntType
IntType
BigIntType
BooleanType
DoubleType
FloatType
TimestampType
TimeType
DateType
DayTimeIntervalType
YearMonthIntervalType
ZonedTimestampType
LocalZonedTimestampType
NullType
RawType
RowType
MultisetType
SymbolType
UserDefinedType
UnresolvedUserDefinedType
Compared with Hive, Flink defines a much richer set of data types, and each primitive type gets its own class. The Flink community ships bidirectional conversion between Flink and Hive types in the Flink Hive Connector module:
```java
// HiveTypeUtil.class

// Convert a Hive data type to a Flink data type
public static DataType toFlinkType(TypeInfo hiveType) {
    Preconditions.checkNotNull(hiveType, "hiveType cannot be null");
    switch (hiveType.getCategory()) {
        case PRIMITIVE:
            return toFlinkPrimitiveType((PrimitiveTypeInfo) hiveType);
        case LIST:
            ListTypeInfo listTypeInfo = (ListTypeInfo) hiveType;
            return DataTypes.ARRAY(toFlinkType(listTypeInfo.getListElementTypeInfo()));
        case MAP:
            MapTypeInfo mapTypeInfo = (MapTypeInfo) hiveType;
            return DataTypes.MAP(
                    toFlinkType(mapTypeInfo.getMapKeyTypeInfo()),
                    toFlinkType(mapTypeInfo.getMapValueTypeInfo()));
        case STRUCT:
            StructTypeInfo structTypeInfo = (StructTypeInfo) hiveType;
            List<String> names = structTypeInfo.getAllStructFieldNames();
            List<TypeInfo> typeInfos = structTypeInfo.getAllStructFieldTypeInfos();
            Field[] fields = new Field[names.size()];
            for (int i = 0; i < fields.length; ++i) {
                fields[i] = DataTypes.FIELD(names.get(i), toFlinkType(typeInfos.get(i)));
            }
            return DataTypes.ROW(fields);
        default:
            throw new UnsupportedOperationException(
                    String.format("Flink doesn't support Hive data type %s yet.", hiveType));
    }
}

private static DataType toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) {
    Preconditions.checkNotNull(hiveType, "hiveType cannot be null");
    switch (hiveType.getPrimitiveCategory()) {
        case CHAR:
            return DataTypes.CHAR(((CharTypeInfo) hiveType).getLength());
        case VARCHAR:
            return DataTypes.VARCHAR(((VarcharTypeInfo) hiveType).getLength());
        case STRING:
            return DataTypes.STRING();
        case BOOLEAN:
            return DataTypes.BOOLEAN();
        case BYTE:
            return DataTypes.TINYINT();
        case SHORT:
            return DataTypes.SMALLINT();
        case INT:
            return DataTypes.INT();
        case LONG:
            return DataTypes.BIGINT();
        case FLOAT:
            return DataTypes.FLOAT();
        case DOUBLE:
            return DataTypes.DOUBLE();
        case DATE:
            return DataTypes.DATE();
        case TIMESTAMP:
            return DataTypes.TIMESTAMP(9);
        case BINARY:
            return DataTypes.BYTES();
        case DECIMAL:
            DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) hiveType;
            return DataTypes.DECIMAL(decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale());
        default:
            throw new UnsupportedOperationException(
                    String.format("Flink doesn't support Hive primitive type %s yet", hiveType));
    }
}

// Convert a Flink data type to a Hive data type
public static TypeInfo toHiveTypeInfo(DataType dataType, boolean checkPrecision) {
    Preconditions.checkNotNull(dataType, "type cannot be null");
    LogicalType logicalType = dataType.getLogicalType();
    return (TypeInfo) logicalType.accept(
            new HiveTypeUtil.TypeInfoLogicalTypeVisitor(dataType, checkPrecision));
}
```
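A small usage sketch of the two entry points above (assumptions: flink-connector-hive on the classpath, with HiveTypeUtil under org.apache.flink.table.catalog.hive.util; the wrapper class is hypothetical):

```java
import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
import org.apache.flink.table.types.DataType;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public class FlinkHiveTypeDemo {
    public static void main(String[] args) {
        // Hive -> Flink
        TypeInfo hiveType =
                TypeInfoUtils.getTypeInfoFromTypeString("map<string,decimal(10,2)>");
        DataType flinkType = HiveTypeUtil.toFlinkType(hiveType);
        System.out.println(flinkType); // MAP<STRING, DECIMAL(10, 2)>

        // Flink -> Hive; checkPrecision=true rejects precisions Hive cannot hold
        TypeInfo roundTripped = HiveTypeUtil.toHiveTypeInfo(flinkType, true);
        System.out.println(roundTripped.getTypeName()); // map<string,decimal(10,2)>
    }
}
```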
Next, Iceberg's data type definitions:
StringType
FixedType
BinaryType
DecimalType
ListType
MapType
StructType
IntegerType
LongType
BooleanType
DoubleType
FloatType
TimestampType
TimeType
DateType
UUIDType
Iceberg likewise defines its complex and primitive types separately, though its catalogue is smaller than Flink's. Since Iceberg types act as an intermediate representation, Iceberg also provides conversions between its types and both Flink's and Hive's:
```java
// HiveSchemaUtil.class

// Convert an Iceberg type to a Hive type
public static TypeInfo convert(Type type) {
    return TypeInfoUtils.getTypeInfoFromTypeString(convertToTypeString(type));
}

// Convert a Hive type to an Iceberg type
public static Type convert(TypeInfo typeInfo) {
    return HiveSchemaConverter.convert(typeInfo, false);
}
```
The Iceberg-to-Hive direction works by first rendering the Iceberg type object as an intermediate string representation, then handing that string to Hive's TypeInfo parser to produce a Hive type.
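For instance (a sketch, assuming iceberg-hive-metastore on the classpath, which provides org.apache.iceberg.hive.HiveSchemaUtil; the demo class is hypothetical):

```java
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.iceberg.hive.HiveSchemaUtil;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

public class IcebergToHiveDemo {
    public static void main(String[] args) {
        // An Iceberg map<string, long>; 1 and 2 are the field ids Iceberg
        // assigns to the key and value
        Type icebergType = Types.MapType.ofOptional(
                1, 2, Types.StringType.get(), Types.LongType.get());

        // Internally: Iceberg type -> "map<string,bigint>" -> Hive TypeInfo
        TypeInfo hiveType = HiveSchemaUtil.convert(icebergType);
        System.out.println(hiveType.getTypeName()); // map<string,bigint>
    }
}
```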
The Hive-to-Iceberg direction, by contrast, still uses conventional case-by-case matching:
```java
// HiveSchemaConverter.class
Type convertType(TypeInfo typeInfo) {
    switch (typeInfo.getCategory()) {
        case PRIMITIVE:
            switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
                case FLOAT:
                    return FloatType.get();
                case DOUBLE:
                    return DoubleType.get();
                case BOOLEAN:
                    return BooleanType.get();
                case BYTE:
                case SHORT:
                    Preconditions.checkArgument(this.autoConvert,
                            "Unsupported Hive type: %s, use integer instead",
                            ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory());
                    LOG.debug("Using auto conversion from SHORT/BYTE to INTEGER");
                    return IntegerType.get();
                case INT:
                    return IntegerType.get();
                case LONG:
                    return LongType.get();
                case BINARY:
                    return BinaryType.get();
                case CHAR:
                case VARCHAR:
                    Preconditions.checkArgument(this.autoConvert,
                            "Unsupported Hive type: %s, use string instead",
                            ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory());
                    LOG.debug("Using auto conversion from CHAR/VARCHAR to STRING");
                    return StringType.get();
                case STRING:
                    return StringType.get();
                case TIMESTAMP:
                    return TimestampType.withoutZone();
                case DATE:
                    return DateType.get();
                case DECIMAL:
                    DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
                    return DecimalType.of(decimalTypeInfo.precision(), decimalTypeInfo.scale());
                case INTERVAL_YEAR_MONTH:
                case INTERVAL_DAY_TIME:
                default:
                    if ("TIMESTAMPLOCALTZ".equalsIgnoreCase(
                            ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory().name())) {
                        return TimestampType.withZone();
                    }
                    throw new IllegalArgumentException("Unsupported Hive type ("
                            + ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()
                            + ") for Iceberg tables.");
            }
        case STRUCT:
            StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
            List<NestedField> fields = this.convertInternal(
                    structTypeInfo.getAllStructFieldNames(),
                    structTypeInfo.getAllStructFieldTypeInfos(),
                    Collections.emptyList());
            return StructType.of(fields);
        case MAP:
            MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
            Type keyType = this.convertType(mapTypeInfo.getMapKeyTypeInfo());
            Type valueType = this.convertType(mapTypeInfo.getMapValueTypeInfo());
            int keyId = this.id++;
            int valueId = this.id++;
            return MapType.ofOptional(keyId, valueId, keyType, valueType);
        case LIST:
            ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
            Type listType = this.convertType(listTypeInfo.getListElementTypeInfo());
            return ListType.ofOptional(this.id++, listType);
        case UNION:
        default:
            throw new IllegalArgumentException("Unknown type " + typeInfo.getCategory());
    }
}
```
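A matching usage sketch for this direction (same assumed dependency as above; demo class hypothetical). Note that the one-argument convert() shown earlier passes autoConvert=false, so the lossy CHAR/VARCHAR/TINYINT/SMALLINT conversions are rejected rather than silently widened:

```java
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.iceberg.hive.HiveSchemaUtil;
import org.apache.iceberg.types.Type;

public class HiveToIcebergDemo {
    public static void main(String[] args) {
        TypeInfo hiveType = TypeInfoUtils.getTypeInfoFromTypeString(
                "array<struct<a:int,b:string>>");

        // Succeeds: array, struct, int and string all map directly,
        // with Iceberg assigning fresh field ids along the way
        Type icebergType = HiveSchemaUtil.convert(hiveType);
        System.out.println(icebergType);

        // Throws IllegalArgumentException: char needs autoConvert=true
        TypeInfo charType = TypeInfoUtils.getTypeInfoFromTypeString("char(10)");
        HiveSchemaUtil.convert(charType);
    }
}
```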
Conversion from Flink types to Iceberg types uses the visitor design pattern, and the supported mappings can be read straight off the implementation:
```java
// FlinkTypeToType.class
public Type visit(CharType charType) {
    return StringType.get();
}

public Type visit(VarCharType varCharType) {
    return StringType.get();
}

public Type visit(BooleanType booleanType) {
    return org.apache.iceberg.types.Types.BooleanType.get();
}

public Type visit(BinaryType binaryType) {
    return FixedType.ofLength(binaryType.getLength());
}

public Type visit(VarBinaryType varBinaryType) {
    return org.apache.iceberg.types.Types.BinaryType.get();
}

public Type visit(DecimalType decimalType) {
    return org.apache.iceberg.types.Types.DecimalType.of(
            decimalType.getPrecision(), decimalType.getScale());
}

public Type visit(TinyIntType tinyIntType) {
    return IntegerType.get();
}

public Type visit(SmallIntType smallIntType) {
    return IntegerType.get();
}

public Type visit(IntType intType) {
    return IntegerType.get();
}

public Type visit(BigIntType bigIntType) {
    return LongType.get();
}

public Type visit(FloatType floatType) {
    return org.apache.iceberg.types.Types.FloatType.get();
}

public Type visit(DoubleType doubleType) {
    return org.apache.iceberg.types.Types.DoubleType.get();
}

public Type visit(DateType dateType) {
    return org.apache.iceberg.types.Types.DateType.get();
}

public Type visit(TimeType timeType) {
    return org.apache.iceberg.types.Types.TimeType.get();
}

public Type visit(TimestampType timestampType) {
    return org.apache.iceberg.types.Types.TimestampType.withoutZone();
}

public Type visit(LocalZonedTimestampType localZonedTimestampType) {
    return org.apache.iceberg.types.Types.TimestampType.withZone();
}

public Type visit(ArrayType arrayType) {
    Type elementType = (Type) arrayType.getElementType().accept(this);
    return arrayType.getElementType().isNullable()
            ? ListType.ofOptional(this.getNextId(), elementType)
            : ListType.ofRequired(this.getNextId(), elementType);
}

public Type visit(MultisetType multisetType) {
    Type elementType = (Type) multisetType.getElementType().accept(this);
    return MapType.ofRequired(this.getNextId(), this.getNextId(), elementType, IntegerType.get());
}

public Type visit(org.apache.flink.table.types.logical.MapType mapType) {
    Type keyType = (Type) mapType.getKeyType().accept(this);
    Type valueType = (Type) mapType.getValueType().accept(this);
    return mapType.getValueType().isNullable()
            ? MapType.ofOptional(this.getNextId(), this.getNextId(), keyType, valueType)
            : MapType.ofRequired(this.getNextId(), this.getNextId(), keyType, valueType);
}

public Type visit(RowType rowType) {
    List<NestedField> newFields = Lists.newArrayListWithExpectedSize(rowType.getFieldCount());
    boolean isRoot = this.root == rowType;
    List<Type> types = rowType.getFields().stream()
            .map(f -> (Type) f.getType().accept(this))
            .collect(Collectors.toList());
    for (int i = 0; i < rowType.getFieldCount(); ++i) {
        int id = isRoot ? i : this.getNextId();
        RowField field = rowType.getFields().get(i);
        String name = field.getName();
        String comment = field.getDescription().orElse(null);
        if (field.getType().isNullable()) {
            newFields.add(NestedField.optional(id, name, types.get(i), comment));
        } else {
            newFields.add(NestedField.required(id, name, types.get(i), comment));
        }
    }
    return StructType.of(newFields);
}
```
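In practice this visitor is typically reached through FlinkSchemaUtil in the iceberg-flink module rather than called directly. A hedged sketch, assuming FlinkSchemaUtil.convert(TableSchema) as in the Iceberg versions this article is based on (demo class hypothetical):

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkSchemaUtil;

public class FlinkToIcebergDemo {
    public static void main(String[] args) {
        // NOT NULL on the Flink side becomes a "required" Iceberg field
        TableSchema flinkSchema = TableSchema.builder()
                .field("id", DataTypes.BIGINT().notNull())
                .field("tags", DataTypes.ARRAY(DataTypes.STRING()))
                .build();

        Schema icebergSchema = FlinkSchemaUtil.convert(flinkSchema);
        System.out.println(icebergSchema);
        // e.g. 0: id: required long, 1: tags: optional list<string>
    }
}
```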
Symmetrically, TypeToFlinkType implements the conversion from Iceberg types to Flink types:
```java
// TypeToFlinkType.class
public LogicalType struct(StructType struct, List<LogicalType> fieldResults) {
    List<NestedField> fields = struct.fields();
    List<RowField> flinkFields = Lists.newArrayListWithExpectedSize(fieldResults.size());
    for (int i = 0; i < fields.size(); ++i) {
        NestedField field = fields.get(i);
        LogicalType type = fieldResults.get(i);
        RowField flinkField = new RowField(
                field.name(), type.copy(field.isOptional()), field.doc());
        flinkFields.add(flinkField);
    }
    return new RowType(flinkFields);
}

public LogicalType field(NestedField field, LogicalType fieldResult) {
    return fieldResult;
}

public LogicalType list(ListType list, LogicalType elementResult) {
    return new ArrayType(elementResult.copy(list.isElementOptional()));
}

public LogicalType map(MapType map, LogicalType keyResult, LogicalType valueResult) {
    return new org.apache.flink.table.types.logical.MapType(
            keyResult.copy(false), valueResult.copy(map.isValueOptional()));
}

public LogicalType primitive(PrimitiveType primitive) {
    switch (primitive.typeId()) {
        case BOOLEAN:
            return new BooleanType();
        case INTEGER:
            return new IntType();
        case LONG:
            return new BigIntType();
        case FLOAT:
            return new FloatType();
        case DOUBLE:
            return new DoubleType();
        case DATE:
            return new DateType();
        case TIME:
            return new TimeType();
        case TIMESTAMP:
            TimestampType timestamp = (TimestampType) primitive;
            if (timestamp.shouldAdjustToUTC()) {
                return new LocalZonedTimestampType(6);
            }
            return new org.apache.flink.table.types.logical.TimestampType(6);
        case STRING:
            return new VarCharType(2147483647);
        case UUID:
            return new BinaryType(16);
        case FIXED:
            FixedType fixedType = (FixedType) primitive;
            return new BinaryType(fixedType.length());
        case BINARY:
            return new VarBinaryType(2147483647);
        case DECIMAL:
            DecimalType decimal = (DecimalType) primitive;
            return new org.apache.flink.table.types.logical.DecimalType(
                    decimal.precision(), decimal.scale());
        default:
            throw new UnsupportedOperationException(
                    "Cannot convert unknown type to Flink: " + primitive);
    }
}
```
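The opposite direction is also reached through FlinkSchemaUtil (hedged: toSchema is the helper name in the versions checked, and it wraps TypeToFlinkType internally; demo class hypothetical):

```java
import org.apache.flink.table.api.TableSchema;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.types.Types;

public class IcebergToFlinkDemo {
    public static void main(String[] args) {
        Schema icebergSchema = new Schema(
                Types.NestedField.required(1, "id", Types.LongType.get()),
                Types.NestedField.optional(2, "payload", Types.BinaryType.get()));

        // binary becomes VARBINARY(2147483647), matching the switch above
        TableSchema flinkSchema = FlinkSchemaUtil.toSchema(icebergSchema);
        System.out.println(flinkSchema);
    }
}
```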
The sections above covered the type definitions of all three engines and the conversion methods between them. To make the conversion results easy to see at a glance, here they are collected in a single table:
| Flink | Iceberg | Hive |
|---|---|---|
| CharType | StringType | CharTypeInfo (STRING or CHAR) |
| VarCharType | StringType | VarcharTypeInfo (STRING or VARCHAR) |
| BinaryType | FixedType | BINARY |
| VarBinaryType | BinaryType | BINARY |
| DecimalType | DecimalType | DecimalTypeInfo (DECIMAL) |
| ArrayType | ListType | ListTypeInfo |
| MapType | MapType | MapTypeInfo |
| StructuredType | StructType | StructTypeInfo |
| TinyIntType | IntegerType | PrimitiveTypeInfo (BYTE) |
| SmallIntType | IntegerType | PrimitiveTypeInfo (SHORT) |
| IntType | IntegerType | PrimitiveTypeInfo (INT) |
| BigIntType | LongType | PrimitiveTypeInfo (LONG) |
| BooleanType | BooleanType | PrimitiveTypeInfo (BOOLEAN) |
| DoubleType | DoubleType | PrimitiveTypeInfo (DOUBLE) |
| FloatType | FloatType | PrimitiveTypeInfo (FLOAT) |
| TimestampType | TimestampType | PrimitiveTypeInfo (TIMESTAMP) |
| TimeType | TimeType | PrimitiveTypeInfo (STRING) |
| DateType | DateType | PrimitiveTypeInfo (DATE) |
| DayTimeIntervalType | | PrimitiveTypeInfo (INTERVAL_DAY_TIME) |
| YearMonthIntervalType | | PrimitiveTypeInfo (INTERVAL_YEAR_MONTH) |
| ZonedTimestampType | | |
| LocalZonedTimestampType | TimestampType | |
| NullType | | PrimitiveTypeInfo (VOID) |
| RawType | | |
| RowType | StructType | StructTypeInfo |
| MultisetType | MapType | |
| SymbolType | | |
| UserDefinedType | | |
| UnresolvedUserDefinedType | | |
| | | UnionTypeInfo |
| | UUIDType | STRING |
The table makes it plain that Flink has the richest type system of the three, including user-defined composite types. The types all three engines share are the primitives plus struct, map, and array; among the primitives, time-zone-aware timestamps are still not fully compatible across the three engines, and blank cells mark conversions that are not supported. One more thing stands out from tracing these conversions: all three implementations treat Hive's data types as the common baseline, meaning Hive types can serve as the intermediary for converting between any pair, which leaves a fair amount of redundant conversion code. That very redundancy suggests a new approach to designing a unified type system.