WideTableMultiDimSQLParser 解析说明:ClickHouse / Hive 数组交并差运算

Posted 东海陈光剑

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了WideTableMultiDimSQLParser 解析说明:ClickHouse / Hive 数组交并差运算相关的知识,希望对你有一定的参考价值。

WideTableMultiDimSQLParser 解析说明

1.ClickHouse 数组交并差运算

--交 t[1] ∩ t[2] : arrayIntersect(t[1], t[2])
select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
from (
         select arrayIntersect(t[3], arrayIntersect(t[1], t[2])) as res,
                array(
                            (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
                            (select groupUniqArray(UserID) from hits_v1 where Age > 18),
                            (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
                    )                                               t
         ) t;

--并 t[1] ∪ t[2]: arrayConcat(t[1], t[2])
select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
from (
         select arrayConcat(t[3], arrayConcat(t[1], t[2])) as res,
                array(
                            (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
                            (select groupUniqArray(UserID) from hits_v1 where Age > 18),
                            (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
                    )                                         t
         ) t;

--差 t[1]-t[2] : arrayMap(x->multiIf(x not in arrayIntersect(t[1], t[2]), x, NULL), t[1])
select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
from (
         select arrayIntersect(t[3], arrayMap(x->multiIf(x not in arrayIntersect(t[1], t[2]), x, NULL), t[1])) as res,
                array(
                            (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
                            (select groupUniqArray(UserID) from hits_v1 where Age > 18),
                            (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
                    )                                                                                             t
         ) t;

--并(不过滤 NULL,直接对拼接结果去重计数)
select length(arrayDistinct(t.res))
from (
         select arrayConcat(t[3], arrayConcat(t[1], t[2])) as res,
                array(
                            (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
                            (select groupUniqArray(UserID) from hits_v1 where Age > 18),
                            (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
                    )                                         t
         ) t;

ClickHouse :

(arrayMap(x->multiIf(x not in arrayIntersect(t[1],t[2],(arrayIntersect(t[4],t[5],t[6]))), x, NULL), t[1]))
(select collect_set(user_id) from db1.table1 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f1     = '1'   )),
(select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f2     = '22'   )),
(select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f3     = 333   )),
(select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f4     = '4'   )),
(select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f5     = 5   )),
(select collect_set(user_id) from db3.table3 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f6     = 6   ))

2.Hive 数组交并差运算:

select
    array_intersect(array(1, 2), array(2, 3)) i,
    array_union(array(1, 2), array(2, 3)) u,
    array_except(array(1, 2), array(2, 3)) e;

Hive:

(array_except(t[1],t[2],(array_intersect(t[4],t[5],t[6]))))
(select collect_set(user_id) from db1.table1 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f1     = '1'   )),
(select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f2     = '22'   )),
(select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f3     = 333   )),
(select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f4     = '4'   )),
(select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f5     = 5   )),
(select collect_set(user_id) from db3.table3 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f6     = 6   ))

附源码

/**
 * One flattened tag filter extracted from a KunLun expression tree.
 *
 * @property kexprId id (`tfId`) of the expression node this entry was built from
 * @property tagCode table code of the tag the condition applies to
 * @property tagOptionCode field code of the tag option
 * @property conditionExpr SQL sub-query text generated for this condition
 * @property index 1-based position in the flattened list; -1 until it is assigned
 */
data class TagIdx(
    var kexprId: Int,
    var tagCode: String,
    var tagOptionCode: String,
    var conditionExpr: String,
    var index: Int,
)

/**
 * Tells whether [e] is a leaf of the expression tree, i.e. whether
 * `CollectionUtils.isEmpty` reports its `subExpression` collection as empty.
 */
fun isLeafNode(e: KunLunExpression): Boolean {
    val children = e.subExpression
    return CollectionUtils.isEmpty(children)
}

/**
 * Flattens the rule expression of [requestDTO] into a list of [TagIdx] entries.
 *
 * The expression tree is walked by [parseTagIdx]; afterwards every entry gets its
 * `index` set to its 1-based position in the resulting list.
 *
 * @param requestDTO request whose `expression` is flattened
 * @param tableMappingMap table mappings keyed by tag (table) code, forwarded to [parseTagIdx]
 * @return the flattened conditions in traversal order
 */
fun tagOptionConditions(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): List<TagIdx> {
    val tagIdxList = mutableListOf<TagIdx>()
    // Recursively parse the rule expression and flatten it into a list of filter conditions.
    val kexpr: KunLunExpression = requestDTO.expression
    parseTagIdx(kexpr, tagIdxList, tableMappingMap)
    // The index field is the list position + 1.
    tagIdxList.forEachIndexed { position, tagIdx ->
        tagIdx.index = position + 1
    }
    return tagIdxList
}



