WideTableMultiDimSQLParser 解析说明:ClickHouse / Hive 数组交并差运算
Posted 东海陈光剑
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了WideTableMultiDimSQLParser 解析说明:ClickHouse / Hive 数组交并差运算相关的知识,希望对你有一定的参考价值。
WideTableMultiDimSQLParser 解析说明
1.ClickHouse 数组交并差运算
--交 t[1] ∩ t[2] : arrayIntersect(t[1], t[2])
select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
from (
select arrayIntersect(t[3], arrayIntersect(t[1], t[2])) as res,
array(
(select groupUniqArray(UserID) from hits_v1 where Sex = 1),
(select groupUniqArray(UserID) from hits_v1 where Age > 18),
(select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
) t
) t;
--并 t[1] ∪ t[2]: arrayConcat(t[1], t[2])
select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
from (
select arrayConcat(t[3], arrayConcat(t[1], t[2])) as res,
array(
(select groupUniqArray(UserID) from hits_v1 where Sex = 1),
(select groupUniqArray(UserID) from hits_v1 where Age > 18),
(select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
) t
) t;
--差 t[1]-t[2] : arrayMap(x->multiIf(x not in arrayIntersect(t[1], t[2]), x, NULL), t[1])
select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
from (
select arrayIntersect(t[3], arrayMap(x->multiIf(x not in arrayIntersect(t[1], t[2]), x, NULL), t[1])) as res,
array(
(select groupUniqArray(UserID) from hits_v1 where Sex = 1),
(select groupUniqArray(UserID) from hits_v1 where Age > 18),
(select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
) t
) t;
--并
select length(arrayDistinct(t.res))
from (
select arrayConcat(t[3], arrayConcat(t[1], t[2])) as res,
array(
(select groupUniqArray(UserID) from hits_v1 where Sex = 1),
(select groupUniqArray(UserID) from hits_v1 where Age > 18),
(select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
) t
) t;
ClickHouse :
(arrayMap(x->multiIf(x not in arrayIntersect(t[1],t[2],(arrayIntersect(t[4],t[5],t[6]))), x, NULL), t[1]))
(select collect_set(user_id) from db1.table1 where ( cate_id = '10001' and shop_id = 798322 ) and ( f1 = '1' )),
(select collect_set(user_id) from db2.table2 where ( cate_id = '10001' and shop_id = 798322 ) and ( f2 = '22' )),
(select collect_set(user_id) from db2.table2 where ( cate_id = '10001' and shop_id = 798322 ) and ( f3 = 333 )),
(select collect_set(user_id) from db3.table3 where ( 1=1 ) and ( f4 = '4' )),
(select collect_set(user_id) from db3.table3 where ( 1=1 ) and ( f5 = 5 )),
(select collect_set(user_id) from db3.table3 where ( cate_id = '10001' and shop_id = 798322 ) and ( f6 = 6 ))
2.Hive 数组交并差运算:
select
array_intersect(array(1, 2), array(2, 3)) i,
array_union(array(1, 2), array(2, 3)) u,
array_except(array(1, 2), array(2, 3)) e;
Hive:
(array_except(t[1],t[2],(array_intersect(t[4],t[5],t[6]))))
(select collect_set(user_id) from db1.table1 where ( cate_id = '10001' and shop_id = 798322 ) and ( f1 = '1' )),
(select collect_set(user_id) from db2.table2 where ( cate_id = '10001' and shop_id = 798322 ) and ( f2 = '22' )),
(select collect_set(user_id) from db2.table2 where ( cate_id = '10001' and shop_id = 798322 ) and ( f3 = 333 )),
(select collect_set(user_id) from db3.table3 where ( 1=1 ) and ( f4 = '4' )),
(select collect_set(user_id) from db3.table3 where ( 1=1 ) and ( f5 = 5 )),
(select collect_set(user_id) from db3.table3 where ( cate_id = '10001' and shop_id = 798322 ) and ( f6 = 6 ))
附源码
/**
 * One flattened filter condition taken from a node of a [KunLunExpression] tree.
 *
 * @property kexprId       `tfId` of the expression node this entry was built from.
 * @property tagCode       table code of the node's field condition.
 * @property tagOptionCode field code of the node's field condition.
 * @property conditionExpr generated `select ... from ... where ...` sub-query text.
 * @property index         1-based position in the flattened list; created as -1 and
 *                         assigned afterwards, which is why the property is mutable.
 */
data class TagIdx(
    var kexprId: Int,
    var tagCode: String,
    var tagOptionCode: String,
    var conditionExpr: String,
    var index: Int,
)
/**
 * Tells whether [e] is a leaf of the expression tree, i.e. whether
 * `CollectionUtils.isEmpty` reports its `subExpression` collection as empty.
 */
fun isLeafNode(e: KunLunExpression): Boolean {
    val children = e.subExpression
    return CollectionUtils.isEmpty(children)
}
/**
 * Flattens the request's expression tree into a list of [TagIdx] entries and numbers them.
 *
 * The tree is walked by [parseTagIdx], which appends one entry per node that carries a
 * field condition. Each entry's `index` is then set to its 1-based position in the list.
 *
 * @param requestDTO      request whose `expression` is the root of the tree.
 * @param tableMappingMap table code -> physical table mappings, passed through to [parseTagIdx].
 * @return the flattened entries in traversal order, with `index` assigned.
 */
fun tagOptionConditions(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): List<TagIdx> {
    val tagIdxList = mutableListOf<TagIdx>()
    // Recursively parse the rule expression and flatten it into a list of filter conditions.
    val kexpr: KunLunExpression = requestDTO.expression
    parseTagIdx(kexpr, tagIdxList, tableMappingMap)
    // index = list position + 1
    tagIdxList.forEachIndexed { position, tagIdx ->
        tagIdx.index = position + 1
    }
    return tagIdxList
}
/**
 * Depth-first walk of [kexpr] that appends one [TagIdx] per node carrying a field condition.
 *
 * For such a node the generated sub-query has the shape
 * `select collect_set(<target>) from <db>.<table> where ( <dimension filter> ) and ( <field filter> )`.
 * The dimension filter is ` 1=1 ` when the condition has no dimension conditions; otherwise
 * each dimension contributes `<physical column> = <first value>` and the parts are joined with `and`.
 *
 * A node is recorded whenever its field condition is non-null, whether or not it also has
 * sub-expressions. Every recorded node receives an index later, in [tagOptionConditions].
 *
 * NOTE(review): the aggregate is hard-coded to `collect_set`, while the ClickHouse samples in
 * this file use `groupUniqArray`. Confirm which one the ClickHouse path expects.
 *
 * @param kexpr           current expression node.
 * @param tagIdxList      output list, appended to in traversal order; entries get `index = -1`.
 * @param tableMappingMap table code -> mappings; the first mapping of each table code is used.
 * @throws IllegalArgumentException if a referenced table code has no mapping.
 */
fun parseTagIdx(kexpr: KunLunExpression, tagIdxList: MutableList<TagIdx>, tableMappingMap: Map<String, List<KTableMapping>>) {
    val fieldCondition = kexpr.fieldCondition
    if (null != fieldCondition) {
        // Dimension filter: every tag table has its own dimensions, and the physical
        // column (not the logical field code) is what is actually filtered on.
        val dimConditionList = fieldCondition.dimConditionList
        val dimFilter: String = if (CollectionUtils.isEmpty(dimConditionList)) {
            " 1=1 "
        } else {
            // Separator "and " reproduces the text of the former manual concatenation.
            dimConditionList.joinToString(separator = "and ") { dimField ->
                val dimTagCode = dimField.tableCode
                val dimFieldCode = dimField.fieldCode
                val dimTableMapping = requireNotNull(tableMappingMap[dimTagCode]?.firstOrNull()) {
                    "no table mapping for dimension tag code [$dimTagCode]"
                }
                val dimPhysicalField = dimTableMapping.fields.first { it.srcField.columnCode == dimFieldCode }.dstField
                val dimValues = parseFieldValue(dimField, dimPhysicalField.fieldType)
                val singleValue = dimValues.get(0)?.sqlCondition
                " ${dimPhysicalField.columnCode} = $singleValue "
            }
        }

        val tagCode = fieldCondition.tableCode
        val fieldCode = fieldCondition.fieldCode
        val tableMapping = requireNotNull(tableMappingMap[tagCode]?.firstOrNull()) {
            "no table mapping for tag code [$tagCode]"
        }
        val physicalField = tableMapping.fields.first { it.srcField.columnCode == fieldCode }.dstField
        val physicalColumnCode = physicalField.columnCode
        val fieldValueType = physicalField.fieldType
        val targetFieldCode = tableMapping.targetField.columnCode
        val dbName = tableMapping.physicDBName
        val tableName = tableMapping.getkTableCode()
        val filterConditionClause = genFilterConditionClause(fieldCondition, physicalColumnCode, fieldValueType)
        val line = "select collect_set($targetFieldCode) from $dbName.$tableName where ( $dimFilter ) and ( $filterConditionClause )"
        // index starts at -1 and is assigned once the whole tree has been flattened
        val tagIdx = TagIdx(kexprId = kexpr.tfId, tagCode = tagCode, tagOptionCode = fieldCode, conditionExpr = line, index = -1)
        tagIdxList.add(tagIdx)
    }
    // Recurse into the child expressions.
    kexpr.subExpression?.forEach { child ->
        parseTagIdx(child, tagIdxList, tableMappingMap)
    }
}
/**
 * Renders one field condition as a SQL predicate of the form ` <physicalField> <operator> <value(s)> `.
 *
 * Single-value operators use the first parsed value, `IN` / `NOT_IN` use all values as
 * `(v1,v2,...)`, and `BETWEEN` uses the first two values.
 *
 * NOTE(review): the predicate is assembled by string concatenation from each value's
 * `sqlCondition` (and `qlCondition` for `LIKE`). Whether those properties quote or escape
 * untrusted input is not visible here. Confirm before exposing this to user-supplied values.
 *
 * @param fieldCondition condition supplying the operator and the raw values.
 * @param physicalField  physical column name placed on the left-hand side.
 * @param fieldValueType value type used by [parseFieldValue] to parse the raw values.
 * @return the predicate text, padded with spaces.
 * @throws IllegalArgumentException if no value is present, or `BETWEEN` has fewer than two values.
 * @throws IllegalStateException    if the operator is not handled.
 */
fun genFilterConditionClause(fieldCondition: FieldCondition, physicalField: String, fieldValueType: KFieldValueType): String {
    val fv = parseFieldValue(fieldCondition, fieldValueType)
    if (CollectionUtils.isEmpty(fv)) {
        throw IllegalArgumentException("fieldCondition must have fieldValue!")
    }
    // Multi-value form: (1,2,3,4)
    val listValue = fv.joinToString(separator = ",", prefix = "(", postfix = ")") { "${it?.sqlCondition}" }
    // Single-value forms
    val singleValue = fv.get(0)?.sqlCondition
    val singleValueNoQuote = fv.get(0)?.qlCondition

    val conditionExpr = when (fieldCondition.operator) {
        ArithmeticOperatorEnum.LIKE -> " like '%$singleValueNoQuote%' "
        ArithmeticOperatorEnum.EQUAL -> " = $singleValue "
        ArithmeticOperatorEnum.GREATER_EQUAL_THAN -> " >= $singleValue "
        ArithmeticOperatorEnum.LESS_THAN -> " < $singleValue "
        ArithmeticOperatorEnum.LESS_EQUAL_THAN -> " <= $singleValue "
        ArithmeticOperatorEnum.GREATER_THAN -> " > $singleValue "
        ArithmeticOperatorEnum.BETWEEN -> {
            require(fv.size >= 2) { "BETWEEN requires two values but got ${fv.size}" }
            // Braced templates are required: "$fv.get(0)" would interpolate the whole list.
            " between ${fv.get(0)?.sqlCondition} and ${fv.get(1)?.sqlCondition} "
        }
        ArithmeticOperatorEnum.IN -> " in $listValue "
        ArithmeticOperatorEnum.NOT_IN -> " not in $listValue "
        else -> throw IllegalStateException("${fieldCondition.operator} not supported yet")
    }
    return " $physicalField $conditionExpr "
}
/**
 * Parses the raw values of [fieldCondition] into typed [FieldValue] objects.
 *
 * The concrete [FieldValue] class is chosen from [fieldValueType] (STRING, LONG or DOUBLE)
 * and all raw values are handed to `FieldValue.create`.
 *
 * Both failure paths now `throw` the result of `ExceptionHelper.bizError`, matching how that
 * helper is used elsewhere in this file (`throw ExceptionHelper.bizError(...)`). Without the
 * `throw`, execution could continue past the validation.
 *
 * @param fieldCondition condition whose `values` are parsed.
 * @param fieldValueType selects the [FieldValue] implementation.
 * @return the parsed values as returned by `FieldValue.create`.
 */
fun parseFieldValue(fieldCondition: FieldCondition, fieldValueType: KFieldValueType): List<FieldValue<*>?> {
    val values = fieldCondition.values
    if (values == null || values.isEmpty()) {
        throw ExceptionHelper.bizError("illegal value size,values length must greater than 0.")
    }
    // Value class matching the declared field type.
    val clazz: Class<out FieldValue<*>> = when (fieldValueType) {
        KFieldValueType.STRING -> StringFieldValue::class.java
        KFieldValueType.LONG -> LongFieldValue::class.java
        KFieldValueType.DOUBLE -> DoubleFieldValue::class.java
        else -> throw ExceptionHelper.bizError("$fieldValueType fieldValueType not supported!")
    }
    return FieldValue.create(clazz, *values.toTypedArray())
}
/**
 * Walks the expression tree and collects what its leaf nodes reference.
 *
 * For each leaf: if its field condition has a non-empty `objectSetId`, that id is appended to
 * [objectSetList]; otherwise a [TagBaseField] holding the condition's table code and field
 * code is appended to [tagBaseFieldList]. Non-leaf nodes only recurse into their children;
 * a field condition set on a non-leaf node is not collected here.
 *
 * A leaf without a field condition is skipped, consistent with the null check in [parseTagIdx].
 *
 * @param expression       current node.
 * @param tagBaseFieldList output list of tag references.
 * @param objectSetList    output list of object-set ids.
 */
fun recurExtractTagCodeAndObjectSet(expression: KunLunExpression, tagBaseFieldList: MutableList<TagBaseField>, objectSetList: MutableList<String>) {
    // No sub-expressions: this is where the recursion ends.
    if (isLeafNode(expression)) {
        val fieldCondition = expression.fieldCondition ?: return
        if (StringUtils.isNotEmpty(fieldCondition.objectSetId)) {
            // object set reference
            objectSetList.add(fieldCondition.objectSetId)
        } else {
            // tag reference
            val tagBaseField = TagBaseField()
            tagBaseField.tableCode = fieldCondition.tableCode
            tagBaseField.fieldCode = fieldCondition.fieldCode
            tagBaseFieldList.add(tagBaseField)
        }
        return
    }
    // Recurse into the children.
    for (subExpression in expression.subExpression) {
        recurExtractTagCodeAndObjectSet(subExpression, tagBaseFieldList, objectSetList)
    }
}
/**
 * Resolves the physical table mappings referenced by a request's expression tree.
 */
@Service
class CommonParseUtils {

    /**
     * Collects the tags referenced by the request's expression and returns their mappings
     * grouped by table code.
     *
     * @param tenant     tenant whose id is passed to [getTagCodeTableMapping].
     * @param requestDTO request supplying the expression and the driver type.
     * @return table code -> mappings with that table code.
     */
    fun getTableMappingMap(tenant: Tenant, requestDTO: SQLQueryReqDTO): Map<String, List<KTableMapping>> {
        // Tags and object sets referenced by the expression.
        val tagBaseFieldList: MutableList<TagBaseField> = mutableListOf()
        val objectSetList: MutableList<String> = mutableListOf()
        recurExtractTagCodeAndObjectSet(requestDTO.expression, tagBaseFieldList, objectSetList)
        // Metadata lookup.
        val tableMappingList: List<KTableMapping> = getTagCodeTableMapping(tenant.id, tagBaseFieldList, requestDTO.driverType)
        return tableMappingList.groupBy { it.tableCode }
    }

    /**
     * Returns the physical table mappings for the given tags after validating that every tag
     * code and tag option exists in them.
     *
     * The metadata source is still a TODO: the mapping list is currently always empty, so any
     * non-empty [tagBaseFieldList] fails validation. [tenantId] and [driverType] are unused
     * until that lookup is implemented.
     *
     * Validation uses the first mapping of each table code, the same one [parseTagIdx] reads,
     * so duplicate table codes no longer abort the lookup.
     *
     * @return the mappings, or an empty list when [tagBaseFieldList] is empty.
     */
    fun getTagCodeTableMapping(tenantId: Long, tagBaseFieldList: List<TagBaseField>, driverType: DriverType): List<KTableMapping> {
        if (CollectionUtils.isEmpty(tagBaseFieldList)) {
            return emptyList()
        }
        // TODO metadata: load kTableMappings
        val kTableMappings: List<KTableMapping> = ArrayList()
        val mappingsByTableCode = kTableMappings.groupBy { it.tableCode }
        // Validate every referenced tag.
        for (tagBaseField in tagBaseFieldList) {
            val kTableMapping = mappingsByTableCode[tagBaseField.tableCode]?.firstOrNull()
                ?: throw ExceptionHelper.bizError(String.format("tag code [%s] is non-exists", tagBaseField.tableCode))
            // The option exists when at least one mapped field has its code
            // (this used to be computed with noneMatch, which inverted the check).
            val existsTagOption = kTableMapping.fields.any { it.srcField.columnCode == tagBaseField.fieldCode }
            if (!existsTagOption) {
                throw ExceptionHelper.bizError(String.format("tag option [%s] is non-exists", tagBaseField.fieldCode))
            }
        }
        return kTableMappings
    }
}
/**
 * ClickHouse SQL generator for wide-table, multi-dimension tag expressions.
 *
 * Every expression node with a field condition becomes one sub-query whose result is an
 * element `t[i]` of an array. The expression tree is then rendered as array set operations
 * over those elements (AND -> `arrayIntersect`, OR -> `arrayConcat`, EXCEPT -> `arrayMap` +
 * `multiIf` over the first operand).
 *
 * @author chenguangjian.jk
 * @date 2022-03-09 02:28:48
 */
@Service
class WideTableMultiDimCHSQLParser {
    val log = LoggerFactory.getLogger(WideTableMultiDimCHSQLParser::class.java)

    @Resource
    lateinit var commonParseUtils: CommonParseUtils

    /**
     * Builds the estimate (count) SQL for the request.
     */
    fun parseCount(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
        val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)
        // Render the KunLunExpression into the template.
        return WIDE_TABLE_COUNT_SQL_TEMPLATE(
            expr = expr(requestDTO, tableMappingMap),
            arrayLines = arrayLines(requestDTO, tableMappingMap),
        )
    }

    /**
     * Builds the circle-selection (id export) SQL for the request.
     */
    fun parseCircle(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
        val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)
        val csvFile = ""
        // Render the KunLunExpression into the template.
        return WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
            expr = expr(requestDTO, tableMappingMap),
            arrayLines = arrayLines(requestDTO, tableMappingMap),
            csvFile = csvFile,
        )
    }

    /**
     * Renders the request's expression tree as a ClickHouse array expression over `t[i]`.
     */
    fun expr(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
        val tagIdxs: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
        val exprMap = tagIdxs.groupBy { it.kexprId }
        return genWhereClause(exprMap, requestDTO.expression)
    }

    /**
     * Renders one non-leaf node. Returns an empty string for a leaf node.
     *
     * Operands are rendered individually (a leaf as `t[index]`, a non-leaf recursively) and
     * joined with commas. This fixes the missing comma after a nested operand that is not the
     * last one. For EXCEPT the source array is the rendered first operand, which equals the
     * former `t[firstIndex]` when that operand is a leaf and is now also correct when it is nested.
     *
     * NOTE(review): EXCEPT is rendered as "first operand minus the intersection of all
     * operands". For two operands that is `A - B`; for three or more it is not `A - B - C`.
     * Confirm the intended semantics.
     *
     * @param exprMap expression id (`tfId`) -> flattened entries; the first entry is used.
     * @throws IllegalArgumentException if the node's logic operator is not AND / OR / EXCEPT.
     * @throws IllegalStateException    if a leaf has no flattened entry in [exprMap].
     */
    private fun genWhereClause(exprMap: Map<Int, List<TagIdx>>, kunLunExpression: KunLunExpression): String {
        val subExpression = kunLunExpression.subExpression
        if (CollectionUtils.isEmpty(subExpression)) { // leaf node
            return ""
        }
        val operands: List<String> = subExpression.map { child ->
            if (isLeafNode(child)) {
                val tagIdx = checkNotNull(exprMap[child.tfId]?.firstOrNull()) {
                    "no flattened condition for expression ${child.tfId}"
                }
                "t[${tagIdx.index}]"
            } else {
                genWhereClause(exprMap, child)
            }
        }
        val args = operands.joinToString(",")
        val logic = kunLunExpression.logic
        return when (logic) {
            LogicOperatorEnum.AND -> "(arrayIntersect($args))"
            LogicOperatorEnum.OR -> "(arrayConcat($args))"
            LogicOperatorEnum.EXCEPT -> "(arrayMap(x->multiIf(x not in arrayIntersect($args), x, NULL), ${operands.first()}))"
            else -> throw IllegalArgumentException("logic $logic not supported!")
        }
    }

    /**
     * Renders the sub-queries that populate the array `t`, one parenthesised query per line.
     * Lines are separated by a comma; the last line has none. Example:
     *
     *     (select groupUniqArray(UserID) from db.hits_v1 where Sex = 1),
     *     (select groupUniqArray(UserID) from db.hits_v1 where Age > 18),
     *     (select groupUniqArray(UserID) from db.hits_v1 where RequestNum > 0)
     *
     * Each line interpolates `conditionExpr` through a braced template and ends with a real
     * line break, not the two characters backslash and `n`.
     */
    fun arrayLines(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
        val tagIdxs: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
        if (tagIdxs.isEmpty()) {
            return ""
        }
        return tagIdxs.joinToString(separator = ", \n", postfix = " \n") { "(${it.conditionExpr})" }
    }

    /**
     * Count template. Example of a rendered statement:
     *
     *     select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
     *     from (
     *     select arrayIntersect(t[3], arrayIntersect(t[1], t[2])) as res,
     *     array(
     *     (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
     *     (select groupUniqArray(UserID) from hits_v1 where Age > 18),
     *     (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
     *     ) t
     *     ) t
     *
     * The raw string is kept flush-left so the emitted text is unchanged.
     */
    private fun WIDE_TABLE_COUNT_SQL_TEMPLATE(
        expr: String,
        arrayLines: String,
    ) = """
select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
from (
select $expr as res,
array(
$arrayLines
) t
) t
"""

    /**
     * Circle-selection template: same shape as the count template, but explodes the ids with
     * `arrayJoin` and writes them with `INTO OUTFILE ... FORMAT CSV`.
     *
     * NOTE(review): [csvFile] is accepted but not used. The output location is the fixed
     * literal `tos:///xxx`. Confirm whether it should be interpolated.
     *
     * The raw string is kept flush-left so the emitted text is unchanged.
     */
    private fun WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
        expr: String,
        arrayLines: String,
        csvFile: String,
    ) = """
select arrayJoin(arrayDistinct(arrayFilter(x->x is not null, t.res)))
from (
select $expr as res,
array(
$arrayLines
) t
) t
INTO OUTFILE 'tos:///xxx' FORMAT CSV
settings distributed_perfect_shard=1,max_execution_time = 600
"""
}
/**
tagIdxList=["conditionExpr":"select groupUniqArray(user_id) from db1.table1 where ( cate_id = '1001' ) and ( f1 = '1' )","index":1,"kexprId":684563482,"tagCode":"t1","tagOptionCode":"f1","conditionExpr":"select groupUniqArray(user_id) from db2.table2 where ( cate_id = '1002' ) and ( f2 = '22' )","index":2,"kexprId":684642314,"tagCode":"t2","tagOptionCode":"f2","conditionExpr":"select groupUniqArray(user_id) from db2.table2 where ( shop_id = '798322' ) and ( f3 = 333 )","index":3,"kexprId":568144263,"tagCode":"t2","tagOptionCode":"f3","conditionExpr":"select groupUniqArray(user_id) from db3.table3 where ( cate_id = '1004' ) and ( f4 = '4' )","index":4,"kexprId":684626037,"tagCode":"t3","tagOptionCode":"f4","conditionExpr":"select groupUniqArray(user_id) from db3.table3 where ( cate_id = '1005' ) and ( f5 = 5 )","index":5,"kexprId":684627036,"tagCode":"t3","tagOptionCode":"f5","conditionExpr":"select groupUniqArray(user_id) from db3.table3 where ( cate_id = '1006' ) and ( f6 = 6 )","index":6,"kexprId":684628027,"tagCode":"t3","tagOptionCode":"f6"]
(arrayMap(x->multiIf(x not in arrayIntersect(t[1],t[2],(arrayIntersect(t[4],t[5],t[6]))), x, NULL), t[1]))
(select groupUniqArray(user_id) from db1.table1 where ( cate_id = '1001' ) and ( f1 = '1' )),
(select groupUniqArray(user_id) from db2.table2 where ( cate_id = '1002' ) and ( f2 = '22' )),
(select groupUniqArray(user_id) from db2.table2 where ( shop_id = '798322' ) and ( f3 = 333 )),
(select groupUniqArray(user_id) from db3.table3 where ( cate_id = '1004' ) and ( f4 = '4' )),
(select groupUniqArray(user_id) from db3.table3 where ( cate_id = '1005' ) and ( f5 = 5 )),
(select groupUniqArray(user_id) from db3.table3 where ( cate_id = '1006' ) and ( f6 = 6 ))
*/
/**
 * Manual demo for [WideTableMultiDimCHSQLParser]: builds the expression
 * `EXCEPT(e1, e2, AND(e31, e32, e33))` with in-memory table mappings for `t1`..`t3`,
 * then prints the rendered array expression and the array sub-query lines.
 */
