Hive学习窗口函数源码阅读

Posted 假如我有一口缸

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hive学习窗口函数源码阅读相关的知识,希望对你有一定的参考价值。

Hive学习(一)窗口函数源码阅读

背景

最近计算的指标经常使用到窗口函数,由于数据量级较大,窗口函数执行的耗时较长,想要优化却无从下手,才发觉对窗口函数底层实现原理一无所知,故计划阅读hive窗口函数实现并做记录,如有错误欢迎指正。

注意:本篇不讨论streaming流模式;源码文件再hive的ql模块下;本篇提到的分区均值窗口函数的分区,而不是hive物理分区

窗口函数执行逻辑

第一步:将数据按照窗口定义分割成多个分区(partition)
第二步:对各个分区上的数据调用窗口函数
和不同的UDAF不同,窗口函数的返回结果不是一个聚合值,而是另一张表的格式(table-in, table-out),所以Hive社区引入了分区表函数Partitioned Table Function(PTF)。

代码流转图

源码阅读分析

PTFOperator

PartitionedTableFunction的运算符,继承Operator(Hive运算符基类)
重写process(Object row, int tag) 方法,使用该方法来处理一行数据Row,

@Override
    public void process(Object row, int tag) throws HiveException 
        if (!isMapOperator) 
            /*
             * check if current row belongs to the current accumulated Partition:
             * - If not:
             *  - process the current Partition
             *  - reset input Partition
             * - set currentKey to the newKey if it is null or has changed.
             */
            newKeys.getNewKey(row, inputObjInspectors[0]);
            // 方法中会判断当前row所属的Key(newKeys)是否等于当前正在累积数据的partition所属的key(currentKeys)
            boolean keysAreEqual = (currentKeys != null && newKeys != null) ?
                    newKeys.equals(currentKeys) : false;
            // 如果不相等,就结束当前partition分区的数据累积,触发窗口计算
            if (currentKeys != null && !keysAreEqual) 
                // 关闭正在积累的分区
                ptfInvocation.finishPartition();
            
            // 如果currentKeys为空或者被改变,就将newKeys赋值给currentKeys
            if (currentKeys == null || !keysAreEqual) 
                // 开启一个新的分区partition
                ptfInvocation.startPartition();
                if (currentKeys == null) 
                    currentKeys = newKeys.copyKey();
                 else 
                    currentKeys.copyKey(newKeys);
                
            
         else if (firstMapRow)  // 说明当前row是进入的第一行
            ptfInvocation.startPartition();
            firstMapRow = false;
        
        // 将数据row添加到分区中,积累数据
        ptfInvocation.processRow(row);
    

上面的代码可以看出,所有数据应该是按照分区排好了序,排队进入process方法,当遇到进入的row和当前分区不是同一个key时,当前分区就可以关闭了,然后在打开下一个分区。
打开和关闭分区使用的方法分别是 ptfInvocation.startPartition()ptfInvocation.finishPartition()

PTFInvocation

PTFInvocationPTFOperator的内部类,在PTFOperator的初始化方法中创建了实例。

@Override
  protected void initializeOp(Configuration jobConf) throws HiveException 
    ...
    ptfInvocation = setupChain();
    ptfInvocation.initializeStreaming(jobConf, isMapOperator);
    ...
  

主要作用是负责PTF数据链中行的流动。通过processRow方法调用传递链中的每一行,并且通过startPartition() 和 *finishPartition()*方法来通知分区何时开始何时结束。
类中包含TableFunction,用来处理分区数据。

PTFPartition inputPart; // 分区对象,一直是在复用一个inputPart
TableFunctionEvaluator tabFn; // 可以理解问窗口函数的实例

//向分区中添加一行数据
void processRow(Object row) throws HiveException 
    if (isStreaming()) 
        handleOutputRows(tabFn.processRow(row));
     else 
        // inputPart就是当前正在累积数据的分区
        inputPart.append(row);
    


// 开启一个分区
void startPartition() throws HiveException 
    if (isStreaming()) 
        tabFn.startPartition();
     else 
        if (prev == null || prev.isOutputIterator()) 
            if (inputPart == null) 
                // 创建新分区:PTFPartition对象
                createInputPartition();
             else 
                // 重置分区
                inputPart.reset();
            
        
    
    if (next != null) 
        next.startPartition();
    


// 关闭一个分区
void finishPartition() throws HiveException 
    if (isStreaming()) 
        handleOutputRows(tabFn.finishPartition());
     else 
        if (tabFn.canIterateOutput()) 
            outputPartRowsItr = inputPart == null ? null :
                    tabFn.iterator(inputPart.iterator());
         else 
            // tabFn是窗口函数的实例,execute方法将inputPart输入,执行窗口函数逻辑的计算,返回outputPart依旧是一个分区对象
            outputPart = inputPart == null ? null : tabFn.execute(inputPart);
            outputPartRowsItr = outputPart == null ? null : outputPart.iterator();
        
        if (next != null) 
            if (!next.isStreaming() && !isOutputIterator()) 
                next.inputPart = outputPart;
             else 
                if (outputPartRowsItr != null) 
                    while (outputPartRowsItr.hasNext()) 
                        next.processRow(outputPartRowsItr.next());
                    
                
            
        

    if (next != null) 
        next.finishPartition();
     else 
        if (!isStreaming()) 
            if (outputPartRowsItr != null) 
                while (outputPartRowsItr.hasNext()) 
                    // 将窗口函数计算结果逐条输出到下一个Operator中
                    forward(outputPartRowsItr.next(), outputObjInspector);
                
            
        
    

接下来看一下PTFPartitionTableFunctionEvaluator

PTFPartition

该类表示由TableFunctionWindowFunction来处理的行集合,使用PTFRowContainer来保存数据。

