Postgres数据库之聚集函数内核源码学习总结

Posted 丶Summer ~Z

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Postgres数据库之聚集函数内核源码学习总结相关的知识,希望对你有一定的参考价值。

学习参考书籍、网站或博文:

  1. 参考书籍:《PostgreSQL数据库内核分析》
  2. Lex & Yacc 点击前往
  3. Postgresql源码学习之词法和语法分析 点击前往

Postgres数据库之聚集函数内核源码学习总结


聚集函数概述

PostgreSQL支持聚集函数。 一个聚集函数从多个输入行中计算出一个结果。 比如,我们有在一个行集合上计算count(计数)、sum(和)、avg(均值)、max(最大值)和min(最小值)的函数。
PostgreSQL中的聚集函数用状态值状态转换函数定义。也就是,一个聚集操作使用一个状态值,它在每一个后续输入行被处理时被更新。要定义一个新的聚集函数,我们要为状态值选择一种数据类型、一个状态的初始值和一个状态转换函数。状态转换函数接收前一个状态值和该聚集当前行的输入值,并且返回一个新的状态值。万一该聚集的预期结果与需要保存在运行状态之中的数据不同,还能指定一个最终函数。最终函数接收结束状态值并且返回作为聚集结果的任何东西。原则上,转换函数最终函数只是也可以在聚集环境之外使用的普通函数(实际上,通常出于性能的原因,会创建特殊的只能作为聚集的一部分工作的转换函数)。

因此,除了该聚集的用户所见的参数和结果数据类型之外,还有一种可能不同于参数和结果状态的内部状态值数据类型。
自定义聚集函数有对应的SQL语法Postgresql定义聚集函数 点击前往,本次学习重点在内核源码是如何定义聚集函数,即系统聚集函数的定义及处理流程。

相关系统目录介绍

函数的相关信息都保存在pg_proc系统目录中(pg_proc系统目录介绍 点击前往),可以通过查看该系统表可知PostgreSQL数据库中的聚集函数,该系统表的字段prokind代表函数类型,其中f表示普通函数,p表示过程,a表示聚集函数,w表示窗口函数,如下以avg函数举例

postgres=# select * from pg_proc where prokind = 'a' and proname = 'avg';
 oid  | proname | pronamespace | proowner | prolang | procost | prorows | provariadic | prosupport | prokind | prosecdef | proleakproof | proisstrict | proretset | provolatile | proparallel | pronargs | pronargdefaults | prorettype | proargtypes | proallargtypes | proargmodes | proargnames | proargdefaults | protrftypes |     prosrc      | probin | proconfig | proacl 
------+---------+--------------+----------+---------+---------+---------+-------------+------------+---------+-----------+--------------+-------------+-----------+-------------+-------------+----------+-----------------+------------+-------------+----------------+-------------+-------------+----------------+-------------+-----------------+--------+-----------+--------
 2100 | avg     |           11 |       10 |      12 |       1 |       0 |           0 | -          | a       | f         | f            | f           | f         | i           | s           |        1 |               0 |       1700 | 20          |                |             |             |                |             | aggregate_dummy |        |           | 
 2101 | avg     |           11 |       10 |      12 |       1 |       0 |           0 | -          | a       | f         | f            | f           | f         | i           | s           |        1 |               0 |       1700 | 23          |                |             |             |                |             | aggregate_dummy |        |           | 
 2102 | avg     |           11 |       10 |      12 |       1 |       0 |           0 | -          | a       | f         | f            | f           | f         | i           | s           |        1 |               0 |       1700 | 21          |                |             |             |                |             | aggregate_dummy |        |           | 
 2103 | avg     |           11 |       10 |      12 |       1 |       0 |           0 | -          | a       | f         | f            | f           | f         | i           | s           |        1 |               0 |       1700 | 1700        |                |             |             |                |             | aggregate_dummy |        |           | 
 2104 | avg     |           11 |       10 |      12 |       1 |       0 |           0 | -          | a       | f         | f            | f           | f         | i           | s           |        1 |               0 |        701 | 700         |                |             |             |                |             | aggregate_dummy |        |           | 
 2105 | avg     |           11 |       10 |      12 |       1 |       0 |           0 | -          | a       | f         | f            | f           | f         | i           | s           |        1 |               0 |        701 | 701         |                |             |             |                |             | aggregate_dummy |        |           | 
 2106 | avg     |           11 |       10 |      12 |       1 |       0 |           0 | -          | a       | f         | f            | f           | f         | i           | s           |        1 |               0 |       1186 | 1186        |                |             |             |                |             | aggregate_dummy |        |           | 