fun main() {
    // Dimension conditions shared by several field conditions.
    val dimList = listOf(
        FieldCondition("", "t1", "cate_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("10001")),
        FieldCondition("", "t1", "shop_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("798322")),
    )

    // Expression node with an EQUAL condition on tableCode.fieldCode.
    fun node(tableCode: String, fieldCode: String, dims: List<FieldCondition>, value: String): KunLunExpression {
        val expression = KunLunExpression()
        expression.fieldCondition = FieldCondition("", tableCode, fieldCode, dims, ArithmeticOperatorEnum.EQUAL, listOf(value))
        return expression
    }

    // Field mapping whose source and destination fields are identical.
    fun field(columnCode: String, type: KFieldValueType) = KFieldMapping(
        KField(columnCode, "", type, ""), // srcField
        KField(columnCode, "", type, ""), // dstField
    )

    // Single-element mapping list for one tag table, keyed on user_id.
    // NOTE(review): a constructor comment in the original listed a leading `rowMapping`
    // argument that the calls did not pass. The six-argument call is kept as written.
    fun mapping(tableCode: String, physicalTable: String, db: String, fields: List<KFieldMapping>) = listOf(
        KTableMapping(
            tableCode,
            physicalTable,
            db,
            KField("user_id", "", KFieldValueType.STRING, ""),
            KSource(0, db, physicalTable, "user_id"),
            fields,
        )
    )

    val e1 = node("t1", "f1", dimList, "1")
    val e2 = node("t2", "f2", dimList, "22")
    // e3 carries its own field condition and three AND-ed children.
    val e3 = node("t2", "f3", dimList, "333")
    e3.logic = LogicOperatorEnum.AND
    e3.subExpression = arrayListOf(
        node("t3", "f4", emptyList(), "4"),
        node("t3", "f5", emptyList(), "5"),
        node("t3", "f6", dimList, "6"),
    )

    val expression = KunLunExpression()
    expression.logic = LogicOperatorEnum.EXCEPT
    expression.subExpression = arrayListOf(e1, e2, e3)

    val requestDTO = SQLQueryReqDTO()
    requestDTO.expression = expression

    val tableMappingMap: HashMap<String, List<KTableMapping>> = hashMapOf()
    tableMappingMap["t1"] = mapping(
        "t1", "table1", "db1",
        listOf(
            field("f1", KFieldValueType.STRING),
            field("cate_id", KFieldValueType.STRING),
            field("shop_id", KFieldValueType.LONG),
        ),
    )
    tableMappingMap["t2"] = mapping(
        "t2", "table2", "db2",
        listOf(
            field("f2", KFieldValueType.STRING),
            field("f3", KFieldValueType.LONG),
            field("cate_id", KFieldValueType.STRING),
            field("shop_id", KFieldValueType.LONG),
        ),
    )
    tableMappingMap["t3"] = mapping(
        "t3", "table3", "db3",
        listOf(
            field("f4", KFieldValueType.STRING),
            field("f5", KFieldValueType.LONG),
            field("f6", KFieldValueType.LONG),
            field("cate_id", KFieldValueType.STRING),
            field("shop_id", KFieldValueType.LONG),
        ),
    )

    // Named `parser` so the local no longer shadows the class name.
    val parser = WideTableMultiDimCHSQLParser()
    val expr = parser.expr(requestDTO, tableMappingMap)
    val arrayLines = parser.arrayLines(requestDTO, tableMappingMap)
    println(expr)
    println(arrayLines)
}
/**
 * Hive SQL generator for wide-table, multi-dimension tag expressions.
 *
 * Every expression node with a field condition becomes one sub-query whose result is an
 * element `t[i]` of an array. The expression tree is then rendered as array set operations
 * over those elements (AND -> `array_intersect`, OR -> `array_union`, EXCEPT -> `array_except`).
 *
 * @author chenguangjian.jk
 * @date 2022-03-09 02:28:48
 */
@Service
class WideTableMultiDimHiveSQLParser {
    // Logger is bound to this class (it was created for the ClickHouse parser by mistake).
    val log = LoggerFactory.getLogger(WideTableMultiDimHiveSQLParser::class.java)

    @Resource
    lateinit var commonParseUtils: CommonParseUtils

    /**
     * Builds the estimate (count) SQL for the request.
     */
    fun parseCount(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
        val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)
        // Render the KunLunExpression into the template.
        return WIDE_TABLE_COUNT_SQL_TEMPLATE(
            expr = expr(requestDTO, tableMappingMap),
            arrayLines = arrayLines(requestDTO, tableMappingMap),
        )
    }

    /**
     * Builds the circle-selection (id export) SQL for the request.
     */
    fun parseCircle(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
        val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)
        val csvFile = ""
        // Render the KunLunExpression into the template.
        return WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
            expr = expr(requestDTO, tableMappingMap),
            arrayLines = arrayLines(requestDTO, tableMappingMap),
            csvFile = csvFile,
        )
    }

    /**
     * Renders the request's expression tree as a Hive array expression over `t[i]`.
     */
    fun expr(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
        val tagIdxs: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
        val exprMap = tagIdxs.groupBy { it.kexprId }
        return genWhereClause(exprMap, requestDTO.expression)
    }

    /**
     * Renders one non-leaf node. Returns an empty string for a leaf node.
     *
     * Operands are rendered individually (a leaf as `t[index]`, a non-leaf recursively) and
     * combined pairwise from left to right, e.g. `array_except(array_except(a,b),c)`. The
     * documented `array_intersect` / `array_union` / `array_except` functions take exactly two
     * arrays, so the former N-argument call was only valid for two operands. For two operands
     * the output is unchanged. Pairwise folding also removes the missing-comma defect that
     * occurred after a nested operand that was not the last one.
     *
     * @param exprMap expression id (`tfId`) -> flattened entries; the first entry is used.
     * @throws IllegalArgumentException if the node's logic operator is not AND / OR / EXCEPT.
     * @throws IllegalStateException    if a leaf has no flattened entry in [exprMap].
     */
    private fun genWhereClause(exprMap: Map<Int, List<TagIdx>>, kunLunExpression: KunLunExpression): String {
        val subExpression = kunLunExpression.subExpression
        if (CollectionUtils.isEmpty(subExpression)) { // leaf node
            return ""
        }
        val logic = kunLunExpression.logic
        val function = when (logic) {
            LogicOperatorEnum.AND -> "array_intersect"
            LogicOperatorEnum.OR -> "array_union"
            LogicOperatorEnum.EXCEPT -> "array_except"
            else -> throw IllegalArgumentException("logic $logic not supported!")
        }
        val operands: List<String> = subExpression.map { child ->
            if (isLeafNode(child)) {
                val tagIdx = checkNotNull(exprMap[child.tfId]?.firstOrNull()) {
                    "no flattened condition for expression ${child.tfId}"
                }
                "t[${tagIdx.index}]"
            } else {
                genWhereClause(exprMap, child)
            }
        }
        // Left fold keeps EXCEPT as ((a - b) - c); intersect and union are associative.
        val combined = operands.reduce { left, right -> "$function($left,$right)" }
        return "($combined)"
    }

    /**
     * Renders the sub-queries that populate the array `t`, one parenthesised query per line.
     * Lines are separated by a comma; the last line has none. Example:
     *
     *     (select collect_set(UserID) from db.hits_v1 where Sex = 1),
     *     (select collect_set(UserID) from db.hits_v1 where Age > 18),
     *     (select collect_set(UserID) from db.hits_v1 where RequestNum > 0)
     *
     * Each line interpolates `conditionExpr` through a braced template and ends with a real
     * line break, not the two characters backslash and `n`.
     */
    fun arrayLines(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
        val tagIdxs: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
        if (tagIdxs.isEmpty()) {
            return ""
        }
        return tagIdxs.joinToString(separator = ", \n", postfix = " \n") { "(${it.conditionExpr})" }
    }

    /**
     * Count template. Example of a rendered statement:
     *
     *     select size(t.res) as cnt
     *     from (
     *     select array_intersect(t[3], array_intersect(t[1], t[2])) as res,
     *     array(
     *     (select collect_set(UserID) from hits_v1 where Sex = 1),
     *     (select collect_set(UserID) from hits_v1 where Age > 18),
     *     (select collect_set(UserID) from hits_v1 where RequestNum > 0)
     *     ) t
     *     ) t
     *
     * The raw string is kept flush-left so the emitted text is unchanged.
     */
    private fun WIDE_TABLE_COUNT_SQL_TEMPLATE(
        expr: String,
        arrayLines: String,
    ) = """
select size(t.res) as cnt
from (
select $expr as res,
array(
$arrayLines
) t
) t
"""

    /**
     * Circle-selection template: same shape as the count template, but emits one row per id
     * with `explode`.
     *
     * NOTE(review): [csvFile] is accepted but not used by this template.
     *
     * The raw string is kept flush-left so the emitted text is unchanged.
     */
    private fun WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
        expr: String,
        arrayLines: String,
        csvFile: String,
    ) = """
select explode(t.res) as ids
from (
select $expr as res,
array(
$arrayLines
) t
) t
"""
}