private final PTFRowContainer<List<Object>> elems; // 存放数据的容器

public void append(Object o) throws HiveException 
    if (elems.rowCount() == Integer.MAX_VALUE) 
        throw new HiveException(String.format("Cannot add more than %d elements to a PTFPartition",
                Integer.MAX_VALUE));
    

    @SuppressWarnings("unchecked")
    List<Object> l = (List<Object>)
            ObjectInspectorUtils.copyToStandardObject(o, inputOI, ObjectInspectorCopyOption.WRITABLE);
    elems.addRow(l);

在往PTFPartition中添加数据时,如果当前累计条数超过了Int最大值,会抛异常。

TableFunctionEvaluator

该类负责对分区内的数据做实际的窗口计算

transient protected PTFPartition outputPartition; // transient瞬态变量,该属性可以不参与序列化

public PTFPartition execute(PTFPartition iPart)
        throws HiveException 
    if (ptfDesc.isMapSide()) 
        return transformRawInput(iPart);
    
    PTFPartitionIterator<Object> pItr = iPart.iterator();
    PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc.getLlInfo(), pItr);

    if (outputPartition == null) 
        outputPartition = PTFPartition.create(ptfDesc.getCfg(),
                tableDef.getOutputShape().getSerde(),
                OI, tableDef.getOutputShape().getOI());
     else 
        outputPartition.reset();
    
	// 入参1:输入PTFPartition转换的迭代器;入参2:输出PTFPartition
    execute(pItr, outputPartition);
    return outputPartition;


protected abstract void execute(PTFPartitionIterator<Object> pItr, PTFPartition oPart) throws HiveException;

execute(PTFPartitionIterator pItr, PTFPartition oPart) 方法的具体实现在子类WindowingTableFunction

@Override
public void execute(PTFPartitionIterator<Object> pItr, PTFPartition outP) throws HiveException 
    ArrayList<List<?>> oColumns = new ArrayList<List<?>>();
    PTFPartition iPart = pItr.getPartition();
    StructObjectInspector inputOI = iPart.getOutputOI();

    WindowTableFunctionDef wTFnDef = (WindowTableFunctionDef) getTableDef();
    for (WindowFunctionDef wFn : wTFnDef.getWindowFunctions()) 
    	// 这里是判断逻辑:如果该窗口定义是一个从第一行到最后一行的全局无限窗口就返回false,反之true
        boolean processWindow = processWindow(wFn.getWindowFrame());
        pItr.reset();
        if (!processWindow) 
            Object out = evaluateFunctionOnPartition(wFn, iPart);
            if (!wFn.isPivotResult()) 
                out = new SameList(iPart.size(), out);
            
            oColumns.add((List<?>) out);
         else 
            oColumns.add(executeFnwithWindow(wFn, iPart));
        
    

    /*
     * Output Columns in the following order
     * - the columns representing the output from Window Fns
     * - the input Rows columns
     */
    for (int i = 0; i < iPart.size(); i++) 
        ArrayList oRow = new ArrayList();
        Object iRow = iPart.getAt(i);

        for (int j = 0; j < oColumns.size(); j++) 
            oRow.add(oColumns.get(j).get(i));
        
        for (StructField f : inputOI.getAllStructFieldRefs()) 
            oRow.add(inputOI.getStructFieldData(iRow, f));
        
        //最终将处理好的数据逐条添加到输出PTFPartition中
        outP.append(oRow);
    


// Evaluate the function result for each row in the partition
ArrayList<Object> executeFnwithWindow(
        WindowFunctionDef wFnDef,
        PTFPartition iPart)
        throws HiveException 
    ArrayList<Object> vals = new ArrayList<Object>();
    for (int i = 0; i < iPart.size(); i++) 
    	// 入参:1.窗口函数、2.当前行的行号、3.输入PTFPartition对象
        Object out = evaluateWindowFunction(wFnDef, i, iPart);
        vals.add(out);
    
    return vals;


// Evaluate the result given a partition and the row number to process
private Object evaluateWindowFunction(WindowFunctionDef wFn, int rowToProcess, PTFPartition partition)
        throws HiveException 
    BasePartitionEvaluator partitionEval = wFn.getWFnEval()
            .getPartitionWindowingEvaluator(wFn.getWindowFrame(), partition, wFn.getArgs(), wFn.getOI(), nullsLast);
    // 给定当前行,获取窗口的聚合
    return partitionEval.iterate(rowToProcess, ptfDesc.getLlInfo());

rows between窗口范围定义

上述代码中有一个判断boolean processWindow = processWindow(wFn.getWindowFrame()),我是这样理解的:
sql:sum(amount) over(partition by userid order by time) ,表示从第一行直到当前行的有范围窗口,返回true。
sql:sum(amount) over(partition by id order by time rows between current row and unbounded following) ,代表从当前行直到最后一行的有范围窗口,返回true。
sql:sum(amount) over(partition by id order by time rows between unbounded preceding and unbounded following),表示从第一行直到最后一行的无限范围窗口,返回false。

结尾

窗口函数先看到这里,虽然很浅显,但至少打开了hive源码阅读并记录的大门,接下来会继续学习hive和spark的源码。

参考文章:
https://mp.weixin.qq.com/s/WBryrbpHGO9jmzMp0e7jhw
https://blog.csdn.net/qq_45935878/article/details/122125477

以上是关于Hive学习窗口函数源码阅读的主要内容,如果未能解决你的问题,请参考以下文章

Hive学习之路 (十五)Hive分析窗口函数 CUME_DIST和PERCENT_RANK

Hive metastore源码阅读

我为什么学习hive窗口分析函数

hive关于窗口函数的使用

hive窗口函数总结

数据仓库工具Hive——窗口函数,DML,事务