/**
 * Recursively walks [kexpr] and appends one [TagIdx] to [tagIdxList] for every node
 * that carries a `fieldCondition`.
 *
 * For such a node the generated `conditionExpr` has the form
 * `select collect_set(<target>) from <db>.<table> where ( <dim filter> ) and ( <field filter> )`.
 * The entry's `index` is left at -1; the caller assigns it afterwards.
 *
 * @param kexpr expression node to process (its sub-expressions are visited as well)
 * @param tagIdxList output list, appended to in traversal order
 * @param tableMappingMap table mappings keyed by tag (table) code; the first mapping of each key is used
 * @throws IllegalArgumentException when no mapping exists for a referenced tag code
 */
fun parseTagIdx(kexpr: KunLunExpression, tagIdxList: MutableList<TagIdx>, tableMappingMap: Map<String, List<KTableMapping>>) {
    val fieldCondition = kexpr.fieldCondition
    if (fieldCondition != null) {
        // Dimension filter: every tag table carries its own dimensions. Filtering is done on the
        // physical column, so each dimension condition is translated through the table mapping.
        val dimConditionList = fieldCondition.dimConditionList
        val dimFilter = if (CollectionUtils.isEmpty(dimConditionList)) {
            " 1=1 "
        } else {
            // The separator keeps the original spacing: " a = 1 and  b = 2 ".
            dimConditionList.joinToString(separator = "and ") { dimField ->
                val dimFieldCode = dimField.fieldCode
                val dimMapping = requireNotNull(tableMappingMap[dimField.tableCode]?.firstOrNull()) {
                    "no table mapping for tag code [${dimField.tableCode}]"
                }
                val dimPhysicalField = dimMapping.fields.first { it.srcField.columnCode == dimFieldCode }.dstField
                // Only the first value of a dimension condition is used.
                val singleValue = parseFieldValue(dimField, dimPhysicalField.fieldType)[0]?.sqlCondition
                " ${dimPhysicalField.columnCode} = $singleValue "
            }
        }

        val tagCode = fieldCondition.tableCode
        val fieldCode = fieldCondition.fieldCode
        val tableMapping = requireNotNull(tableMappingMap[tagCode]?.firstOrNull()) {
            "no table mapping for tag code [$tagCode]"
        }

        val physicalField = tableMapping.fields.first { it.srcField.columnCode == fieldCode }.dstField
        val targetFieldCode = tableMapping.targetField.columnCode
        val dbName = tableMapping.physicDBName
        val tableName = tableMapping.getkTableCode()
        val filterConditionClause = genFilterConditionClause(fieldCondition, physicalField.columnCode, physicalField.fieldType)

        val line = "select collect_set($targetFieldCode) from $dbName.$tableName where ( $dimFilter ) and ( $filterConditionClause )"
        // index defaults to -1 here and is assigned by the caller.
        tagIdxList.add(TagIdx(kexprId = kexpr.tfId, tagCode = tagCode, tagOptionCode = fieldCode, conditionExpr = line, index = -1))
    }
    // Recurse into the sub-expressions.
    kexpr.subExpression?.forEach { child ->
        parseTagIdx(child, tagIdxList, tableMappingMap)
    }
}


/**
 * Builds the SQL predicate ` <physicalField> <operator> <value(s)> ` for [fieldCondition].
 *
 * @param fieldCondition condition supplying the operator and the raw values
 * @param physicalField physical column name placed on the left-hand side
 * @param fieldValueType value type used by [parseFieldValue] to render the values
 * @return the predicate text, padded with spaces as in the generated SQL samples
 * @throws IllegalArgumentException when no value is present, or BETWEEN has fewer than two values
 * @throws IllegalStateException when the operator is not one of the handled ones
 */
fun genFilterConditionClause(fieldCondition: FieldCondition, physicalField: String, fieldValueType: KFieldValueType): String {
    val fv = parseFieldValue(fieldCondition, fieldValueType)
    require(fv.isNotEmpty()) { "fieldCondition must have fieldValue!" }

    // Multi-value form: (1,2,3,4)
    val listValue = fv.joinToString(separator = ",", prefix = "(", postfix = ")") { "${it?.sqlCondition}" }
    // Single-value forms
    val singleValue = fv[0]?.sqlCondition
    // presumably the value rendered without surrounding quotes — TODO confirm against FieldValue
    val singleValueNoQuote = fv[0]?.qlCondition

    val conditionExpr = when (fieldCondition.operator) {
        ArithmeticOperatorEnum.LIKE -> "  like '%$singleValueNoQuote%' "
        ArithmeticOperatorEnum.EQUAL -> "    = $singleValue "
        ArithmeticOperatorEnum.GREATER_EQUAL_THAN -> "    >= $singleValue "
        ArithmeticOperatorEnum.LESS_THAN -> "    < $singleValue "
        ArithmeticOperatorEnum.LESS_EQUAL_THAN -> "    <= $singleValue "
        ArithmeticOperatorEnum.GREATER_THAN -> "    > $singleValue "
        ArithmeticOperatorEnum.BETWEEN -> {
            // BETWEEN reads two bounds; fail with a clear message instead of an index error.
            require(fv.size >= 2) { "BETWEEN requires two values but got ${fv.size}" }
            "    between ${fv[0]?.sqlCondition} and ${fv[1]?.sqlCondition} "
        }
        ArithmeticOperatorEnum.IN -> "    in $listValue "
        ArithmeticOperatorEnum.NOT_IN -> "    not in $listValue "

        else -> throw IllegalStateException("${fieldCondition.operator} not supported yet")
    }

    return " $physicalField $conditionExpr "
}