/**
Sample output for the demo fixture below. The first line is a log line captured from an
earlier run (logger name as recorded at that time); the expression line reflects the
pairwise rendering of the set functions.

WideTableMultiDimCHSQLParser - tagIdxList=["conditionExpr":"select collect_set(user_id) from db1.table1 where ( cate_id = '10001' and shop_id = 798322 ) and ( f1 = '1' )","index":1,"kexprId":-316732738,"tagCode":"t1","tagOptionCode":"f1","conditionExpr":"select collect_set(user_id) from db2.table2 where ( cate_id = '10001' and shop_id = 798322 ) and ( f2 = '22' )","index":2,"kexprId":-316653905,"tagCode":"t2","tagOptionCode":"f2","conditionExpr":"select collect_set(user_id) from db2.table2 where ( cate_id = '10001' and shop_id = 798322 ) and ( f3 = 333 )","index":3,"kexprId":-315132611,"tagCode":"t2","tagOptionCode":"f3","conditionExpr":"select collect_set(user_id) from db3.table3 where ( 1=1 ) and ( f4 = '4' )","index":4,"kexprId":127438862,"tagCode":"t3","tagOptionCode":"f4","conditionExpr":"select collect_set(user_id) from db3.table3 where ( 1=1 ) and ( f5 = 5 )","index":5,"kexprId":127439854,"tagCode":"t3","tagOptionCode":"f5","conditionExpr":"select collect_set(user_id) from db3.table3 where ( cate_id = '10001' and shop_id = 798322 ) and ( f6 = 6 )","index":6,"kexprId":-316668196,"tagCode":"t3","tagOptionCode":"f6"]
(array_except(array_except(t[1],t[2]),(array_intersect(array_intersect(t[4],t[5]),t[6]))))
(select collect_set(user_id) from db1.table1 where ( cate_id = '10001' and shop_id = 798322 ) and ( f1 = '1' )),
(select collect_set(user_id) from db2.table2 where ( cate_id = '10001' and shop_id = 798322 ) and ( f2 = '22' )),
(select collect_set(user_id) from db2.table2 where ( cate_id = '10001' and shop_id = 798322 ) and ( f3 = 333 )),
(select collect_set(user_id) from db3.table3 where ( 1=1 ) and ( f4 = '4' )),
(select collect_set(user_id) from db3.table3 where ( 1=1 ) and ( f5 = 5 )),
(select collect_set(user_id) from db3.table3 where ( cate_id = '10001' and shop_id = 798322 ) and ( f6 = 6 ))
*/
/**
 * Manual demo for [WideTableMultiDimHiveSQLParser]: builds the expression
 * `EXCEPT(e1, e2, AND(e31, e32, e33))` with in-memory table mappings for `t1`..`t3`,
 * then prints the rendered array expression and the array sub-query lines.
 *
 * All dimension conditions reference table code `t1`, so only the `t1` mapping declares the
 * `cate_id` / `shop_id` fields.
 */
