Flink扩展 Table/SQL Scalar 函数的实现
Posted vinoYang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink扩展 Table/SQL Scalar 函数的实现相关的知识,希望对你有一定的参考价值。
overview
本文档以TO_BASE64
函数为例,介绍如何实现、扩展Flink Table&SQL Scalar function。
使用方式
自定义的Scalar函数有多种使用方式,我们以测试代码来了解一下,具体是如何使用的:
@Test
def testToBase64(): Unit =
testAllApis(
'f0.toBase64(),
"f0.toBase64()",
"TO_BASE64(f0)",
"VGhpcyBpcyBhIHRlc3QgU3RyaW5nLg==")
testAllApis(
'f8.toBase64(),
"f8.toBase64()",
"TO_BASE64(f8)",
"IFRoaXMgaXMgYSB0ZXN0IFN0cmluZy4g")
//null test
testAllApis(
'f33.toBase64(),
"f33.toBase64()",
"TO_BASE64(f33)",
"null")
testAllApis(
"".toBase64(),
"''.toBase64()",
"TO_BASE64('')",
"")
testAllApis(
'f33.toBase64(),
"f33.toBase64()",
"to_base64(f33)",
"null")
从测试代码可见,大致上存在三种使用方式:
- Table API调用函数
- Table 字符串表达式使用函数
- SQL 中以字符串的形式使用函数
在SQL使用时函数不区分大小写,在Table中使用,如果函数无参数,可以省略括号,这些能力由Flink Table&SQL 自身提供支持。
函数实现
一个函数的逻辑是它最核心的部分,这里我们的示例函数TO_BASE64
很简单:
org.apache.flink.table.runtime.functions.ScalarFunctions
/**
* Returns a string's representation that encoded as base64.
*/
def toBase64(base: String): String = Base64.encodeBase64String(base.getBytes())
实现模板
提供了函数的实现后,我们需要通过反射的方式给函数一个代号以便我们可以在其他地方引用它:
org.apache.flink.table.codegen.calls.BuiltInMethods
val TOBASE64 = Types.lookupMethod(classOf[ScalarFunctions], "toBase64", classOf[String])
然后为SQL定义函数:
org.apache.flink.table.functions.sql.ScalarSqlFunctions
val TO_BASE64 = new SqlFunction(
"TO_BASE64",
SqlKind.OTHER_FUNCTION,
ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.TO_NULLABLE),
InferTypes.RETURN_TYPE,
OperandTypes.family(SqlTypeFamily.STRING),
SqlFunctionCategory.STRING
)
然后我们需要为SQL中使用该函数再代码生成的时候如何找到它的实现提供一个绑定关系:
org.apache.flink.table.codegen.calls.FunctionGenerator
addSqlFunctionMethod(
TO_BASE64,
Seq(STRING_TYPE_INFO),
STRING_TYPE_INFO,
BuiltInMethods.TOBASE64)
当函数在字符串表达式中使用时,Flink首先要去正确地解析并验证它,作为一个节点,这里需要提供关于该函数的必要的元数据信息:
org.apache.flink.table.expressions.stringExpressions.scala
case class ToBase64(child: Expression) extends UnaryExpression with InputTypeSpec
override private[flink] def expectedTypes: Seq[TypeInformation[_]] = Seq(STRING_TYPE_INFO)
override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
override private[flink] def validateInput(): ValidationResult =
if (child.resultType == STRING_TYPE_INFO)
ValidationSuccess
else
ValidationFailure(s"ToBase64 operator requires String input, " +
s"but $child is of type $child.resultType")
override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
relBuilder.call(ScalarSqlFunctions.TO_BASE64, children.map(_.toRexNode))
override def toString: String = s"($child).toBase64"
接下来,我们为Table API定义函数API:
org.apache.flink.table.api.scala.ImplicitExpressionOperations
/**
* Returns a string's representation that encoded as base64.
*/
def toBase64() = ToBase64(expr)
最后,我们需要为该函数的校验提供映射信息:
org.apache.flink.table.validate.FunctionCatalog.scala
"toBase64" -> classOf[ToBase64],
ScalarSqlFunctions.TO_BASE64,
测试与文档
每个函数实现完成后都必须经过充分的测试,以验证逻辑的正确性。测试用例补充进 org.apache.flink.table.expressions.ScalarFunctionsTest
。
函数完成后,请补充相关文档:
Table API: flink/docs/dev/table/tableApi.md
SQL: flink/docs/dev/table/sql.md
示例与参考
LOG2(X): https://github.com/apache/flink/pull/6404/files
以上是关于Flink扩展 Table/SQL Scalar 函数的实现的主要内容,如果未能解决你的问题,请参考以下文章