/**
 * Converts the raw values of [fieldCondition] into typed [FieldValue] objects.
 *
 * The concrete [FieldValue] class is chosen from [fieldValueType] (STRING, LONG or DOUBLE)
 * and the values are created through `FieldValue.create`.
 *
 * @param fieldCondition condition whose `values` are converted
 * @param fieldValueType value type of the physical field
 * @return the created field values
 * @throws the business exception produced by `ExceptionHelper.bizError` when the condition has
 *         no values or the value type is not supported
 */
fun parseFieldValue(fieldCondition: FieldCondition, fieldValueType: KFieldValueType): List<FieldValue<*>?> {
    val values = fieldCondition.values
    if (values == null || values.isEmpty()) {
        // `throw` matches how bizError is used elsewhere in this file and guarantees the guard aborts.
        throw ExceptionHelper.bizError("illegal value size,values length must greater than 0.")
    }

    // FieldValue implementation matching the value type
    val clazz: Class<out FieldValue<*>> = when (fieldValueType) {
        KFieldValueType.STRING -> StringFieldValue::class.java
        KFieldValueType.LONG -> LongFieldValue::class.java
        KFieldValueType.DOUBLE -> DoubleFieldValue::class.java
        else -> throw ExceptionHelper.bizError("$fieldValueType fieldValueType not supported!")
    }
    return FieldValue.create(clazz, *values.toTypedArray())
}






/**
 * Recursively walks a KunLun expression and collects what its leaves reference.
 *
 * For every leaf node: when its field condition has a non-empty `objectSetId`, that id is added
 * to [objectSetList]; otherwise a [TagBaseField] carrying the condition's table code and field
 * code is added to [tagBaseFieldList]. A leaf without a field condition contributes nothing.
 *
 * @param expression node to visit
 * @param tagBaseFieldList output list of referenced tags
 * @param objectSetList output list of referenced object-set ids
 */
fun recurExtractTagCodeAndObjectSet(expression: KunLunExpression, tagBaseFieldList: MutableList<TagBaseField>, objectSetList: MutableList<String>) {
    // No sub-expressions: the recursion ends here.
    if (isLeafNode(expression)) {
        // parseTagIdx treats fieldCondition as nullable, so do the same here instead of dereferencing null.
        val fieldCondition = expression.fieldCondition ?: return

        if (StringUtils.isNotEmpty(fieldCondition.objectSetId)) {
            // object set (crowd) reference
            objectSetList.add(fieldCondition.objectSetId)
        } else {
            // tag reference
            val tagBaseField = TagBaseField()
            tagBaseField.tableCode = fieldCondition.tableCode
            tagBaseField.fieldCode = fieldCondition.fieldCode
            tagBaseFieldList.add(tagBaseField)
        }
        return
    }

    // Visit the children.
    for (subExpression in expression.subExpression) {
        recurExtractTagCodeAndObjectSet(subExpression, tagBaseFieldList, objectSetList)
    }
}


/**
 * Resolves the table mappings required to translate a query request into SQL.
 */
@Service
class CommonParseUtils {

    /**
     * Collects the tags referenced by the request expression and returns their table mappings
     * grouped by `tableCode`.
     *
     * @param tenant tenant whose id is passed to [getTagCodeTableMapping]
     * @param requestDTO request providing the expression and the driver type
     * @return table mappings keyed by table code
     */
    fun getTableMappingMap(tenant: Tenant, requestDTO: SQLQueryReqDTO): Map<String, List<KTableMapping>> {
        // tags & object sets referenced by the expression
        val tagBaseFieldList: MutableList<TagBaseField> = mutableListOf()
        val objectSetList: MutableList<String> = mutableListOf()
        recurExtractTagCodeAndObjectSet(requestDTO.getExpression(), tagBaseFieldList, objectSetList)
        // metadata
        val tableMappingList: List<KTableMapping> = getTagCodeTableMapping(tenant.id, tagBaseFieldList, requestDTO.getDriverType())
        return tableMappingList.groupBy { it.tableCode }
    }