(7 rows)

postgres=#

如上图所示,聚集函数avgpg_proc中的相关内容如上,一共7行,对应了avg函数不同的入参类型和返回值类型,其中prosrc都是aggregate_dummy,实际上该函数并非avg函数对应的底层处理函数,而真正的聚集函数相关的处理函数保存在pg_aggregate系统目录下

名称类型引用描述
aggfnoidregprocpg_proc.oid聚集函数在pg_proc中的OID
aggkindchar聚集类型: n表示“普通”聚集, o表示“有序集”聚集,或者 h表示“假想集”聚集
aggnumdirectargsint2一个有序集或者假想集聚集的直接(非聚集)参数的数量,一个可变数组算作一个参数。 如果等于pronargs,该聚集必定是可变的并且该可变数组描述聚集参数以 及最终直接参数。对于普通聚集总是为零。
aggtransfnregprocpg_proc.oid转移函数
aggfinalfnregprocpg_proc.oid最终函数(如果没有就为零)
aggcombinefnregprocpg_proc.oid结合函数(如果没有就为零)
aggserialfnregprocpg_proc.oid序列化函数(如果没有就为零)
aggdeserialfnregprocpg_proc.oid反序列化函数(如果没有就为零)
aggmtransfnregprocpg_proc.oid用于移动聚集模式的向前转移函数(如果没有就为零)
aggminvtransfnregprocpg_proc.oid用于移动聚集模式的反向转移函数(如果没有就为零)
aggmfinalfnregprocpg_proc.oid用于移动聚集模式的最终函数(如果没有就为零)
aggfinalextrabool为真则向 aggfinalfn传递额外的哑参数
aggmfinalextrabool为真则向 aggmfinalfn传递额外的哑参数
aggfinalmodifycharaggfinalfn是否修改传递状态值: 如果是只读则为r, 如果不能在aggfinalfn之后应用aggtransfn则为s, 如果它修改该值则为w
aggmfinalmodifychar和aggfinalmodify类似,但是用于aggmfinalfn
aggsortopoidpg_operator.oid相关联的排序操作符(如果没有则为0)
aggtranstypeoidpg_type.oid聚集函数的内部转移(状态)数据的数据类型
aggtransspaceint4转移状态数据的近似平均尺寸(字节),或者为零表示使用一个默认估算值
aggmtranstypeoidpg_type.oid聚集函数用于移动聚集欧氏的内部转移(状态)数据的数据类型(如果没有则为零)
aggmtransspaceint4转移状态数据的近似平均尺寸(字节),或者为零表示使用一个默认估算值
agginitvaltext转移状态的初始值。这是一个文本域,它包含初始值的外部字符串表现形式。如果这个域为空,则转移状态值从空值开始。
aggminitvaltext用于移动聚集模式的转移状态初值。这是一个文本域,它包含了以其文本字符串形式表达的初值。 如果这个域为空,则转移状态值从空值开始。

依旧以avg函数为例,通过查看系统表pg_aggregate可以看到该聚集函数对应的相关处理函数,如转换函数,最终函数,结合函数

postgres=# select * from pg_aggregate where aggfnoid = 2100;
    aggfnoid    | aggkind | aggnumdirectargs |   aggtransfn   |    aggfinalfn    |   aggcombinefn   |    aggserialfn     |    aggdeserialfn     |  aggmtransfn   |   aggminvtransfn   |   aggmfinalfn    | aggfinalextra | aggmfinalextra | aggfinalmodify | aggmfinalmodify | aggsortop | aggtranstype | aggtransspace | aggmtranstype | aggmtransspace | agginitval | aggminitval 
----------------+---------+------------------+----------------+------------------+------------------+--------------------+----------------------+----------------+--------------------+------------------+---------------+----------------+----------------+-----------------+-----------+--------------+---------------+---------------+----------------+------------+-------------
 pg_catalog.avg | n       |                0 | int8_avg_accum | numeric_poly_avg | int8_avg_combine | int8_avg_serialize | int8_avg_deserialize | int8_avg_accum | int8_avg_accum_inv | numeric_poly_avg | f             | f              | r              | r               |         0 |         2281 |            48 |          2281 |             48 |            | 
(1 row)

postgres=# select * from pg_aggregate where aggfnoid = 2101;
    aggfnoid    | aggkind | aggnumdirectargs |   aggtransfn   | aggfinalfn |   aggcombinefn   | aggserialfn | aggdeserialfn |  aggmtransfn   |   aggminvtransfn   | aggmfinalfn | aggfinalextra | aggmfinalextra | aggfinalmodify | aggmfinalmodify | aggsortop | aggtranstype | aggtransspace | aggmtranstype | aggmtransspace | agginitval | aggminitval 