fun main() {
    // Dimension conditions shared by several field conditions.
    val dimList = listOf(
        FieldCondition("", "t1", "cate_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("10001")),
        FieldCondition("", "t1", "shop_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("798322")),
    )

    // Expression node with an EQUAL condition on tableCode.fieldCode.
    fun node(tableCode: String, fieldCode: String, dims: List<FieldCondition>, value: String): KunLunExpression {
        val expression = KunLunExpression()
        expression.fieldCondition = FieldCondition("", tableCode, fieldCode, dims, ArithmeticOperatorEnum.EQUAL, listOf(value))
        return expression
    }

    // Field mapping whose source and destination fields are identical.
    fun field(columnCode: String, type: KFieldValueType) = KFieldMapping(
        KField(columnCode, "", type, ""), // srcField
        KField(columnCode, "", type, ""), // dstField
    )

    // Single-element mapping list for one tag table, keyed on user_id.
    // NOTE(review): a constructor comment in the original listed a leading `rowMapping`
    // argument that the calls did not pass. The six-argument call is kept as written.
    fun mapping(tableCode: String, physicalTable: String, db: String, fields: List<KFieldMapping>) = listOf(
        KTableMapping(
            tableCode,
            physicalTable,
            db,
            KField("user_id", "", KFieldValueType.STRING, ""),
            KSource(0, db, physicalTable, "user_id"),
            fields,
        )
    )

    val e1 = node("t1", "f1", dimList, "1")
    val e2 = node("t2", "f2", dimList, "22")
    // e3 carries its own field condition and three AND-ed children.
    val e3 = node("t2", "f3", dimList, "333")
    e3.logic = LogicOperatorEnum.AND
    e3.subExpression = arrayListOf(
        node("t3", "f4", emptyList(), "4"),
        node("t3", "f5", emptyList(), "5"),
        node("t3", "f6", dimList, "6"),
    )

    val expression = KunLunExpression()
    expression.logic = LogicOperatorEnum.EXCEPT
    expression.subExpression = arrayListOf(e1, e2, e3)

    val requestDTO = SQLQueryReqDTO()
    requestDTO.expression = expression

    val tableMappingMap: HashMap<String, List<KTableMapping>> = hashMapOf()
    tableMappingMap["t1"] = mapping(
        "t1", "table1", "db1",
        listOf(
            field("f1", KFieldValueType.STRING),
            field("cate_id", KFieldValueType.STRING),
            field("shop_id", KFieldValueType.LONG),
        ),
    )
    tableMappingMap["t2"] = mapping(
        "t2", "table2", "db2",
        listOf(
            field("f2", KFieldValueType.STRING),
            field("f3", KFieldValueType.LONG),
        ),
    )
    tableMappingMap["t3"] = mapping(
        "t3", "table3", "db3",
        listOf(
            field("f4", KFieldValueType.STRING),
            field("f5", KFieldValueType.LONG),
            field("f6", KFieldValueType.LONG),
        ),
    )

    // Named `parser` so the local no longer shadows the class name.
    val parser = WideTableMultiDimHiveSQLParser()
    val expr = parser.expr(requestDTO, tableMappingMap)
    val arrayLines = parser.arrayLines(requestDTO, tableMappingMap)
    println(expr)
    println(arrayLines)
}
以上是关于WideTableMultiDimSQLParser 解析说明:ClickHouse / Hive 数组交并差运算的主要内容,如果未能解决你的问题,请参考以下文章