    /**
     * Returns the physical table mappings for all tags of a KunLun expression and verifies that
     * every referenced tag code and tag option exists in them.
     *
     * @param tenantId tenant id (not used by the current placeholder lookup)
     * @param tagBaseFieldList tags referenced by the expression
     * @param driverType driver type (not used by the current placeholder lookup)
     * @return the mappings, or an empty list when no tag is referenced
     * @throws the business exception from `ExceptionHelper.bizError` when a tag code or tag
     *         option has no mapping
     */
    fun getTagCodeTableMapping(tenantId: Long, tagBaseFieldList: List<TagBaseField>, driverType: DriverType): List<KTableMapping> {
        if (CollectionUtils.isEmpty(tagBaseFieldList)) {
            return emptyList()
        }

        // Load the mappings.
        // TODO metadata: kTableMappings
        val kTableMappings: List<KTableMapping> = ArrayList()

        val tagCodeTableMapping = kTableMappings.stream().collect(Collectors.toMap({ obj: KTableMapping -> obj.tableCode }, Function.identity()))

        // check
        for (tagBaseField in tagBaseFieldList) {
            val kTableMapping = tagCodeTableMapping[tagBaseField.tableCode]
                ?: throw ExceptionHelper.bizError(String.format("tag code [%s] is non-exists", tagBaseField.tableCode))
            // The original used noneMatch here, which inverted the check and rejected existing options.
            val existsTagOption = kTableMapping.fields.any { kFieldMapping -> kFieldMapping.srcField.columnCode == tagBaseField.fieldCode }
            if (!existsTagOption) {
                throw ExceptionHelper.bizError(String.format("tag option [%s] is non-exists", tagBaseField.fieldCode))
            }
        }
        return kTableMappings
    }
}

/**
 * ClickHouse SQL generator for wide-table multi-dimension tags.
 *
 * A KunLun expression is translated into array set operations over `t[i]`, where `t` is an
 * array of sub-queries (one per flattened tag condition): AND -> `arrayIntersect`,
 * OR -> `arrayConcat`, EXCEPT -> `arrayMap(x->multiIf(x not in arrayIntersect(...), x, NULL), <first operand>)`.
 *
 * @author chenguangjian.jk
 * @date 2022-03-09 02:28:48
 */
@Service
class WideTableMultiDimCHSQLParser {
    val log = LoggerFactory.getLogger(WideTableMultiDimCHSQLParser::class.java)

    @Resource
    lateinit var commonParseUtils: CommonParseUtils

    /**
     * Builds the estimation (count) SQL for the request.
     */
    fun parseCount(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
        val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)
        // Parse KunLunExpression
        return WIDE_TABLE_COUNT_SQL_TEMPLATE(
            expr = expr(requestDTO, tableMappingMap),
            arrayLines = arrayLines(requestDTO, tableMappingMap),
        )
    }

    /**
     * Builds the selection (export) SQL for the request.
     */
    fun parseCircle(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
        val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)

        val csvFile = ""
        // Parse KunLunExpression
        return WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
            expr = expr(requestDTO, tableMappingMap),
            arrayLines = arrayLines(requestDTO, tableMappingMap),
            csvFile = csvFile,
        )
    }

    /**
     * Translates the request expression into the array set expression over `t[i]`.
     *
     * @return the expression text, or an empty string when the root expression has no sub-expressions
     */
    fun expr(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
        val tagIdxs: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
        val exprMap = tagIdxs.groupBy { it.kexprId }
        return genWhereClause(exprMap, requestDTO.expression)
    }

    /**
     * Recursively renders [kunLunExpression]. Leaf children become `t[index]`, non-leaf children
     * are rendered recursively; all operands are separated by commas.
     *
     * @throws IllegalArgumentException when the logic operator is not AND, OR or EXCEPT, or a
     *         leaf has no flattened condition in [exprMap]
     */
    private fun genWhereClause(exprMap: Map<Int, List<TagIdx>>, kunLunExpression: KunLunExpression): String {
        val subExpression = kunLunExpression.subExpression
        if (CollectionUtils.isEmpty(subExpression)) { // leaf node
            return ""
        }

        val logic = kunLunExpression.logic
        if (logic != LogicOperatorEnum.AND && logic != LogicOperatorEnum.OR && logic != LogicOperatorEnum.EXCEPT) {
            throw IllegalArgumentException("logic $logic not supported!")
        }

        val operands = subExpression.map { child ->
            if (isLeafNode(child)) leafOperand(exprMap, child) else genWhereClause(exprMap, child)
        }
        // Joining keeps a comma after a nested operand that is not the last one.
        val joined = operands.joinToString(",")

        return when (logic) {
            LogicOperatorEnum.AND -> "(arrayIntersect($joined))"
            LogicOperatorEnum.OR -> "(arrayConcat($joined))"
            // EXCEPT: keep the elements of the first operand that are not in the intersection.
            // The first operand is used as-is, so a nested first operand no longer falls back to t[1].
            else -> "(arrayMap(x->multiIf(x not in arrayIntersect($joined), x, NULL), ${operands.first()}))"
        }
    }

    /** Renders a leaf expression as a reference into the sub-query array: `t[index]`. */
    private fun leafOperand(exprMap: Map<Int, List<TagIdx>>, leaf: KunLunExpression): String {
        val tagIdx = requireNotNull(exprMap[leaf.tfId]?.firstOrNull()) {
            "no tag condition found for expression ${leaf.tfId}"
        }
        return "t[${tagIdx.index}]"
    }

    /**
     * Renders the sub-queries of the `array(...)`, one per line; every line but the last ends
     * with a comma. Example shape:
     *
     *     (select groupUniqArray(UserID) from db.hits_v1 where Sex = 1),
     *     (select groupUniqArray(UserID) from db.hits_v1 where Age > 18),
     *     (select groupUniqArray(UserID) from db.hits_v1 where RequestNum > 0)
     */
    fun arrayLines(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
        val tagIdxs: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
        val lastIndex = tagIdxs.lastIndex

        return buildString {
            tagIdxs.forEachIndexed { position, tagIdx ->
                append("(").append(tagIdx.conditionExpr).append(")")
                // A real line break: an escaped "\\n" would put a literal backslash-n into the SQL.
                append(if (position != lastIndex) ", \n" else "  \n")
            }
        }
    }

    /**
     * Count template. Example of the rendered SQL:
     *
     *     select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
     *     from (
     *     select arrayIntersect(t[3], arrayIntersect(t[1], t[2])) as res,
     *     array(
     *     (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
     *     (select groupUniqArray(UserID) from hits_v1 where Age > 18),
     *     (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
     *     ) t
     *     ) t
     */
    private fun WIDE_TABLE_COUNT_SQL_TEMPLATE(
        expr: String,
        arrayLines: String,
    ) = """
select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
from (
    select $expr as res,
    array(
    $arrayLines
    ) t
) t
"""

    /**
     * Selection template; same shape as the count template but expands the ids with `arrayJoin`
     * and writes them to an outfile.
     *
     * NOTE(review): [csvFile] is accepted but not used; the outfile path is hard-coded in the
     * template — confirm whether it should be interpolated.
     */
    private fun WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
        expr: String,
        arrayLines: String,
        csvFile: String,
    ) = """
select arrayJoin(arrayDistinct(arrayFilter(x->x is not null, t.res)))
from (
    select $expr as res,
    array(
    $arrayLines
    ) t
) t
INTO OUTFILE 'tos:///xxx' FORMAT CSV
settings distributed_perfect_shard=1,max_execution_time = 600
"""
}