----------------+---------+------------------+----------------+------------+------------------+-------------+---------------+----------------+--------------------+-------------+---------------+----------------+----------------+-----------------+-----------+--------------+---------------+---------------+----------------+------------+-------------
 pg_catalog.avg | n       |                0 | int4_avg_accum | int8_avg   | int4_avg_combine | -           | -             | int4_avg_accum | int4_avg_accum_inv | int8_avg    | f             | f              | r              | r      
         |         0 |         1016 |             0 |          1016 |              0 | 0,0      | 0,0
(1 row)

postgres=# select * from pg_aggregate where aggfnoid = 2102;
    aggfnoid    | aggkind | aggnumdirectargs |   aggtransfn   | aggfinalfn |   aggcombinefn   | aggserialfn | aggdeserialfn |  aggmtransfn   |   aggminvtransfn   | aggmfinalfn | aggfinalextra | aggmfinalextra | aggfinalmodify | aggmfinalmodify | aggsortop | aggtranstype | aggtransspace | aggmtranstype | aggmtransspace | agginitval | aggminitval 
----------------+---------+------------------+----------------+------------+------------------+-------------+---------------+----------------+--------------------+-------------+---------------+----------------+----------------+-----------------+-----------+--------------+---------------+---------------+----------------+------------+-------------
 pg_catalog.avg | n       |                0 | int2_avg_accum | int8_avg   | int4_avg_combine | -           | -             | int2_avg_accum | int2_avg_accum_inv | int8_avg    | f             | f              | r              | r      
         |         0 |         1016 |             0 |          1016 |              0 | 0,0      | 0,0
(1 row)

postgres=# 

一个简单的聚集函数由一个或者多个普通函数组成: 一个状态转换函数aggtransfn和一个可选的最终计算函数aggfinalfn
aggfnoid2100对应在pg_proc表中的第一条,函数入参为int8,返回值为numeric,其对应的状态转换函数为int8_avg_accum,最终计算函数为numeric_poly_avg,这两个函数即该聚集函数在内核中的相关处理函数。
PostgreSQL创建一个数据类型 stype的临时变量来保持聚集的当前内部状态。对每一个输入行,聚集参数值会被计算并且状态转换函数会被调用,它用当前状态值和新参数值计算一个新的内部状态值。 等所有行都被处理完后,最终函数会被调用一次来计算该聚集的返回值。如果没有最终函数,则最终的状态值会被返回。状态转换函数和最终函数的相关信息也保存在pg_proc系统表中

#该函数的入参类型为2281 20,其中参数1:2281对应的类型为internal,即上一次转换函数的返回值
#                           参数2:20 对应类型为int8,即当前行输入行的数据类型
#                       函数返回值:2281对应internal
postgres=# select * from pg_proc where proname = 'int8_avg_accum';
 oid  |    proname     | pronamespace | proowner | prolang | procost | prorows | provariadic | prosupport | prokind | prosecdef | proleakproof | proisstrict | proretset | provolatile | proparallel | pronargs | pronargdefaults | prorettype | proargtypes | proallargtypes | proargmodes | proargnames | proargdefaults | protrftypes |     prosrc     | probin | proconfig | proacl 
------+----------------+--------------+----------+---------+---------+---------+-------------+------------+---------+-----------+--------------+-------------+-----------+-------------+-------------+----------+-----------------+------------+-------------+----------------+-------------+-------------+----------------+-------------+----------------+--------+-----------+--------
 2746 | int8_avg_accum |           11 |       10 |      12 |       1 |       0 |           0 | -          | f       | f         | f            | f           | f         | i           | s           |        2 |               0 |       2281 | 2281 20     |                |             |             |                |             | int8_avg_accum |        |           | 
(1 row)

#该函数的入参类型为2281 对应类型为internal,即经过上述转换函数转换之后的返回值
# 函数返回值:1700 对应类型为numeric,即根据转换后的状态值进行最终的计算
postgres=# select * from pg_proc where proname = 'numeric_poly_avg';
 oid  |     proname      | pronamespace | proowner | prolang | procost | prorows | provariadic | prosupport | prokind | prosecdef | proleakproof | proisstrict | proretset | provolatile | proparallel | pronargs | pronargdefaults | prorettype | proargtypes | proallargtypes | proargmodes | proargnames | proargdefaults | protrftypes |      prosrc      | probin | proconfig | proacl 
