Hive学习窗口函数源码阅读
Posted 假如我有一口缸
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hive学习窗口函数源码阅读相关的知识,希望对你有一定的参考价值。
Hive学习(一)窗口函数源码阅读
背景
最近计算的指标经常使用到窗口函数,由于数据量级较大,窗口函数执行的耗时较长,想要优化却无从下手,才发觉对窗口函数底层实现原理一无所知,故计划阅读hive窗口函数实现并做记录,如有错误欢迎指正。
注意:本篇不讨论streaming流模式;源码文件再hive的ql模块下;本篇提到的分区均值窗口函数的分区,而不是hive物理分区
窗口函数执行逻辑
第一步:将数据按照窗口定义分割成多个分区(partition)
第二步:对各个分区上的数据调用窗口函数
和不同的UDAF不同,窗口函数的返回结果不是一个聚合值,而是另一张表的格式(table-in, table-out),所以Hive社区引入了分区表函数Partitioned Table Function(PTF)。
代码流转图
源码阅读分析
PTFOperator
PartitionedTableFunction的运算符,继承Operator(Hive运算符基类)
重写process(Object row, int tag) 方法,使用该方法来处理一行数据Row,
@Override
public void process(Object row, int tag) throws HiveException
if (!isMapOperator)
/*
* check if current row belongs to the current accumulated Partition:
* - If not:
* - process the current Partition
* - reset input Partition
* - set currentKey to the newKey if it is null or has changed.
*/
newKeys.getNewKey(row, inputObjInspectors[0]);
// 方法中会判断当前row所属的Key(newKeys)是否等于当前正在累积数据的partition所属的key(currentKeys)
boolean keysAreEqual = (currentKeys != null && newKeys != null) ?
newKeys.equals(currentKeys) : false;
// 如果不相等,就结束当前partition分区的数据累积,触发窗口计算
if (currentKeys != null && !keysAreEqual)
// 关闭正在积累的分区
ptfInvocation.finishPartition();
// 如果currentKeys为空或者被改变,就将newKeys赋值给currentKeys
if (currentKeys == null || !keysAreEqual)
// 开启一个新的分区partition
ptfInvocation.startPartition();
if (currentKeys == null)
currentKeys = newKeys.copyKey();
else
currentKeys.copyKey(newKeys);
else if (firstMapRow) // 说明当前row是进入的第一行
ptfInvocation.startPartition();
firstMapRow = false;
// 将数据row添加到分区中,积累数据
ptfInvocation.processRow(row);
上面的代码可以看出,所有数据应该是按照分区排好了序,排队进入process方法,当遇到进入的row和当前分区不是同一个key时,当前分区就可以关闭了,然后在打开下一个分区。
打开和关闭分区使用的方法分别是 ptfInvocation.startPartition() 和 ptfInvocation.finishPartition()。
PTFInvocation
PTFInvocation是PTFOperator的内部类,在PTFOperator的初始化方法中创建了实例。
@Override
protected void initializeOp(Configuration jobConf) throws HiveException
...
ptfInvocation = setupChain();
ptfInvocation.initializeStreaming(jobConf, isMapOperator);
...
主要作用是负责PTF数据链中行的流动。通过processRow方法调用传递链中的每一行,并且通过startPartition() 和 *finishPartition()*方法来通知分区何时开始何时结束。
类中包含TableFunction,用来处理分区数据。
PTFPartition inputPart; // 分区对象,一直是在复用一个inputPart
TableFunctionEvaluator tabFn; // 可以理解问窗口函数的实例
//向分区中添加一行数据
void processRow(Object row) throws HiveException
if (isStreaming())
handleOutputRows(tabFn.processRow(row));
else
// inputPart就是当前正在累积数据的分区
inputPart.append(row);
// 开启一个分区
void startPartition() throws HiveException
if (isStreaming())
tabFn.startPartition();
else
if (prev == null || prev.isOutputIterator())
if (inputPart == null)
// 创建新分区:PTFPartition对象
createInputPartition();
else
// 重置分区
inputPart.reset();
if (next != null)
next.startPartition();
// 关闭一个分区
void finishPartition() throws HiveException
if (isStreaming())
handleOutputRows(tabFn.finishPartition());
else
if (tabFn.canIterateOutput())
outputPartRowsItr = inputPart == null ? null :
tabFn.iterator(inputPart.iterator());
else
// tabFn是窗口函数的实例,execute方法将inputPart输入,执行窗口函数逻辑的计算,返回outputPart依旧是一个分区对象
outputPart = inputPart == null ? null : tabFn.execute(inputPart);
outputPartRowsItr = outputPart == null ? null : outputPart.iterator();
if (next != null)
if (!next.isStreaming() && !isOutputIterator())
next.inputPart = outputPart;
else
if (outputPartRowsItr != null)
while (outputPartRowsItr.hasNext())
next.processRow(outputPartRowsItr.next());
if (next != null)
next.finishPartition();
else
if (!isStreaming())
if (outputPartRowsItr != null)
while (outputPartRowsItr.hasNext())
// 将窗口函数计算结果逐条输出到下一个Operator中
forward(outputPartRowsItr.next(), outputObjInspector);
接下来看一下PTFPartition和TableFunctionEvaluator。
PTFPartition
该类表示由TableFunction或WindowFunction来处理的行集合,使用PTFRowContainer来保存数据。
private final PTFRowContainer<List<Object>> elems; // 存放数据的容器
public void append(Object o) throws HiveException
if (elems.rowCount() == Integer.MAX_VALUE)
throw new HiveException(String.format("Cannot add more than %d elements to a PTFPartition",
Integer.MAX_VALUE));
@SuppressWarnings("unchecked")
List<Object> l = (List<Object>)
ObjectInspectorUtils.copyToStandardObject(o, inputOI, ObjectInspectorCopyOption.WRITABLE);
elems.addRow(l);
在往PTFPartition中添加数据时,如果当前累计条数超过了Int最大值,会抛异常。
TableFunctionEvaluator
该类负责对分区内的数据做实际的窗口计算
transient protected PTFPartition outputPartition; // transient瞬态变量,该属性可以不参与序列化
public PTFPartition execute(PTFPartition iPart)
throws HiveException
if (ptfDesc.isMapSide())
return transformRawInput(iPart);
PTFPartitionIterator<Object> pItr = iPart.iterator();
PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc.getLlInfo(), pItr);
if (outputPartition == null)
outputPartition = PTFPartition.create(ptfDesc.getCfg(),
tableDef.getOutputShape().getSerde(),
OI, tableDef.getOutputShape().getOI());
else
outputPartition.reset();
// 入参1:输入PTFPartition转换的迭代器;入参2:输出PTFPartition
execute(pItr, outputPartition);
return outputPartition;
protected abstract void execute(PTFPartitionIterator<Object> pItr, PTFPartition oPart) throws HiveException;
execute(PTFPartitionIterator pItr, PTFPartition oPart) 方法的具体实现在子类WindowingTableFunction中
@Override
public void execute(PTFPartitionIterator<Object> pItr, PTFPartition outP) throws HiveException
ArrayList<List<?>> oColumns = new ArrayList<List<?>>();
PTFPartition iPart = pItr.getPartition();
StructObjectInspector inputOI = iPart.getOutputOI();
WindowTableFunctionDef wTFnDef = (WindowTableFunctionDef) getTableDef();
for (WindowFunctionDef wFn : wTFnDef.getWindowFunctions())
// 这里是判断逻辑:如果该窗口定义是一个从第一行到最后一行的全局无限窗口就返回false,反之true
boolean processWindow = processWindow(wFn.getWindowFrame());
pItr.reset();
if (!processWindow)
Object out = evaluateFunctionOnPartition(wFn, iPart);
if (!wFn.isPivotResult())
out = new SameList(iPart.size(), out);
oColumns.add((List<?>) out);
else
oColumns.add(executeFnwithWindow(wFn, iPart));
/*
* Output Columns in the following order
* - the columns representing the output from Window Fns
* - the input Rows columns
*/
for (int i = 0; i < iPart.size(); i++)
ArrayList oRow = new ArrayList();
Object iRow = iPart.getAt(i);
for (int j = 0; j < oColumns.size(); j++)
oRow.add(oColumns.get(j).get(i));
for (StructField f : inputOI.getAllStructFieldRefs())
oRow.add(inputOI.getStructFieldData(iRow, f));
//最终将处理好的数据逐条添加到输出PTFPartition中
outP.append(oRow);
// Evaluate the function result for each row in the partition
ArrayList<Object> executeFnwithWindow(
WindowFunctionDef wFnDef,
PTFPartition iPart)
throws HiveException
ArrayList<Object> vals = new ArrayList<Object>();
for (int i = 0; i < iPart.size(); i++)
// 入参:1.窗口函数、2.当前行的行号、3.输入PTFPartition对象
Object out = evaluateWindowFunction(wFnDef, i, iPart);
vals.add(out);
return vals;
// Evaluate the result given a partition and the row number to process
private Object evaluateWindowFunction(WindowFunctionDef wFn, int rowToProcess, PTFPartition partition)
throws HiveException
BasePartitionEvaluator partitionEval = wFn.getWFnEval()
.getPartitionWindowingEvaluator(wFn.getWindowFrame(), partition, wFn.getArgs(), wFn.getOI(), nullsLast);
// 给定当前行,获取窗口的聚合
return partitionEval.iterate(rowToProcess, ptfDesc.getLlInfo());
rows between窗口范围定义
上述代码中有一个判断boolean processWindow = processWindow(wFn.getWindowFrame()),我是这样理解的:
sql:sum(amount) over(partition by userid order by time) ,表示从第一行直到当前行的有范围窗口,返回true。
sql:sum(amount) over(partition by id order by time rows between current row and unbounded following) ,代表从当前行直到最后一行的有范围窗口,返回true。
sql:sum(amount) over(partition by id order by time rows between unbounded preceding and unbounded following),表示从第一行直到最后一行的无限范围窗口,返回false。
结尾
窗口函数先看到这里,虽然很浅显,但至少打开了hive源码阅读并记录的大门,接下来会继续学习hive和spark的源码。
参考文章:
https://mp.weixin.qq.com/s/WBryrbpHGO9jmzMp0e7jhw
https://blog.csdn.net/qq_45935878/article/details/122125477
以上是关于Hive学习窗口函数源码阅读的主要内容,如果未能解决你的问题,请参考以下文章