/**
tagIdxList=["conditionExpr":"select groupUniqArray(user_id) from db1.table1 where (  cate_id = '1001'  ) and (  f1     = '1'   )","index":1,"kexprId":684563482,"tagCode":"t1","tagOptionCode":"f1","conditionExpr":"select groupUniqArray(user_id) from db2.table2 where (  cate_id = '1002'  ) and (  f2     = '22'   )","index":2,"kexprId":684642314,"tagCode":"t2","tagOptionCode":"f2","conditionExpr":"select groupUniqArray(user_id) from db2.table2 where (  shop_id = '798322'  ) and (  f3     = 333   )","index":3,"kexprId":568144263,"tagCode":"t2","tagOptionCode":"f3","conditionExpr":"select groupUniqArray(user_id) from db3.table3 where (  cate_id = '1004'  ) and (  f4     = '4'   )","index":4,"kexprId":684626037,"tagCode":"t3","tagOptionCode":"f4","conditionExpr":"select groupUniqArray(user_id) from db3.table3 where (  cate_id = '1005'  ) and (  f5     = 5   )","index":5,"kexprId":684627036,"tagCode":"t3","tagOptionCode":"f5","conditionExpr":"select groupUniqArray(user_id) from db3.table3 where (  cate_id = '1006'  ) and (  f6     = 6   )","index":6,"kexprId":684628027,"tagCode":"t3","tagOptionCode":"f6"]
(arrayMap(x->multiIf(x not in arrayIntersect(t[1],t[2],(arrayIntersect(t[4],t[5],t[6]))), x, NULL), t[1]))
(select groupUniqArray(user_id) from db1.table1 where (  cate_id = '1001'  ) and (  f1     = '1'   )),
(select groupUniqArray(user_id) from db2.table2 where (  cate_id = '1002'  ) and (  f2     = '22'   )),
(select groupUniqArray(user_id) from db2.table2 where (  shop_id = '798322'  ) and (  f3     = 333   )),
(select groupUniqArray(user_id) from db3.table3 where (  cate_id = '1004'  ) and (  f4     = '4'   )),
(select groupUniqArray(user_id) from db3.table3 where (  cate_id = '1005'  ) and (  f5     = 5   )),
(select groupUniqArray(user_id) from db3.table3 where (  cate_id = '1006'  ) and (  f6     = 6   ))
 */
/**
 * Manual demo: builds an EXCEPT expression over three tags (the third one nesting an AND of
 * three more), registers table mappings for t1/t2/t3 and prints the generated array expression
 * and sub-query lines of [WideTableMultiDimCHSQLParser].
 */