------+------------------+--------------+----------+---------+---------+---------+-------------+------------+---------+-----------+--------------+-------------+-----------+-------------+-------------+----------+-----------------+------------+-------------+----------------+-------------+-------------+----------------+-------------+------------------+--------+-----------+--------
 3389 | numeric_poly_avg |           11 |       10 |      12 |       1 |       0 |           0 | -          | f       | f         | f            | f           | f         | i           | s           |        1 |               0 |      
 1700 | 2281        |                |             |             |                |             | numeric_poly_avg |        |           | 
(1 row)

postgres=# 

对于avg函数,计算平均值,对于输入值,我们最终需要在临时状态中保存当前分组的和以及当前分组对于的总处理数个数,内核中在转换时定义了一个新的数据类型的临时变量来保存聚集的当前内部状态值,该类型对应结构如下:

typedef struct Int128AggState

	bool		calcSumX2;		/* 如果为true,则计算平方和 */
	int64		N;				/* 已处理数的计数 */
	int128		sumX;			/* 已处理数之和 */
	int128		sumX2;			/* 已处理数的平方和 */
 Int128AggState;
//src/backend/utils/adt/numeric.c
/*
 * Transition function for int8 input when we don't need sumX2.
 */
Datum
int8_avg_accum(PG_FUNCTION_ARGS)

	PolyNumAggState *state; //临时变量,用于保存转换状态值的新类型

	/* arg[0]对应当前状态值,如果为NULL,说明第一次调用该状态转换函数 */
	state = PG_ARGISNULL(0) ? NULL : (PolyNumAggState *) PG_GETARG_POINTER(0);

	/* 在第一次调用时创建状态数据 */
	if (state == NULL)
		state = makePolyNumAggState(fcinfo, false);

	/* 处理arg[1],该参数为当前输入行对应的参数值 */
	if (!PG_ARGISNULL(1))
	
#ifdef HAVE_INT128
		do_int128_accum(state, (int128) PG_GETARG_INT64(1));
#else
		Numeric		newval;

		/* 如果没有定义HAVE_INT128宏,则将该参数值转换为Numeric类型,再进行求和计算 */
		newval = DatumGetNumeric(DirectFunctionCall1(int8_numeric,
													 PG_GETARG_DATUM(1)));
		do_numeric_accum(state, newval); //求和
#endif
	

	PG_RETURN_POINTER(state);


/* 用于计算平均值的最终函数 */
Datum
numeric_poly_avg(PG_FUNCTION_ARGS)

#ifdef HAVE_INT128
	PolyNumAggState *state;
	NumericVar	result;
	Datum		countd,
				sumd;

	state = PG_ARGISNULL(0) ? NULL : (PolyNumAggState *) PG_GETARG_POINTER(0);

	/* If there were no non-null inputs, return NULL */
	if (state == NULL || state->N == 0)
		PG_RETURN_NULL();

	init_var(&result);

	int128_to_numericvar(state->sumX, &result);

	/* 将状态值中保存的处理数总个数N转换为NUMERIC类型 */
	countd = DirectFunctionCall1(int8_numeric,
								 Int64GetDatumFast(state->N));
	/* 状态值中保存的处理数总和 */
	sumd = NumericGetDatum(make_result(&result));

	free_var(&result);

	/* 计算平均值 */
	PG_RETURN_DATUM(DirectFunctionCall2(numeric_div, sumd, countd));
#else
	return numeric_avg(fcinfo);
#endif

源码相关函数简介

通过上面的介绍,我们已经了解关于聚集函数相关的底层处理函数如何去查看,接下来,我们根据内核源码,来分析聚集函数的执行流程,执行阶段入口函数ExecWindowAgg

相关数据结构