fun main() {
    // Field mapping whose source and destination columns are identical.
    fun sameColumn(column: String, type: KFieldValueType) =
        KFieldMapping(
            KField(column, "", type, ""), // srcField
            KField(column, "", type, ""), // dstField
        )

    // Single-element mapping list for one tag table, targeting the user_id column.
    fun mappingsOf(tableCode: String, table: String, db: String, fields: List<KFieldMapping>) = listOf(
        KTableMapping(
            tableCode,
            table,
            db,
            KField("user_id", "", KFieldValueType.STRING, ""),
            KSource(0, db, table, "user_id"),
            fields,
        )
    )

    val requestDTO = SQLQueryReqDTO()
    val tableMappingMap: HashMap<String, List<KTableMapping>> = hashMapOf()

    val dimList = listOf(
        FieldCondition("", "t1", "cate_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("10001")),
        FieldCondition("", "t1", "shop_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("798322")),
    )

    // Top-level operands of the EXCEPT expression.
    val e1 = KunLunExpression()
    val e2 = KunLunExpression()
    val e3 = KunLunExpression()
    e1.fieldCondition = FieldCondition("", "t1", "f1", dimList, ArithmeticOperatorEnum.EQUAL, listOf("1"))
    e2.fieldCondition = FieldCondition("", "t2", "f2", dimList, ArithmeticOperatorEnum.EQUAL, listOf("22"))
    e3.fieldCondition = FieldCondition("", "t2", "f3", dimList, ArithmeticOperatorEnum.EQUAL, listOf("333"))
    e3.logic = LogicOperatorEnum.AND

    // e3 additionally nests an AND of three leaves.
    val e31 = KunLunExpression()
    val e32 = KunLunExpression()
    val e33 = KunLunExpression()
    val e3SubExpressionList = arrayListOf<KunLunExpression>()
    e3SubExpressionList.add(e31)
    e3SubExpressionList.add(e32)
    e3SubExpressionList.add(e33)
    e31.fieldCondition = FieldCondition("", "t3", "f4", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("4"))
    e32.fieldCondition = FieldCondition("", "t3", "f5", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("5"))
    e33.fieldCondition = FieldCondition("", "t3", "f6", dimList, ArithmeticOperatorEnum.EQUAL, listOf("6"))
    e3.subExpression = e3SubExpressionList

    val expression = KunLunExpression()
    expression.logic = LogicOperatorEnum.EXCEPT
    val subExpressionList = arrayListOf<KunLunExpression>()
    subExpressionList.add(e1)
    subExpressionList.add(e2)
    subExpressionList.add(e3)
    expression.subExpression = subExpressionList
    requestDTO.expression = expression

    // KTableMapping(boolean rowMapping, String tableCode, String kTableCode, String physicDBName, KField targetField, KSource source, List<KFieldMapping> fields)
    // KField(String columnCode, String fieldCode, KFieldValueType fieldType, String description)
    // KSource(Long tagSrcTaskId, String tagSrcDb, String tagSrcTable, String tagSrcTableJoinField)
    // KFieldMapping(KField srcField, KField dstField)

    tableMappingMap["t1"] = mappingsOf(
        "t1", "table1", "db1",
        listOf(
            sameColumn("f1", KFieldValueType.STRING),
            sameColumn("cate_id", KFieldValueType.STRING),
            sameColumn("shop_id", KFieldValueType.LONG),
        ),
    )

    tableMappingMap["t2"] = mappingsOf(
        "t2", "table2", "db2",
        listOf(
            sameColumn("f2", KFieldValueType.STRING),
            sameColumn("f3", KFieldValueType.LONG),
            sameColumn("cate_id", KFieldValueType.STRING),
            sameColumn("shop_id", KFieldValueType.LONG),
        ),
    )

    tableMappingMap["t3"] = mappingsOf(
        "t3", "table3", "db3",
        listOf(
            sameColumn("f4", KFieldValueType.STRING),
            sameColumn("f5", KFieldValueType.LONG),
            sameColumn("f6", KFieldValueType.LONG),
            sameColumn("cate_id", KFieldValueType.STRING),
            sameColumn("shop_id", KFieldValueType.LONG),
        ),
    )

    // Named `parser` so the local value does not shadow the class name.
    val parser = WideTableMultiDimCHSQLParser()
    val expr = parser.expr(requestDTO, tableMappingMap)
    val arrayLines = parser.arrayLines(requestDTO, tableMappingMap)

    println(expr)
    println(arrayLines)
}










/**
 * Hive SQL generator for wide-table multi-dimension tags.
 *
 * A KunLun expression is translated into array set operations over `t[i]`, where `t` is an
 * array of sub-queries (one per flattened tag condition): AND -> `array_intersect`,
 * OR -> `array_union`, EXCEPT -> `array_except`.
 *
 * NOTE(review): all operands of a node are passed to a single function call; confirm that the
 * target engine's `array_intersect` / `array_union` / `array_except` accept more than two arrays.
 *
 * @author chenguangjian.jk
 * @date 2022-03-09 02:28:48
 */
@Service
class WideTableMultiDimHiveSQLParser {

    // Logger is bound to this class (the original was created for the ClickHouse parser class).
    val log = LoggerFactory.getLogger(WideTableMultiDimHiveSQLParser::class.java)

    @Resource
    lateinit var commonParseUtils: CommonParseUtils

    /**
     * Builds the estimation (count) SQL for the request.
     */
    fun parseCount(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
        val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)
        // Parse KunLunExpression
        return WIDE_TABLE_COUNT_SQL_TEMPLATE(
            expr = expr(requestDTO, tableMappingMap),
            arrayLines = arrayLines(requestDTO, tableMappingMap),
        )
    }

    /**
     * Builds the selection SQL for the request.
     */
    fun parseCircle(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
        val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)

        val csvFile = ""
        // Parse KunLunExpression
        return WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
            expr = expr(requestDTO, tableMappingMap),
            arrayLines = arrayLines(requestDTO, tableMappingMap),
            csvFile = csvFile,
        )
    }

    /**
     * Translates the request expression into the array set expression over `t[i]`.
     *
     * @return the expression text, or an empty string when the root expression has no sub-expressions
     */
    fun expr(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
        val tagIdxs: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
        val exprMap = tagIdxs.groupBy { it.kexprId }
        return genWhereClause(exprMap, requestDTO.expression)
    }

    /**
     * Recursively renders [kunLunExpression]. Leaf children become `t[index]`, non-leaf children
     * are rendered recursively; all operands are separated by commas.
     *
     * @throws IllegalArgumentException when the logic operator is not AND, OR or EXCEPT, or a
     *         leaf has no flattened condition in [exprMap]
     */
    private fun genWhereClause(exprMap: Map<Int, List<TagIdx>>, kunLunExpression: KunLunExpression): String {
        val subExpression = kunLunExpression.subExpression
        if (CollectionUtils.isEmpty(subExpression)) { // leaf node
            return ""
        }

        val logic = kunLunExpression.logic
        val function = when (logic) {
            LogicOperatorEnum.AND -> "array_intersect"
            LogicOperatorEnum.OR -> "array_union"
            LogicOperatorEnum.EXCEPT -> "array_except"
            else -> throw IllegalArgumentException("logic $logic not supported!")
        }

        // Joining keeps a comma after a nested operand that is not the last one.
        val operands = subExpression.joinToString(",") { child ->
            if (isLeafNode(child)) leafOperand(exprMap, child) else genWhereClause(exprMap, child)
        }

        return "($function($operands))"
    }

    /** Renders a leaf expression as a reference into the sub-query array: `t[index]`. */
    private fun leafOperand(exprMap: Map<Int, List<TagIdx>>, leaf: KunLunExpression): String {
        val tagIdx = requireNotNull(exprMap[leaf.tfId]?.firstOrNull()) {
            "no tag condition found for expression ${leaf.tfId}"
        }
        return "t[${tagIdx.index}]"
    }

    /**
     * Renders the sub-queries of the `array(...)`, one per line; every line but the last ends
     * with a comma. Example shape:
     *
     *     (select collect_set(UserID) from db.hits_v1 where Sex = 1),
     *     (select collect_set(UserID) from db.hits_v1 where Age > 18),
     *     (select collect_set(UserID) from db.hits_v1 where RequestNum > 0)
     */
    fun arrayLines(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
        val tagIdxs: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
        val lastIndex = tagIdxs.lastIndex

        return buildString {
            tagIdxs.forEachIndexed { position, tagIdx ->
                append("(").append(tagIdx.conditionExpr).append(")")
                // A real line break: an escaped "\\n" would put a literal backslash-n into the SQL.
                append(if (position != lastIndex) ", \n" else "  \n")
            }
        }
    }

    /**
     * Count template. Example of the rendered SQL:
     *
     *     select  size(t.res) as cnt
     *     from (
     *     select array_intersect(t[3], array_intersect(t[1], t[2])) as res,
     *     array(
     *     (select collect_set(UserID) from hits_v1 where Sex = 1),
     *     (select collect_set(UserID) from hits_v1 where Age > 18),
     *     (select collect_set(UserID) from hits_v1 where RequestNum > 0)
     *     ) t
     *     ) t
     */
    private fun WIDE_TABLE_COUNT_SQL_TEMPLATE(
        expr: String,
        arrayLines: String,
    ) = """
select size(t.res) as cnt
from (
    select $expr as res,
    array(
    $arrayLines
    ) t
) t
"""

    /**
     * Selection template; same shape as the count template but expands the ids with `explode`.
     * [csvFile] is accepted for signature parity with the ClickHouse parser and is not used.
     */
    private fun WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
        expr: String,
        arrayLines: String,
        csvFile: String,
    ) = """
select explode(t.res) as ids
from (
    select $expr as res,
    array(
    $arrayLines
    ) t
) t
"""
}