typedef struct WindowAggState

	ScanState	ss;				/* its first field is NodeTag */

	/* these fields are filled in by ExecInitExpr: */
	List	   *funcs;			/* targetlist中的所有WindowFunc节点 */
	int			numfuncs;		/* 窗口函数总数 */
	int			numaggs;		/* 普通聚合数 */

	WindowStatePerFunc perfunc; /* 该结构存储窗口函数的OID及执行结果等信息 */
	WindowStatePerAgg peragg;	/* 该结构存储聚合函数内部处理的函数oid(转换函数、最终函数)及执行结果等信息 */
	ExprState  *partEqfunction; /* 分区列的相等函数 */
	ExprState  *ordEqfunction;	/* 用于排序列的相等函数 */
	Tuplestorestate *buffer;	/* 存储当前分区的行 */
	int			current_ptr;	/* read pointer # for current row */
	int			framehead_ptr;	/* read pointer # for frame head, if used */
	int			frametail_ptr;	/* read pointer # for frame tail, if used */
	int			grouptail_ptr;	/* read pointer # for group tail, if used */
	int64		spooled_rows;	/* 缓冲区中的行总数 */
	int64		currentpos;		/* 分区中当前行的位置 */
	int64		frameheadpos;	/* 当前帧头位置 */
	int64		frametailpos;	/* 当前帧尾位置(frame end+1) */
	/* use struct pointer to avoid including windowapi.h here */
	struct WindowObjectData *agg_winobj;	/* 用于聚合获取的winobj */
	int64		aggregatedbase; /* 当前聚合的起始行 */
	int64		aggregatedupto; /* 在此之前被聚合的行 */

	int			frameOptions;	/* frame_clause options, see WindowDef */
	ExprState  *startOffset;	/* 起始边界偏移量的表达式 */
	ExprState  *endOffset;		/* 结束边界偏移量的表达式 */
	Datum		startOffsetValue;	/* startOffset 评估的结果 */
	Datum		endOffsetValue; /* endOffset 评估的结果 */

	/* 这些字段与 RANGE offset PRECEDING/FOLLOWING 一起使用: */
	FmgrInfo	startInRangeFunc;	/* in_range function for startOffset */
	FmgrInfo	endInRangeFunc; /* in_range function for endOffset */
	Oid			inRangeColl;	/* collation for in_range tests */
	bool		inRangeAsc;		/* use ASC sort order for in_range tests? */
	bool		inRangeNullsFirst;	/* nulls sort first for in_range tests? */

	/* these fields are used in GROUPS mode: */
	int64		currentgroup;	/* peer group # of current row in partition */
	int64		frameheadgroup; /* peer group # of frame head row */
	int64		frametailgroup; /* peer group # of frame tail row */
	int64		groupheadpos;	/* current row's peer group head position */
	int64		grouptailpos;	/* " " " " tail position (group end+1) */

	MemoryContext partcontext;	/* 分区数据有效期的上下文 */
	MemoryContext aggcontext;	/* 聚合工作数据的共享上下文 */
	MemoryContext curaggcontext;	/* 当前聚合工作数据的上下文 */
	ExprContext *tmpcontext;	/* 用于短期计算的上下文 */

	bool		all_first;		/* 如果扫描正在启动,则为true */
	bool		all_done;		/* 如果扫描完成,则为true */
	bool		partition_spooled;	/* 如果当前分区中的所有元组都已存储到元组存储中,则为true */
	bool		more_partitions;	/* 如果在此分区之后有更多分区,则为true */
	bool		framehead_valid;	/* 如果当前行的 frameheadpos 是最新的,则为真 */
	bool		frametail_valid;	/* 如果当前行的 frametailpos 是最新的,则为真 */
	bool		grouptail_valid;	/* 如果当前行的 grouptailpos 是最新的,则为真*/

	TupleTableSlot *first_part_slot; /* 当前分区或下一分区的第一个元组*/
	TupleTableSlot *framehead_slot; /* 当前帧的第一个元组 */
	TupleTableSlot *frametail_slot; /* 当前帧后的第一个元组 */

	/* 从tuplestore取回的元组的临时槽 */
	TupleTableSlot *agg_row_slot;
	TupleTableSlot *temp_slot_1;
	TupleTableSlot *temp_slot_2;
 WindowAggState;

/*
 * 对于普通聚合窗口函数,我们也有其中之一。
 */
typedef struct WindowStatePerAggData

	/* 转换函数的Oid */
	Oid			transfn_oid;
	Oid			invtransfn_oid; /* may be InvalidOid */
	Oid			finalfn_oid;	/* may be InvalidOid */

	/*
	 * 转换函数的 fmgr 查找数据 --- 仅当相应的 oid 不是 InvalidOid 时才有效。
	 * 特别注意 fn_strict 标志保留在这里。
	 */
	FmgrInfo	transfn;
	FmgrInfo	invtransfn;
	FmgrInfo	finalfn;

	int			numFinalArgs;	/* 传递给 finalfn 的参数数量 */

以上是关于Postgres数据库之聚集函数内核源码学习总结的主要内容,如果未能解决你的问题,请参考以下文章

Postgres数据库词法分析和语法分析源码解析

Postgres数据库词法分析和语法分析源码解析

Postgres数据库词法分析和语法分析源码解析

Postgres数据库词法分析和语法分析源码解析

Postgres数据库之增加dm默认表空间main学习汇总

Postgres数据库之增加dm默认表空间main学习汇总