/**
 * Manual smoke run for [WideTableMultiDimHiveSQLParser].
 *
 * Builds an EXCEPT expression over three sub-expressions (the third one additionally carries an
 * AND group of three further conditions), registers table mappings for the tags `t1`..`t3`, and
 * prints the generated set expression followed by the generated array lines.
 *
 * Output recorded from an earlier run:
 *
 *     (array_except(t[1],t[2],(array_intersect(t[4],t[5],t[6]))))
 *     (select collect_set(user_id) from db1.table1 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f1     = '1'   )),
 *     (select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f2     = '22'   )),
 *     (select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f3     = 333   )),
 *     (select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f4     = '4'   )),
 *     (select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f5     = 5   )),
 *     (select collect_set(user_id) from db3.table3 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f6     = 6   ))
 *
 * The same run logged six tag entries (index, tagCode, tagOptionCode):
 * (1, t1, f1), (2, t2, f2), (3, t2, f3), (4, t3, f4), (5, t3, f5), (6, t3, f6).
 */
fun main() {
    // Dimension filters attached to most of the conditions below.
    val dimList = listOf(
        FieldCondition("", "t1", "cate_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("10001")),
        FieldCondition("", "t1", "shop_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("798322"))
    )

    val e1 = KunLunExpression()
    val e2 = KunLunExpression()
    val e3 = KunLunExpression()
    e1.fieldCondition = FieldCondition("", "t1", "f1", dimList, ArithmeticOperatorEnum.EQUAL, listOf("1"))
    e2.fieldCondition = FieldCondition("", "t2", "f2", dimList, ArithmeticOperatorEnum.EQUAL, listOf("22"))
    e3.fieldCondition = FieldCondition("", "t2", "f3", dimList, ArithmeticOperatorEnum.EQUAL, listOf("333"))
    e3.logic = LogicOperatorEnum.AND

    // Nested AND group hanging off e3.
    val e31 = KunLunExpression()
    val e32 = KunLunExpression()
    val e33 = KunLunExpression()
    e31.fieldCondition = FieldCondition("", "t3", "f4", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("4"))
    e32.fieldCondition = FieldCondition("", "t3", "f5", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("5"))
    e33.fieldCondition = FieldCondition("", "t3", "f6", dimList, ArithmeticOperatorEnum.EQUAL, listOf("6"))
    e3.subExpression = arrayListOf(e31, e32, e33)

    val expression = KunLunExpression()
    expression.logic = LogicOperatorEnum.EXCEPT
    expression.subExpression = arrayListOf(e1, e2, e3)

    val requestDTO = SQLQueryReqDTO()
    requestDTO.expression = expression

    // Constructor shapes as noted by the original author:
    // KTableMapping(boolean rowMapping, String tableCode, String kTableCode, String physicDBName, KField targetField, KSource source, List<KFieldMapping> fields)
    // KField(String columnCode, String fieldCode, KFieldValueType fieldType, String description)
    // KSource(Long tagSrcTaskId, String tagSrcDb, String tagSrcTable, String tagSrcTableJoinField)
    // KFieldMapping(KField srcField, KField dstField)
    // NOTE(review): the KTableMapping note lists a leading `rowMapping` flag that the calls
    // below do not pass — confirm against the current constructor.

    // Source and destination use the same column and type, but stay two separate KField instances.
    fun sameColumn(column: String, type: KFieldValueType) = KFieldMapping(
        KField(column, "", type, ""), // srcField
        KField(column, "", type, "") // dstField
    )

    // Every mapping in this run targets `user_id` and joins on `user_id`.
    fun userIdMapping(code: String, table: String, db: String, fields: List<KFieldMapping>) = KTableMapping(
        code,
        table,
        db,
        KField("user_id", "", KFieldValueType.STRING, ""),
        KSource(0, db, table, "user_id"),
        fields
    )

    val tableMappingMap: HashMap<String, List<KTableMapping>> = hashMapOf()

    tableMappingMap["t1"] = listOf(
        userIdMapping(
            "t1", "table1", "db1",
            listOf(
                sameColumn("f1", KFieldValueType.STRING),
                sameColumn("cate_id", KFieldValueType.STRING),
                sameColumn("shop_id", KFieldValueType.LONG),
            )
        )
    )

    tableMappingMap["t2"] = listOf(
        userIdMapping(
            "t2", "table2", "db2",
            listOf(
                sameColumn("f2", KFieldValueType.STRING),
                sameColumn("f3", KFieldValueType.LONG),
            )
        )
    )

    tableMappingMap["t3"] = listOf(
        userIdMapping(
            "t3", "table3", "db3",
            listOf(
                sameColumn("f4", KFieldValueType.STRING),
                sameColumn("f5", KFieldValueType.LONG),
                sameColumn("f6", KFieldValueType.LONG),
            )
        )
    )

    // lowerCamelCase local; the original reused the class name, shadowing it for the rest of the scope.
    val parser = WideTableMultiDimHiveSQLParser()
    val expr = parser.expr(requestDTO, tableMappingMap)
    val arrayLines = parser.arrayLines(requestDTO, tableMappingMap)

    println(expr)
    println(arrayLines)
}

以上是关于WideTableMultiDimSQLParser 解析说明:ClickHouse / Hive 数组交并差运算的主要内容,如果未能解决你的问题,请参考以下文章